This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ebe635eb1d44 [SPARK-48482][PYTHON][FOLLOWUP] Revert dropDuplicates and dropDuplicatesWithinWatermark should accept variable length args
ebe635eb1d44 is described below
commit ebe635eb1d44dee879623e8646bd3be7424b5676
Author: Wei Liu <[email protected]>
AuthorDate: Fri Aug 30 09:13:58 2024 +0900
[SPARK-48482][PYTHON][FOLLOWUP] Revert dropDuplicates and dropDuplicatesWithinWatermark should accept variable length args
### What changes were proposed in this pull request?
Per the conversation in https://github.com/apache/spark/pull/47835#issuecomment-2311082085, this reverts commit 560c08332b35941260169124b4f522bdc82b84d8 to keep API parity with the pandas API.
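
For context, a minimal sketch of the pandas behavior this keeps parity with (illustration only, not part of this PR; the frame and column names below are hypothetical):

    import pandas as pd

    pdf = pd.DataFrame({"name": ["Alice", "Alice"], "age": [50, 60]})
    # pandas takes subset as an optional list-like keyword, not varargs
    pdf.drop_duplicates(subset=["name"])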
### Why are the changes needed?
Bug fix
### Does this PR introduce _any_ user-facing change?
Yes. Reverting the change re-enables users to call `dropDuplicates(subset=xxx)` with `subset` as a keyword argument.
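
For illustration, the call forms restored by this revert (a sketch assuming an active SparkSession `spark`; the data is hypothetical):

    df = spark.createDataFrame([("Alice", 50), ("Alice", 60)], ["name", "age"])

    df.dropDuplicates()                 # deduplicate on all columns
    df.dropDuplicates(["name"])         # subset as a positional list
    df.dropDuplicates(subset=["name"])  # subset as a keyword, re-enabled here

    # The vararg form added in SPARK-48482 is rejected again:
    # df.dropDuplicates("name")  # raises PySparkTypeError (NOT_LIST_OR_TUPLE)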
### How was this patch tested?
Unit test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47916 from WweiL/revert-dropDuplicates-api.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
docs/structured-streaming-programming-guide.md | 6 +--
python/pyspark/sql/classic/dataframe.py | 51 +++++++++---------
python/pyspark/sql/connect/dataframe.py | 62 +++++++++-------------
python/pyspark/sql/connect/plan.py | 2 +-
python/pyspark/sql/dataframe.py | 22 ++------
.../sql/tests/connect/test_connect_basic.py | 12 -----
.../pyspark/sql/tests/connect/test_connect_plan.py | 20 ++-----
python/pyspark/sql/tests/test_dataframe.py | 30 +++--------
8 files changed, 69 insertions(+), 136 deletions(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index aa20da6ae81d..d266e761263b 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2082,12 +2082,12 @@ You can deduplicate records in data streams using a unique identifier in the eve
 streamingDf = spark.readStream. ...

 # Without watermark using guid column
-streamingDf.dropDuplicates("guid")
+streamingDf.dropDuplicates(["guid"])

 # With watermark using guid and eventTime columns
 streamingDf \
   .withWatermark("eventTime", "10 seconds") \
-  .dropDuplicates("guid", "eventTime")
+  .dropDuplicates(["guid", "eventTime"])
 {% endhighlight %}

 </div>
@@ -2163,7 +2163,7 @@ streamingDf = spark.readStream. ...
 # deduplicate using guid column with watermark based on eventTime column
 streamingDf \
   .withWatermark("eventTime", "10 hours") \
-  .dropDuplicatesWithinWatermark("guid")
+  .dropDuplicatesWithinWatermark(["guid"])
 {% endhighlight %}

 </div>
diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py
index fb2bb3c22703..0e890e3343e6 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -1222,23 +1222,17 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     def subtract(self, other: ParentDataFrame) -> ParentDataFrame:
         return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sparkSession)

-    def dropDuplicates(self, *subset: Union[str, List[str]]) -> ParentDataFrame:
-        # Acceptable args should be str, ... or a single List[str]
-        # So if subset length is 1, it can be either single str, or a list of str
-        # if subset length is greater than 1, it must be a sequence of str
-        if not subset:
+    def dropDuplicates(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        if subset is not None and (not isinstance(subset, Iterable) or isinstance(subset, str)):
+            raise PySparkTypeError(
+                errorClass="NOT_LIST_OR_TUPLE",
+                messageParameters={"arg_name": "subset", "arg_type": type(subset).__name__},
+            )
+
+        if subset is None:
             jdf = self._jdf.dropDuplicates()
-        elif len(subset) == 1 and isinstance(subset[0], list):
-            item = subset[0]
-            for c in item:
-                if not isinstance(c, str):
-                    raise PySparkTypeError(
-                        errorClass="NOT_STR",
-                        messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
-                    )
-            jdf = self._jdf.dropDuplicates(self._jseq(item))
         else:
-            for c in subset:  # type: ignore[assignment]
+            for c in subset:
                 if not isinstance(c, str):
                     raise PySparkTypeError(
                         errorClass="NOT_STR",
@@ -1247,20 +1241,22 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
             jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)

-    drop_duplicates = dropDuplicates
-
-    def dropDuplicatesWithinWatermark(self, *subset: Union[str, List[str]]) -> ParentDataFrame:
-        # Acceptable args should be str, ... or a single List[str]
-        # So if subset length is 1, it can be either single str, or a list of str
-        # if subset length is greater than 1, it must be a sequence of str
-        if len(subset) > 1:
-            assert all(isinstance(c, str) for c in subset)
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        if subset is not None and (not isinstance(subset, Iterable) or isinstance(subset, str)):
+            raise PySparkTypeError(
+                errorClass="NOT_LIST_OR_TUPLE",
+                messageParameters={"arg_name": "subset", "arg_type": type(subset).__name__},
+            )

-        if not subset:
+        if subset is None:
             jdf = self._jdf.dropDuplicatesWithinWatermark()
-        elif len(subset) == 1 and isinstance(subset[0], list):
-            jdf = self._jdf.dropDuplicatesWithinWatermark(self._jseq(subset[0]))
         else:
+            for c in subset:
+                if not isinstance(c, str):
+                    raise PySparkTypeError(
+                        errorClass="NOT_STR",
+                        messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
+                    )
             jdf = self._jdf.dropDuplicatesWithinWatermark(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)
@@ -1805,6 +1801,9 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     def groupby(self, *cols: "ColumnOrNameOrOrdinal") -> "GroupedData":  # type: ignore[misc]
         return self.groupBy(*cols)

+    def drop_duplicates(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        return self.dropDuplicates(subset)
+
     def writeTo(self, table: str) -> DataFrameWriterV2:
         return DataFrameWriterV2(self, table)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 846f1109a92d..442157eef0b7 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -426,40 +426,29 @@ class DataFrame(ParentDataFrame):
                     "arg_type": type(numPartitions).__name__,
                 },
             )
-
         res._cached_schema = self._cached_schema
         return res

-    def dropDuplicates(self, *subset: Union[str, List[str]]) -> ParentDataFrame:
-        # Acceptable args should be str, ... or a single List[str]
-        # So if subset length is 1, it can be either single str, or a list of str
-        # if subset length is greater than 1, it must be a sequence of str
-        if not subset:
-            res = DataFrame(
-                plan.Deduplicate(child=self._plan, all_columns_as_keys=True), session=self._session
+    def dropDuplicates(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        if subset is not None and not isinstance(subset, (list, tuple)):
+            raise PySparkTypeError(
+                errorClass="NOT_LIST_OR_TUPLE",
+                messageParameters={"arg_name": "subset", "arg_type": type(subset).__name__},
             )
-        elif len(subset) == 1 and isinstance(subset[0], list):
-            item = subset[0]
-            for c in item:
-                if not isinstance(c, str):
-                    raise PySparkTypeError(
-                        errorClass="NOT_STR",
-                        messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
-                    )
+
+        if subset is None:
             res = DataFrame(
-                plan.Deduplicate(child=self._plan, column_names=item),
-                session=self._session,
+                plan.Deduplicate(child=self._plan, all_columns_as_keys=True), session=self._session
             )
         else:
-            for c in subset:  # type: ignore[assignment]
+            for c in subset:
                 if not isinstance(c, str):
                     raise PySparkTypeError(
                         errorClass="NOT_STR",
                         messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
                     )
             res = DataFrame(
-                plan.Deduplicate(child=self._plan, column_names=cast(List[str], subset)),
-                session=self._session,
+                plan.Deduplicate(child=self._plan, column_names=subset), session=self._session
             )

         res._cached_schema = self._cached_schema
@@ -467,30 +456,27 @@ class DataFrame(ParentDataFrame):

     drop_duplicates = dropDuplicates

-    def dropDuplicatesWithinWatermark(self, *subset: Union[str, List[str]]) -> ParentDataFrame:
-        # Acceptable args should be str, ... or a single List[str]
-        # So if subset length is 1, it can be either single str, or a list of str
-        # if subset length is greater than 1, it must be a sequence of str
-        if len(subset) > 1:
-            assert all(isinstance(c, str) for c in subset)
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> ParentDataFrame:
+        if subset is not None and not isinstance(subset, (list, tuple)):
+            raise PySparkTypeError(
+                errorClass="NOT_LIST_OR_TUPLE",
+                messageParameters={"arg_name": "subset", "arg_type": type(subset).__name__},
+            )

-        if not subset:
+        if subset is None:
             return DataFrame(
                 plan.Deduplicate(child=self._plan, all_columns_as_keys=True, within_watermark=True),
                 session=self._session,
             )
-        elif len(subset) == 1 and isinstance(subset[0], list):
-            return DataFrame(
-                plan.Deduplicate(child=self._plan, column_names=subset[0], within_watermark=True),
-                session=self._session,
-            )
         else:
+            for c in subset:
+                if not isinstance(c, str):
+                    raise PySparkTypeError(
+                        errorClass="NOT_STR",
+                        messageParameters={"arg_name": "subset", "arg_type": type(c).__name__},
+                    )
             return DataFrame(
-                plan.Deduplicate(
-                    child=self._plan,
-                    column_names=cast(List[str], subset),
-                    within_watermark=True,
-                ),
+                plan.Deduplicate(child=self._plan, column_names=subset, within_watermark=True),
                 session=self._session,
             )
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index b9c60c04d0f0..958626280e41 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -686,7 +686,7 @@ class Deduplicate(LogicalPlan):
         self,
         child: Optional["LogicalPlan"],
         all_columns_as_keys: bool = False,
-        column_names: Optional[Sequence[str]] = None,
+        column_names: Optional[List[str]] = None,
         within_watermark: bool = False,
     ) -> None:
         super().__init__(child)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 6f1afaba5f98..7d3900c7afbc 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -4570,7 +4570,7 @@ class DataFrame:
         ...

     @dispatch_df_method
-    def dropDuplicates(self, *subset: Union[str, List[str]]) -> "DataFrame":
+    def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
         """Return a new :class:`DataFrame` with duplicate rows removed,
         optionally only considering certain columns.

@@ -4587,9 +4587,6 @@ class DataFrame:
         .. versionchanged:: 3.4.0
             Supports Spark Connect.

-        .. versionchanged:: 4.0.0
-            Supports variable-length argument
-
         Parameters
         ----------
         subset : list of column names, optional
@@ -4621,7 +4618,7 @@ class DataFrame:

         Deduplicate values on 'name' and 'height' columns.

-        >>> df.dropDuplicates('name', 'height').show()
+        >>> df.dropDuplicates(['name', 'height']).show()
         +-----+---+------+
         | name|age|height|
         +-----+---+------+
@@ -4631,7 +4628,7 @@ class DataFrame:
         ...

     @dispatch_df_method
-    def dropDuplicatesWithinWatermark(self, *subset: Union[str, List[str]]) -> "DataFrame":
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame":
         """Return a new :class:`DataFrame` with duplicate rows removed,
         optionally only considering certain columns, within watermark.

@@ -4648,9 +4645,6 @@ class DataFrame:

         .. versionadded:: 3.5.0

-        .. versionchanged:: 4.0.0
-            Supports variable-length argument
-
         Parameters
         ----------
         subset : List of column names, optional
@@ -4680,7 +4674,7 @@ class DataFrame:

         Deduplicate values on 'value' columns.

-        >>> df.dropDuplicatesWithinWatermark('value')  # doctest: +SKIP
+        >>> df.dropDuplicatesWithinWatermark(['value'])  # doctest: +SKIP
         """
         ...

@@ -5937,17 +5931,11 @@ class DataFrame:
         ...

     @dispatch_df_method
-    def drop_duplicates(self, *subset: Union[str, List[str]]) -> "DataFrame":
+    def drop_duplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
         """
         :func:`drop_duplicates` is an alias for :func:`dropDuplicates`.

         .. versionadded:: 1.4.0
-
-        .. versionchanged:: 3.4.0
-            Supports Spark Connect
-
-        .. versionchanged:: 4.0.0
-            Supports variable-length argument
         """
         ...
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index f084601d2e7b..f0637056ab8f 100755
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -670,18 +670,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
         self.assert_eq(
             df.dropDuplicates(["name"]).toPandas(), df2.dropDuplicates(["name"]).toPandas()
         )
-        self.assert_eq(
-            df.drop_duplicates(["name"]).toPandas(), df2.drop_duplicates(["name"]).toPandas()
-        )
-        self.assert_eq(
-            df.dropDuplicates(["name", "id"]).toPandas(),
-            df2.dropDuplicates(["name", "id"]).toPandas(),
-        )
-        self.assert_eq(
-            df.drop_duplicates(["name", "id"]).toPandas(),
-            df2.drop_duplicates(["name", "id"]).toPandas(),
-        )
-        self.assert_eq(df.dropDuplicates("name").toPandas(), df2.dropDuplicates("name").toPandas())

     def test_drop(self):
         # SPARK-41169: test drop
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan.py b/python/pyspark/sql/tests/connect/test_connect_plan.py
index 1b373b2e1944..a03cd30c733f 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan.py
@@ -553,25 +553,13 @@ class SparkConnectPlanTests(PlanOnlyTestFixture):
         self.assertEqual(deduplicate_on_all_columns_plan.root.deduplicate.all_columns_as_keys, True)
         self.assertEqual(len(deduplicate_on_all_columns_plan.root.deduplicate.column_names), 0)

-        deduplicate_on_subset_columns_plan_list_arg = df.dropDuplicates(
-            ["name", "height"]
-        )._plan.to_proto(self.connect)
-        self.assertEqual(
-            deduplicate_on_subset_columns_plan_list_arg.root.deduplicate.all_columns_as_keys, False
-        )
-        self.assertEqual(
-            len(deduplicate_on_subset_columns_plan_list_arg.root.deduplicate.column_names), 2
-        )
-
-        deduplicate_on_subset_columns_plan_var_arg = df.dropDuplicates(
-            "name", "height"
-        )._plan.to_proto(self.connect)
-        self.assertEqual(
-            deduplicate_on_subset_columns_plan_var_arg.root.deduplicate.all_columns_as_keys, False
+        deduplicate_on_subset_columns_plan = df.dropDuplicates(["name", "height"])._plan.to_proto(
+            self.connect
         )
         self.assertEqual(
-            len(deduplicate_on_subset_columns_plan_var_arg.root.deduplicate.column_names), 2
+            deduplicate_on_subset_columns_plan.root.deduplicate.all_columns_as_keys, False
         )
+        self.assertEqual(len(deduplicate_on_subset_columns_plan.root.deduplicate.column_names), 2)

     def test_relation_alias(self):
         df = self.connect.readTable(table_name=self.tbl_name)
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 4e2d3b9ba42a..a214b874f5ec 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -266,35 +266,28 @@ class DataFrameTestsMixin:
         self.assertEqual(df2.columns, ["a"])

     def test_drop_duplicates(self):
+        # SPARK-36034 test that drop duplicates throws a type error when incorrect type provided
         df = self.spark.createDataFrame([("Alice", 50), ("Alice", 60)], ["name", "age"])

         # shouldn't drop a non-null row
         self.assertEqual(df.dropDuplicates().count(), 2)

         self.assertEqual(df.dropDuplicates(["name"]).count(), 1)
-        self.assertEqual(df.dropDuplicates(["name", "age"]).count(), 2)
-
-        self.assertEqual(df.drop_duplicates(["name"]).count(), 1)
-        self.assertEqual(df.drop_duplicates(["name", "age"]).count(), 2)
-
-        # SPARK-48482 dropDuplicates should take varargs
-        self.assertEqual(df.dropDuplicates("name").count(), 1)
-        self.assertEqual(df.dropDuplicates("name", "age").count(), 2)
-        self.assertEqual(df.drop_duplicates("name").count(), 1)
-        self.assertEqual(df.drop_duplicates("name", "age").count(), 2)
+        self.assertEqual(df.dropDuplicates(["name", "age"]).count(), 2)

-        # Should raise proper error when taking non-string values
         with self.assertRaises(PySparkTypeError) as pe:
-            df.dropDuplicates([None]).show()
+            df.dropDuplicates("name")

         self.check_error(
             exception=pe.exception,
-            errorClass="NOT_STR",
-            messageParameters={"arg_name": "subset", "arg_type": "NoneType"},
+            errorClass="NOT_LIST_OR_TUPLE",
+            messageParameters={"arg_name": "subset", "arg_type": "str"},
         )

+        # Should raise proper error when taking non-string values
         with self.assertRaises(PySparkTypeError) as pe:
-            df.dropDuplicates(None).show()
+            df.dropDuplicates([None]).show()

         self.check_error(
             exception=pe.exception,
@@ -311,15 +304,6 @@ class DataFrameTestsMixin:
             messageParameters={"arg_name": "subset", "arg_type": "int"},
         )

-        with self.assertRaises(PySparkTypeError) as pe:
-            df.dropDuplicates(1).show()
-
-        self.check_error(
-            exception=pe.exception,
-            errorClass="NOT_STR",
-            messageParameters={"arg_name": "subset", "arg_type": "int"},
-        )
-
     def test_drop_duplicates_with_ambiguous_reference(self):
         df1 = self.spark.createDataFrame([(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
         df2 = self.spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]