This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ebe635eb1d44 [SPARK-48482][PYTHON][FOLLOWUP] Revert dropDuplicates and dropDuplicatesWithinWatermark should accept variable length args
ebe635eb1d44 is described below

commit ebe635eb1d44dee879623e8646bd3be7424b5676
Author: Wei Liu <[email protected]>
AuthorDate: Fri Aug 30 09:13:58 2024 +0900

    [SPARK-48482][PYTHON][FOLLOWUP] Revert dropDuplicates and dropDuplicatesWithinWatermark should accept variable length args
    
    ### What changes were proposed in this pull request?
    
    Per the conversation in https://github.com/apache/spark/pull/47835#issuecomment-2311082085, we will revert 560c08332b35941260169124b4f522bdc82b84d8 for API parity with the Pandas API.
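
    For illustration, with a hypothetical DataFrame `df`, the call form removed by this revert versus the list form that remains:

        df.dropDuplicates("name", "age")    # variadic form from SPARK-48482, removed by this revert
        df.dropDuplicates(["name", "age"])  # list form, supported before and after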
    
    ### Why are the changes needed?
    
    Bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, reverting the API re-enables users to call `dropDuplicates(subset=xxx)`
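
    For example, with a hypothetical DataFrame `df`:

        df.dropDuplicates(subset=["name"])  # raised TypeError (unexpected keyword argument
                                            # 'subset') under the variadic *subset signature;
                                            # accepted again after this revert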
    
    ### How was this patch tested?
    
    Unit test
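
    The updated tests (python/pyspark/sql/tests/test_dataframe.py in the diff below) cover both the list form and the restored error, roughly:

        self.assertEqual(df.dropDuplicates(["name"]).count(), 1)
        with self.assertRaises(PySparkTypeError):
            df.dropDuplicates("name")  # a bare str subset now raises NOT_LIST_OR_TUPLE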
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47916 from WweiL/revert-dropDuplicates-api.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 docs/structured-streaming-programming-guide.md     |  6 +--
 python/pyspark/sql/classic/dataframe.py            | 51 +++++++++---------
 python/pyspark/sql/connect/dataframe.py            | 62 +++++++++-------------
 python/pyspark/sql/connect/plan.py                 |  2 +-
 python/pyspark/sql/dataframe.py                    | 22 ++------
 .../sql/tests/connect/test_connect_basic.py        | 12 -----
 .../pyspark/sql/tests/connect/test_connect_plan.py | 20 ++-----
 python/pyspark/sql/tests/test_dataframe.py         | 30 +++--------
 8 files changed, 69 insertions(+), 136 deletions(-)
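
In short, both the classic and Connect DataFrame implementations move back from the variadic signature introduced by SPARK-48482 to the optional-list one, as the hunks below show:

    # before this commit (SPARK-48482)
    def dropDuplicates(self, *subset: Union[str, List[str]]) -> "DataFrame": ...

    # after this commit (restored)
    def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": ...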

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index aa20da6ae81d..d266e761263b 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2082,12 +2082,12 @@ You can deduplicate records in data streams using a unique identifier in the events.
 streamingDf = spark.readStream. ...
 
 # Without watermark using guid column
-streamingDf.dropDuplicates("guid")
+streamingDf.dropDuplicates(["guid"])
 
 # With watermark using guid and eventTime columns
 streamingDf \
   .withWatermark("eventTime", "10 seconds") \
-  .dropDuplicates("guid", "eventTime")
+  .dropDuplicates(["guid", "eventTime"])
 {% endhighlight %}
 
 </div>
@@ -2163,7 +2163,7 @@ streamingDf = spark.readStream. ...
 # deduplicate using guid column with watermark based on eventTime column
 streamingDf \
   .withWatermark("eventTime", "10 hours") \
-  .dropDuplicatesWithinWatermark("guid")
+  .dropDuplicatesWithinWatermark(["guid"])
 {% endhighlight %}
 
 </div>
diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py
index fb2bb3c22703..0e890e3343e6 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -1222,23 +1222,17 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     def subtract(self, other: ParentDataFrame) -> ParentDataFrame:
         return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sparkSession)
 
-    def dropDuplicates(self, *subset: Union[str, List[str]]) -> ParentDataFrame:
-        # Acceptable args should be str, ... or a single List[str]
-        # So if subset length is 1, it can be either single str, or a list of str
-        # if subset length is greater than 1, it must be a sequence of str
-        if not subset:
+    def dropDuplicates(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        if subset is not None and (not isinstance(subset, Iterable) or isinstance(subset, str)):
+            raise PySparkTypeError(
+                errorClass="NOT_LIST_OR_TUPLE",
+                messageParameters={"arg_name": "subset", "arg_type": type(subset).__name__},
+            )
+
+        if subset is None:
             jdf = self._jdf.dropDuplicates()
-        elif len(subset) == 1 and isinstance(subset[0], list):
-            item = subset[0]
-            for c in item:
-                if not isinstance(c, str):
-                    raise PySparkTypeError(
-                        errorClass="NOT_STR",
-                        messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
-                    )
-            jdf = self._jdf.dropDuplicates(self._jseq(item))
         else:
-            for c in subset:  # type: ignore[assignment]
+            for c in subset:
                 if not isinstance(c, str):
                     raise PySparkTypeError(
                         errorClass="NOT_STR",
@@ -1247,20 +1241,22 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
             jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)
 
-    drop_duplicates = dropDuplicates
-
-    def dropDuplicatesWithinWatermark(self, *subset: Union[str, List[str]]) -> ParentDataFrame:
-        # Acceptable args should be str, ... or a single List[str]
-        # So if subset length is 1, it can be either single str, or a list of str
-        # if subset length is greater than 1, it must be a sequence of str
-        if len(subset) > 1:
-            assert all(isinstance(c, str) for c in subset)
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        if subset is not None and (not isinstance(subset, Iterable) or isinstance(subset, str)):
+            raise PySparkTypeError(
+                errorClass="NOT_LIST_OR_TUPLE",
+                messageParameters={"arg_name": "subset", "arg_type": type(subset).__name__},
+            )
 
-        if not subset:
+        if subset is None:
             jdf = self._jdf.dropDuplicatesWithinWatermark()
-        elif len(subset) == 1 and isinstance(subset[0], list):
-            jdf = self._jdf.dropDuplicatesWithinWatermark(self._jseq(subset[0]))
         else:
+            for c in subset:
+                if not isinstance(c, str):
+                    raise PySparkTypeError(
+                        errorClass="NOT_STR",
+                        messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
+                    )
             jdf = self._jdf.dropDuplicatesWithinWatermark(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)
 
@@ -1805,6 +1801,9 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     def groupby(self, *cols: "ColumnOrNameOrOrdinal") -> "GroupedData":  # type: ignore[misc]
         return self.groupBy(*cols)
 
+    def drop_duplicates(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        return self.dropDuplicates(subset)
+
     def writeTo(self, table: str) -> DataFrameWriterV2:
         return DataFrameWriterV2(self, table)
 
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 846f1109a92d..442157eef0b7 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -426,40 +426,29 @@ class DataFrame(ParentDataFrame):
                     "arg_type": type(numPartitions).__name__,
                 },
             )
-
         res._cached_schema = self._cached_schema
         return res
 
-    def dropDuplicates(self, *subset: Union[str, List[str]]) -> ParentDataFrame:
-        # Acceptable args should be str, ... or a single List[str]
-        # So if subset length is 1, it can be either single str, or a list of str
-        # if subset length is greater than 1, it must be a sequence of str
-        if not subset:
-            res = DataFrame(
-                plan.Deduplicate(child=self._plan, all_columns_as_keys=True), session=self._session
+    def dropDuplicates(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        if subset is not None and not isinstance(subset, (list, tuple)):
+            raise PySparkTypeError(
+                errorClass="NOT_LIST_OR_TUPLE",
+                messageParameters={"arg_name": "subset", "arg_type": type(subset).__name__},
             )
-        elif len(subset) == 1 and isinstance(subset[0], list):
-            item = subset[0]
-            for c in item:
-                if not isinstance(c, str):
-                    raise PySparkTypeError(
-                        errorClass="NOT_STR",
-                        messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
-                    )
+
+        if subset is None:
             res = DataFrame(
-                plan.Deduplicate(child=self._plan, column_names=item),
-                session=self._session,
+                plan.Deduplicate(child=self._plan, all_columns_as_keys=True), session=self._session
             )
         else:
-            for c in subset:  # type: ignore[assignment]
+            for c in subset:
                 if not isinstance(c, str):
                     raise PySparkTypeError(
                         errorClass="NOT_STR",
                         messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
                     )
             res = DataFrame(
-                plan.Deduplicate(child=self._plan, column_names=cast(List[str], subset)),
-                session=self._session,
+                plan.Deduplicate(child=self._plan, column_names=subset), session=self._session
             )
 
         res._cached_schema = self._cached_schema
@@ -467,30 +456,27 @@ class DataFrame(ParentDataFrame):
 
     drop_duplicates = dropDuplicates
 
-    def dropDuplicatesWithinWatermark(self, *subset: Union[str, List[str]]) -> ParentDataFrame:
-        # Acceptable args should be str, ... or a single List[str]
-        # So if subset length is 1, it can be either single str, or a list of str
-        # if subset length is greater than 1, it must be a sequence of str
-        if len(subset) > 1:
-            assert all(isinstance(c, str) for c in subset)
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        if subset is not None and not isinstance(subset, (list, tuple)):
+            raise PySparkTypeError(
+                errorClass="NOT_LIST_OR_TUPLE",
+                messageParameters={"arg_name": "subset", "arg_type": type(subset).__name__},
+            )
 
-        if not subset:
+        if subset is None:
             return DataFrame(
                 plan.Deduplicate(child=self._plan, all_columns_as_keys=True, within_watermark=True),
                 session=self._session,
             )
-        elif len(subset) == 1 and isinstance(subset[0], list):
-            return DataFrame(
-                plan.Deduplicate(child=self._plan, column_names=subset[0], within_watermark=True),
-                session=self._session,
-            )
         else:
+            for c in subset:
+                if not isinstance(c, str):
+                    raise PySparkTypeError(
+                        errorClass="NOT_STR",
+                        messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
+                    )
             return DataFrame(
-                plan.Deduplicate(
-                    child=self._plan,
-                    column_names=cast(List[str], subset),
-                    within_watermark=True,
-                ),
+                plan.Deduplicate(child=self._plan, column_names=subset, within_watermark=True),
                 session=self._session,
             )
 
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index b9c60c04d0f0..958626280e41 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -686,7 +686,7 @@ class Deduplicate(LogicalPlan):
         self,
         child: Optional["LogicalPlan"],
         all_columns_as_keys: bool = False,
-        column_names: Optional[Sequence[str]] = None,
+        column_names: Optional[List[str]] = None,
         within_watermark: bool = False,
     ) -> None:
         super().__init__(child)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 6f1afaba5f98..7d3900c7afbc 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -4570,7 +4570,7 @@ class DataFrame:
         ...
 
     @dispatch_df_method
-    def dropDuplicates(self, *subset: Union[str, List[str]]) -> "DataFrame":
+    def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
         """Return a new :class:`DataFrame` with duplicate rows removed,
         optionally only considering certain columns.
 
@@ -4587,9 +4587,6 @@ class DataFrame:
         .. versionchanged:: 3.4.0
             Supports Spark Connect.
 
-        .. versionchanged:: 4.0.0
-            Supports variable-length argument
-
         Parameters
         ----------
         subset : list of column names, optional
@@ -4621,7 +4618,7 @@ class DataFrame:
 
         Deduplicate values on 'name' and 'height' columns.
 
-        >>> df.dropDuplicates('name', 'height').show()
+        >>> df.dropDuplicates(['name', 'height']).show()
         +-----+---+------+
         | name|age|height|
         +-----+---+------+
@@ -4631,7 +4628,7 @@ class DataFrame:
         ...
 
     @dispatch_df_method
-    def dropDuplicatesWithinWatermark(self, *subset: Union[str, List[str]]) -> "DataFrame":
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame":
         """Return a new :class:`DataFrame` with duplicate rows removed,
          optionally only considering certain columns, within watermark.
 
@@ -4648,9 +4645,6 @@ class DataFrame:
 
          .. versionadded:: 3.5.0
 
-         .. versionchanged:: 4.0.0
-            Supports variable-length argument
-
          Parameters
          ----------
          subset : List of column names, optional
@@ -4680,7 +4674,7 @@ class DataFrame:
 
          Deduplicate values on 'value' columns.
 
-         >>> df.dropDuplicatesWithinWatermark('value')  # doctest: +SKIP
+         >>> df.dropDuplicatesWithinWatermark(['value'])  # doctest: +SKIP
         """
         ...
 
@@ -5937,17 +5931,11 @@ class DataFrame:
         ...
 
     @dispatch_df_method
-    def drop_duplicates(self, *subset: Union[str, List[str]]) -> "DataFrame":
+    def drop_duplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
         """
         :func:`drop_duplicates` is an alias for :func:`dropDuplicates`.
 
         .. versionadded:: 1.4.0
-
-        .. versionchanged:: 3.4.0
-            Supports Spark Connect
-
-        .. versionchanged:: 4.0.0
-            Supports variable-length argument
         """
         ...
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index f084601d2e7b..f0637056ab8f 100755
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -670,18 +670,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
         self.assert_eq(
             df.dropDuplicates(["name"]).toPandas(), 
df2.dropDuplicates(["name"]).toPandas()
         )
-        self.assert_eq(
-            df.drop_duplicates(["name"]).toPandas(), 
df2.drop_duplicates(["name"]).toPandas()
-        )
-        self.assert_eq(
-            df.dropDuplicates(["name", "id"]).toPandas(),
-            df2.dropDuplicates(["name", "id"]).toPandas(),
-        )
-        self.assert_eq(
-            df.drop_duplicates(["name", "id"]).toPandas(),
-            df2.drop_duplicates(["name", "id"]).toPandas(),
-        )
-        self.assert_eq(df.dropDuplicates("name").toPandas(), 
df2.dropDuplicates("name").toPandas())
 
     def test_drop(self):
         # SPARK-41169: test drop
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan.py b/python/pyspark/sql/tests/connect/test_connect_plan.py
index 1b373b2e1944..a03cd30c733f 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan.py
@@ -553,25 +553,13 @@ class SparkConnectPlanTests(PlanOnlyTestFixture):
         self.assertEqual(deduplicate_on_all_columns_plan.root.deduplicate.all_columns_as_keys, True)
         self.assertEqual(len(deduplicate_on_all_columns_plan.root.deduplicate.column_names), 0)
 
-        deduplicate_on_subset_columns_plan_list_arg = df.dropDuplicates(
-            ["name", "height"]
-        )._plan.to_proto(self.connect)
-        self.assertEqual(
-            deduplicate_on_subset_columns_plan_list_arg.root.deduplicate.all_columns_as_keys, False
-        )
-        self.assertEqual(
-            len(deduplicate_on_subset_columns_plan_list_arg.root.deduplicate.column_names), 2
-        )
-
-        deduplicate_on_subset_columns_plan_var_arg = df.dropDuplicates(
-            "name", "height"
-        )._plan.to_proto(self.connect)
-        self.assertEqual(
-            deduplicate_on_subset_columns_plan_var_arg.root.deduplicate.all_columns_as_keys, False
+        deduplicate_on_subset_columns_plan = df.dropDuplicates(["name", "height"])._plan.to_proto(
+            self.connect
         )
         self.assertEqual(
-            len(deduplicate_on_subset_columns_plan_var_arg.root.deduplicate.column_names), 2
+            deduplicate_on_subset_columns_plan.root.deduplicate.all_columns_as_keys, False
         )
+        self.assertEqual(len(deduplicate_on_subset_columns_plan.root.deduplicate.column_names), 2)
 
     def test_relation_alias(self):
         df = self.connect.readTable(table_name=self.tbl_name)
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 4e2d3b9ba42a..a214b874f5ec 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -266,35 +266,28 @@ class DataFrameTestsMixin:
         self.assertEqual(df2.columns, ["a"])
 
     def test_drop_duplicates(self):
+        # SPARK-36034 test that drop duplicates throws a type error when an incorrect type is provided
         df = self.spark.createDataFrame([("Alice", 50), ("Alice", 60)], ["name", "age"])
 
         # shouldn't drop a non-null row
         self.assertEqual(df.dropDuplicates().count(), 2)
 
         self.assertEqual(df.dropDuplicates(["name"]).count(), 1)
-        self.assertEqual(df.dropDuplicates(["name", "age"]).count(), 2)
-
-        self.assertEqual(df.drop_duplicates(["name"]).count(), 1)
-        self.assertEqual(df.drop_duplicates(["name", "age"]).count(), 2)
 
-        # SPARK-48482 dropDuplicates should take varargs
-        self.assertEqual(df.dropDuplicates("name").count(), 1)
-        self.assertEqual(df.dropDuplicates("name", "age").count(), 2)
-        self.assertEqual(df.drop_duplicates("name").count(), 1)
-        self.assertEqual(df.drop_duplicates("name", "age").count(), 2)
+        self.assertEqual(df.dropDuplicates(["name", "age"]).count(), 2)
 
-        # Should raise proper error when taking non-string values
         with self.assertRaises(PySparkTypeError) as pe:
-            df.dropDuplicates([None]).show()
+            df.dropDuplicates("name")
 
         self.check_error(
             exception=pe.exception,
-            errorClass="NOT_STR",
-            messageParameters={"arg_name": "subset", "arg_type": "NoneType"},
+            errorClass="NOT_LIST_OR_TUPLE",
+            messageParameters={"arg_name": "subset", "arg_type": "str"},
         )
 
+        # Should raise proper error when taking non-string values
         with self.assertRaises(PySparkTypeError) as pe:
-            df.dropDuplicates(None).show()
+            df.dropDuplicates([None]).show()
 
         self.check_error(
             exception=pe.exception,
@@ -311,15 +304,6 @@ class DataFrameTestsMixin:
             messageParameters={"arg_name": "subset", "arg_type": "int"},
         )
 
-        with self.assertRaises(PySparkTypeError) as pe:
-            df.dropDuplicates(1).show()
-
-        self.check_error(
-            exception=pe.exception,
-            errorClass="NOT_STR",
-            messageParameters={"arg_name": "subset", "arg_type": "int"},
-        )
-
     def test_drop_duplicates_with_ambiguous_reference(self):
         df1 = self.spark.createDataFrame([(14, "Tom"), (23, "Alice"), (16, 
"Bob")], ["age", "name"])
         df2 = self.spark.createDataFrame([Row(height=80, name="Tom"), 
Row(height=85, name="Bob")])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
