This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d4ca0aa580 [SPARK-43563][SPARK-43459][SPARK-43451][SPARK-43506] 
Remove `squeeze` from `read_csv` & enabling more tests
8d4ca0aa580 is described below

commit 8d4ca0aa580f9622eb6f74bb99ae8a3e4e9667d5
Author: itholic <haejoon....@databricks.com>
AuthorDate: Tue Aug 22 08:20:17 2023 +0800

    [SPARK-43563][SPARK-43459][SPARK-43451][SPARK-43506] Remove `squeeze` from 
`read_csv` & enabling more tests
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to remove `squeeze` parameter from `read_csv` to follow 
the behavior of latest pandas. See 
https://github.com/pandas-dev/pandas/pull/40413 and 
https://github.com/pandas-dev/pandas/pull/43427 for detail.
    
    This PR also enables more tests for pandas 2.0.0 and above.
    
    ### Why are the changes needed?
    
    To follow the behavior of latest pandas, and increase the test coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    `squeeze` will be no longer available from `read_csv`. Otherwise, it's 
test-only.
    
    ### How was this patch tested?
    
    Enabling & updating the existing tests.
    
    Closes #42551 from itholic/pandas_remaining_tests.
    
    Authored-by: itholic <haejoon....@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../source/migration_guide/pyspark_upgrade.rst     |   1 +
 python/pyspark/pandas/namespace.py                 |  32 +---
 python/pyspark/pandas/tests/test_csv.py            |  18 ---
 .../tests/test_ops_on_diff_frames_groupby.py       |  16 +-
 .../test_ops_on_diff_frames_groupby_rolling.py     |  33 +++-
 python/pyspark/pandas/tests/test_rolling.py        | 177 ++++++++++++++-------
 .../pyspark/sql/tests/connect/test_parity_arrow.py |   7 -
 python/pyspark/sql/tests/test_arrow.py             |   4 -
 8 files changed, 153 insertions(+), 135 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index c3d67df425b..6499191d8c8 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -38,6 +38,7 @@ Upgrading from PySpark 3.5 to 4.0
 * In Spark 4.0, ``sort_columns`` parameter from ``DataFrame.plot`` and 
`Series.plot`` has been removed from pandas API on Spark.
 * In Spark 4.0, the default value of ``regex`` parameter for 
``Series.str.replace`` has been changed from ``True`` to ``False`` from pandas 
API on Spark. Additionally, a single character ``pat`` with ``regex=True`` is 
now treated as a regular expression instead of a string literal.
 * In Spark 4.0, the resulting name from ``value_counts`` for all objects sets 
to ``'count'`` (or ``'propotion'`` if ``nomalize=True`` was passed) from pandas 
API on Spark, and the index will be named after the original object.
+* In Spark 4.0, ``squeeze`` parameter from ``ps.read_csv`` and 
``ps.read_excel`` has been removed from pandas API on Spark.
 
 
 Upgrading from PySpark 3.3 to 3.4
diff --git a/python/pyspark/pandas/namespace.py 
b/python/pyspark/pandas/namespace.py
index fddf1bec63f..be6d57aa033 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -222,7 +222,6 @@ def read_csv(
     names: Optional[Union[str, List[str]]] = None,
     index_col: Optional[Union[str, List[str]]] = None,
     usecols: Optional[Union[List[int], List[str], Callable[[str], bool]]] = 
None,
-    squeeze: bool = False,
     mangle_dupe_cols: bool = True,
     dtype: Optional[Union[str, Dtype, Dict[str, Union[str, Dtype]]]] = None,
     nrows: Optional[int] = None,
@@ -262,11 +261,6 @@ def read_csv(
         from the document header row(s).
         If callable, the callable function will be evaluated against the 
column names,
         returning names where the callable function evaluates to `True`.
-    squeeze : bool, default False
-        If the parsed data only contains one column then return a Series.
-
-        .. deprecated:: 3.4.0
-
     mangle_dupe_cols : bool, default True
         Duplicate columns will be specified as 'X0', 'X1', ... 'XN', rather
         than 'X' ... 'X'. Passing in False will cause data to be overwritten if
@@ -466,10 +460,7 @@ def read_csv(
             for col in psdf.columns:
                 psdf[col] = psdf[col].astype(dtype)
 
-    if squeeze and len(psdf.columns) == 1:
-        return first_series(psdf)
-    else:
-        return psdf
+    return psdf
 
 
 def read_json(
@@ -912,7 +903,6 @@ def read_excel(
     names: Optional[List] = None,
     index_col: Optional[List[int]] = None,
     usecols: Optional[Union[int, str, List[Union[int, str]], Callable[[str], 
bool]]] = None,
-    squeeze: bool = False,
     dtype: Optional[Dict[str, Union[str, Dtype]]] = None,
     engine: Optional[str] = None,
     converters: Optional[Dict] = None,
@@ -985,11 +975,6 @@ def read_excel(
         * If list of string, then indicates list of column names to be parsed.
         * If callable, then evaluate each column name against it and parse the
           column if the callable returns ``True``.
-    squeeze : bool, default False
-        If the parsed data only contains one column then return a Series.
-
-        .. deprecated:: 3.4.0
-
     dtype : Type name or dict of column -> type, default None
         Data type for data or columns. E.g. {'a': np.float64, 'b': np.int32}
         Use `object` to preserve data as stored in Excel and not interpret 
dtype.
@@ -1142,7 +1127,7 @@ def read_excel(
     """
 
     def pd_read_excel(
-        io_or_bin: Any, sn: Union[str, int, List[Union[str, int]], None], sq: 
bool
+        io_or_bin: Any, sn: Union[str, int, List[Union[str, int]], None]
     ) -> pd.DataFrame:
         return pd.read_excel(
             io=BytesIO(io_or_bin) if isinstance(io_or_bin, (bytes, bytearray)) 
else io_or_bin,
@@ -1151,7 +1136,6 @@ def read_excel(
             names=names,
             index_col=index_col,
             usecols=usecols,
-            squeeze=sq,
             dtype=dtype,
             engine=engine,
             converters=converters,
@@ -1181,7 +1165,7 @@ def read_excel(
         io_or_bin = io
         single_file = True
 
-    pdf_or_psers = pd_read_excel(io_or_bin, sn=sheet_name, sq=squeeze)
+    pdf_or_psers = pd_read_excel(io_or_bin, sn=sheet_name)
 
     if single_file:
         if isinstance(pdf_or_psers, dict):
@@ -1208,9 +1192,7 @@ def read_excel(
             )
 
             def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
-                pdf = pd.concat(
-                    [pd_read_excel(bin, sn=sn, sq=False) for bin in 
pdf[pdf.columns[0]]]
-                )
+                pdf = pd.concat([pd_read_excel(bin, sn=sn) for bin in 
pdf[pdf.columns[0]]])
 
                 reset_index = pdf.reset_index()
                 for name, col in reset_index.items():
@@ -1231,11 +1213,7 @@ def read_excel(
                 .mapInPandas(lambda iterator: map(output_func, iterator), 
schema=return_schema)
             )
 
-            psdf = DataFrame(psdf._internal.with_new_sdf(sdf))
-            if squeeze and len(psdf.columns) == 1:
-                return first_series(psdf)
-            else:
-                return psdf
+            return DataFrame(psdf._internal.with_new_sdf(sdf))
 
         if isinstance(pdf_or_psers, dict):
             return {
diff --git a/python/pyspark/pandas/tests/test_csv.py 
b/python/pyspark/pandas/tests/test_csv.py
index b118f7cf8a9..a367dd72be1 100644
--- a/python/pyspark/pandas/tests/test_csv.py
+++ b/python/pyspark/pandas/tests/test_csv.py
@@ -255,24 +255,6 @@ class CsvTestsMixin:
             actual = ps.read_csv(fn, sep="\t")
             self.assert_eq(expected, actual, almost=True)
 
-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43563): Enable CsvTests.test_read_csv_with_squeeze for 
pandas 2.0.0.",
-    )
-    def test_read_csv_with_squeeze(self):
-        with self.csv_file(self.csv_text) as fn:
-            expected = pd.read_csv(fn, squeeze=True, usecols=["name"])
-            actual = ps.read_csv(fn, squeeze=True, usecols=["name"])
-            self.assert_eq(expected, actual, almost=True)
-
-            expected = pd.read_csv(fn, squeeze=True, usecols=["name", 
"amount"])
-            actual = ps.read_csv(fn, squeeze=True, usecols=["name", "amount"])
-            self.assert_eq(expected, actual, almost=True)
-
-            expected = pd.read_csv(fn, squeeze=True, usecols=["name", 
"amount"], index_col=["name"])
-            actual = ps.read_csv(fn, squeeze=True, usecols=["name", "amount"], 
index_col=["name"])
-            self.assert_eq(expected, actual, almost=True)
-
     def test_read_csv_with_mangle_dupe_cols(self):
         self.assertRaisesRegex(
             ValueError, "mangle_dupe_cols", lambda: ps.read_csv("path", 
mangle_dupe_cols=False)
diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby.py 
b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby.py
index f581db4bc2f..23d1b04dd3d 100644
--- a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby.py
+++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby.py
@@ -37,11 +37,6 @@ class OpsOnDiffFramesGroupByTestsMixin:
         reset_option("compute.ops_on_diff_frames")
         super().tearDownClass()
 
-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43460): Enable 
OpsOnDiffFramesGroupByTests.test_groupby_different_lengths "
-        "for pandas 2.0.0.",
-    )
     def test_groupby_different_lengths(self):
         pdfs1 = [
             pd.DataFrame({"c": [4, 2, 7, 3, None, 1, 1, 1, 2], "d": 
list("abcdefght")}),
@@ -71,7 +66,7 @@ class OpsOnDiffFramesGroupByTestsMixin:
 
                 self.assert_eq(
                     sort(psdf1.groupby(psdf2.a, as_index=as_index).sum()),
-                    sort(pdf1.groupby(pdf2.a, as_index=as_index).sum()),
+                    sort(pdf1.groupby(pdf2.a, 
as_index=as_index).sum(numeric_only=True)),
                     almost=as_index,
                 )
 
@@ -86,11 +81,6 @@ class OpsOnDiffFramesGroupByTestsMixin:
                     almost=as_index,
                 )
 
-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43459): Enable 
OpsOnDiffFramesGroupByTests.test_groupby_multiindex_columns "
-        "for pandas 2.0.0.",
-    )
     def test_groupby_multiindex_columns(self):
         pdf1 = pd.DataFrame(
             {("y", "c"): [4, 2, 7, 3, None, 1, 1, 1, 2], ("z", "d"): 
list("abcdefght")}
@@ -103,7 +93,7 @@ class OpsOnDiffFramesGroupByTestsMixin:
 
         self.assert_eq(
             psdf1.groupby(psdf2[("x", "a")]).sum().sort_index(),
-            pdf1.groupby(pdf2[("x", "a")]).sum().sort_index(),
+            pdf1.groupby(pdf2[("x", "a")]).sum(numeric_only=True).sort_index(),
         )
 
         self.assert_eq(
@@ -112,7 +102,7 @@ class OpsOnDiffFramesGroupByTestsMixin:
             .sort_values(("y", "c"))
             .reset_index(drop=True),
             pdf1.groupby(pdf2[("x", "a")], as_index=False)
-            .sum()
+            .sum(numeric_only=True)
             .sort_values(("y", "c"))
             .reset_index(drop=True),
         )
diff --git 
a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py 
b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py
index 17e2bb82bd5..1e5e11637e5 100644
--- a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py
+++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py
@@ -72,12 +72,35 @@ class OpsOnDiffFramesGroupByRollingTestsMixin:
             getattr(pdf.groupby(pkey)[["b"]].rolling(2), f)().sort_index(),
         )
 
-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43452): Enable RollingTests.test_groupby_rolling_count for 
pandas 2.0.0.",
-    )
     def test_groupby_rolling_count(self):
-        self._test_groupby_rolling_func("count")
+        pser = pd.Series([1, 2, 3], name="a")
+        pkey = pd.Series([1, 2, 3], name="a")
+        psser = ps.from_pandas(pser)
+        kkey = ps.from_pandas(pkey)
+
+        # TODO(SPARK-43432): Fix `min_periods` for Rolling.count() to work 
same as pandas
+        self.assert_eq(
+            psser.groupby(kkey).rolling(2).count().sort_index(),
+            pser.groupby(pkey).rolling(2, min_periods=1).count().sort_index(),
+        )
+
+        pdf = pd.DataFrame({"a": [1, 2, 3, 2], "b": [4.0, 2.0, 3.0, 1.0]})
+        pkey = pd.Series([1, 2, 3, 2], name="a")
+        psdf = ps.from_pandas(pdf)
+        kkey = ps.from_pandas(pkey)
+
+        self.assert_eq(
+            psdf.groupby(kkey).rolling(2).count().sort_index(),
+            pdf.groupby(pkey).rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(kkey)["b"].rolling(2).count().sort_index(),
+            pdf.groupby(pkey)["b"].rolling(2, 
min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(kkey)[["b"]].rolling(2).count().sort_index(),
+            pdf.groupby(pkey)[["b"]].rolling(2, 
min_periods=1).count().sort_index(),
+        )
 
     def test_groupby_rolling_min(self):
         self._test_groupby_rolling_func("min")
diff --git a/python/pyspark/pandas/tests/test_rolling.py 
b/python/pyspark/pandas/tests/test_rolling.py
index 00b9de8a478..526962e3bbd 100644
--- a/python/pyspark/pandas/tests/test_rolling.py
+++ b/python/pyspark/pandas/tests/test_rolling.py
@@ -86,12 +86,34 @@ class RollingTestsMixin:
     def test_rolling_sum(self):
         self._test_rolling_func("sum")
 
-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43451): Enable RollingTests.test_rolling_count for pandas 
2.0.0.",
-    )
     def test_rolling_count(self):
-        self._test_rolling_func("count")
+        pser = pd.Series([1, 2, 3, 7, 9, 8], index=np.random.rand(6), name="a")
+        psser = ps.from_pandas(pser)
+        self.assert_eq(psser.rolling(2).count(), pser.rolling(2, 
min_periods=1).count())
+        self.assert_eq(psser.rolling(2).count().sum(), pser.rolling(2, 
min_periods=1).count().sum())
+
+        # TODO(SPARK-43432): Fix `min_periods` for Rolling.count() to work 
same as pandas
+        # Multiindex
+        pser = pd.Series(
+            [1, 2, 3],
+            index=pd.MultiIndex.from_tuples([("a", "x"), ("a", "y"), ("b", 
"z")]),
+            name="a",
+        )
+        psser = ps.from_pandas(pser)
+        self.assert_eq(psser.rolling(2).count(), pser.rolling(2, 
min_periods=1).count())
+
+        pdf = pd.DataFrame(
+            {"a": [1.0, 2.0, 3.0, 2.0], "b": [4.0, 2.0, 3.0, 1.0]}, 
index=np.random.rand(4)
+        )
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(psdf.rolling(2).count(), pdf.rolling(2, 
min_periods=1).count())
+        self.assert_eq(psdf.rolling(2).count().sum(), pdf.rolling(2, 
min_periods=1).count().sum())
+
+        # Multiindex column
+        columns = pd.MultiIndex.from_tuples([("a", "x"), ("a", "y")])
+        pdf.columns = columns
+        psdf.columns = columns
+        self.assert_eq(psdf.rolling(2).count(), pdf.rolling(2, 
min_periods=1).count())
 
     def test_rolling_std(self):
         self._test_rolling_func("std")
@@ -138,33 +160,18 @@ class RollingTestsMixin:
         pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0, 2.0], "b": [4.0, 2.0, 3.0, 
1.0]})
         psdf = ps.from_pandas(pdf)
 
-        # The behavior of GroupBy.rolling is changed from pandas 1.3.
-        if LooseVersion(pd.__version__) >= LooseVersion("1.3"):
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(pdf.a).rolling(2)).sort_index(),
-            )
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a).rolling(2)).sum(),
-                pd_func(pdf.groupby(pdf.a).rolling(2)).sum(),
-            )
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a + 1).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(pdf.a + 1).rolling(2)).sort_index(),
-            )
-        else:
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(pdf.a).rolling(2)).drop("a", 
axis=1).sort_index(),
-            )
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a).rolling(2)).sum(),
-                pd_func(pdf.groupby(pdf.a).rolling(2)).sum().drop("a"),
-            )
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a + 1).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(pdf.a + 1).rolling(2)).drop("a", 
axis=1).sort_index(),
-            )
+        self.assert_eq(
+            ps_func(psdf.groupby(psdf.a).rolling(2)).sort_index(),
+            pd_func(pdf.groupby(pdf.a).rolling(2)).sort_index(),
+        )
+        self.assert_eq(
+            ps_func(psdf.groupby(psdf.a).rolling(2)).sum(),
+            pd_func(pdf.groupby(pdf.a).rolling(2)).sum(),
+        )
+        self.assert_eq(
+            ps_func(psdf.groupby(psdf.a + 1).rolling(2)).sort_index(),
+            pd_func(pdf.groupby(pdf.a + 1).rolling(2)).sort_index(),
+        )
 
         self.assert_eq(
             ps_func(psdf.b.groupby(psdf.a).rolling(2)).sort_index(),
@@ -184,36 +191,84 @@ class RollingTestsMixin:
         pdf.columns = columns
         psdf.columns = columns
 
-        # The behavior of GroupBy.rolling is changed from pandas 1.3.
-        if LooseVersion(pd.__version__) >= LooseVersion("1.3"):
-            self.assert_eq(
-                ps_func(psdf.groupby(("a", "x")).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(("a", "x")).rolling(2)).sort_index(),
-            )
-
-            self.assert_eq(
-                ps_func(psdf.groupby([("a", "x"), ("a", 
"y")]).rolling(2)).sort_index(),
-                pd_func(pdf.groupby([("a", "x"), ("a", 
"y")]).rolling(2)).sort_index(),
-            )
-        else:
-            self.assert_eq(
-                ps_func(psdf.groupby(("a", "x")).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(("a", "x")).rolling(2)).drop(("a", "x"), 
axis=1).sort_index(),
-            )
-
-            self.assert_eq(
-                ps_func(psdf.groupby([("a", "x"), ("a", 
"y")]).rolling(2)).sort_index(),
-                pd_func(pdf.groupby([("a", "x"), ("a", "y")]).rolling(2))
-                .drop([("a", "x"), ("a", "y")], axis=1)
-                .sort_index(),
-            )
-
-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43452): Enable RollingTests.test_groupby_rolling_count for 
pandas 2.0.0.",
-    )
+        self.assert_eq(
+            ps_func(psdf.groupby(("a", "x")).rolling(2)).sort_index(),
+            pd_func(pdf.groupby(("a", "x")).rolling(2)).sort_index(),
+        )
+
+        self.assert_eq(
+            ps_func(psdf.groupby([("a", "x"), ("a", 
"y")]).rolling(2)).sort_index(),
+            pd_func(pdf.groupby([("a", "x"), ("a", 
"y")]).rolling(2)).sort_index(),
+        )
+
     def test_groupby_rolling_count(self):
-        self._test_groupby_rolling_func("count")
+        pser = pd.Series([1, 2, 3, 2], index=np.random.rand(4), name="a")
+        psser = ps.from_pandas(pser)
+        # TODO(SPARK-43432): Fix `min_periods` for Rolling.count() to work 
same as pandas
+        self.assert_eq(
+            psser.groupby(psser).rolling(2).count().sort_index(),
+            pser.groupby(pser).rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psser.groupby(psser).rolling(2).count().sum(),
+            pser.groupby(pser).rolling(2, min_periods=1).count().sum(),
+        )
+
+        # Multiindex
+        pser = pd.Series(
+            [1, 2, 3, 2],
+            index=pd.MultiIndex.from_tuples([("a", "x"), ("a", "y"), ("b", 
"z"), ("c", "z")]),
+            name="a",
+        )
+        psser = ps.from_pandas(pser)
+        self.assert_eq(
+            psser.groupby(psser).rolling(2).count().sort_index(),
+            pser.groupby(pser).rolling(2, min_periods=1).count().sort_index(),
+        )
+
+        pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0, 2.0], "b": [4.0, 2.0, 3.0, 
1.0]})
+        psdf = ps.from_pandas(pdf)
+
+        self.assert_eq(
+            psdf.groupby(psdf.a).rolling(2).count().sort_index(),
+            pdf.groupby(pdf.a).rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(psdf.a).rolling(2).count().sum(),
+            pdf.groupby(pdf.a).rolling(2, min_periods=1).count().sum(),
+        )
+        self.assert_eq(
+            psdf.groupby(psdf.a + 1).rolling(2).count().sort_index(),
+            pdf.groupby(pdf.a + 1).rolling(2, 
min_periods=1).count().sort_index(),
+        )
+
+        self.assert_eq(
+            psdf.b.groupby(psdf.a).rolling(2).count().sort_index(),
+            pdf.b.groupby(pdf.a).rolling(2, 
min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(psdf.a)["b"].rolling(2).count().sort_index(),
+            pdf.groupby(pdf.a)["b"].rolling(2, 
min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(psdf.a)[["b"]].rolling(2).count().sort_index(),
+            pdf.groupby(pdf.a)[["b"]].rolling(2, 
min_periods=1).count().sort_index(),
+        )
+
+        # Multiindex column
+        columns = pd.MultiIndex.from_tuples([("a", "x"), ("a", "y")])
+        pdf.columns = columns
+        psdf.columns = columns
+
+        self.assert_eq(
+            psdf.groupby(("a", "x")).rolling(2).count().sort_index(),
+            pdf.groupby(("a", "x")).rolling(2, 
min_periods=1).count().sort_index(),
+        )
+
+        self.assert_eq(
+            psdf.groupby([("a", "x"), ("a", 
"y")]).rolling(2).count().sort_index(),
+            pdf.groupby([("a", "x"), ("a", "y")]).rolling(2, 
min_periods=1).count().sort_index(),
+        )
 
     def test_groupby_rolling_min(self):
         self._test_groupby_rolling_func("min")
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py 
b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index 5f76cafb192..258502baa4d 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -16,9 +16,6 @@
 #
 
 import unittest
-from distutils.version import LooseVersion
-
-import pandas as pd
 
 from pyspark.sql.tests.test_arrow import ArrowTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
@@ -123,10 +120,6 @@ class ArrowParityTests(ArrowTestsMixin, 
ReusedConnectTestCase, PandasOnSparkTest
     def test_createDataFrame_duplicate_field_names(self):
         self.check_createDataFrame_duplicate_field_names(True)
 
-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43506): Enable ArrowTests.test_toPandas_empty_columns for 
pandas 2.0.0.",
-    )
     def test_toPandas_empty_columns(self):
         self.check_toPandas_empty_columns(True)
 
diff --git a/python/pyspark/sql/tests/test_arrow.py 
b/python/pyspark/sql/tests/test_arrow.py
index ac45c4c565f..e26aabbea27 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -1015,10 +1015,6 @@ class ArrowTestsMixin:
 
         self.assertEqual(df.collect(), data)
 
-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43506): Enable ArrowTests.test_toPandas_empty_columns for 
pandas 2.0.0.",
-    )
     def test_toPandas_empty_columns(self):
         for arrow_enabled in [True, False]:
             with self.subTest(arrow_enabled=arrow_enabled):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to