This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4d765114d6e [SPARK-43046][SS][CONNECT] Implemented Python API
dropDuplicatesWithinWatermark for Spark Connect
4d765114d6e is described below
commit 4d765114d6e5dd1a78a7ad798750e7bc400a72a6
Author: bogao007 <[email protected]>
AuthorDate: Sat Apr 22 09:21:16 2023 +0900
[SPARK-43046][SS][CONNECT] Implemented Python API
dropDuplicatesWithinWatermark for Spark Connect
### What changes were proposed in this pull request?
Implemented the `dropDuplicatesWithinWatermark` Python API for Spark Connect.
This change is based on a previous
[commit](https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103)
that introduced the `dropDuplicatesWithinWatermark` API in Spark.
### Why are the changes needed?
We recently introduced the `dropDuplicatesWithinWatermark` API in Spark ([commit
link](https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103)).
We want to bring parity to Spark Connect.
### Does this PR introduce _any_ user-facing change?
Yes, this introduces a new public API, `dropDuplicatesWithinWatermark`, in
Spark Connect.
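For context, a minimal usage sketch of how the API is intended to be used with a
streaming DataFrame (not part of this change; the source, column names, and
watermark delay below are illustrative assumptions):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The built-in "rate" streaming source produces `timestamp` and `value` columns.
events = spark.readStream.format("rate").load()

deduped = (
    events.withWatermark("timestamp", "10 minutes")
    # Drop rows whose `value` was already seen within the watermark delay;
    # unlike dropDuplicates, per-key state can be evicted once the watermark
    # moves past it.
    .dropDuplicatesWithinWatermark(["value"])
)
```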
### How was this patch tested?
Added new test cases to the existing test suites.
Closes #40834 from bogao007/drop-dup-watermark.
Authored-by: bogao007 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../main/protobuf/spark/connect/relations.proto | 3 +
.../org/apache/spark/sql/connect/dsl/package.scala | 11 ++
.../sql/connect/planner/SparkConnectPlanner.scala | 8 +-
.../connect/planner/SparkConnectPlannerSuite.scala | 30 ++++
.../connect/planner/SparkConnectProtoSuite.scala | 10 ++
python/pyspark/sql/connect/dataframe.py | 22 ++-
python/pyspark/sql/connect/plan.py | 3 +
python/pyspark/sql/connect/proto/relations_pb2.py | 152 ++++++++++-----------
python/pyspark/sql/connect/proto/relations_pb2.pyi | 17 +++
.../sql/tests/connect/test_connect_basic.py | 10 +-
10 files changed, 183 insertions(+), 83 deletions(-)
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 57bdf57c9cb..29e701a42cf 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -362,6 +362,9 @@ message Deduplicate {
//
// This field does not co-use with `column_names`.
optional bool all_columns_as_keys = 3;
+
+ // (Optional) Deduplicate within the time range of watermark.
+ optional bool within_watermark = 4;
}
// A relation that does not need to be qualified by name.
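The new optional `within_watermark` flag lets the client ask the server to
deduplicate within the watermark delay rather than globally. A rough sketch of
the field as seen from the generated Python bindings (values are arbitrary, the
required `input` relation is left unset for brevity, and user code would
normally go through the DataFrame API instead):

```python
from pyspark.sql.connect.proto import relations_pb2

dedup = relations_pb2.Deduplicate()
dedup.all_columns_as_keys = True
dedup.within_watermark = True  # the new optional flag added by this change

# Presence is tracked because the field is declared `optional` in proto3.
assert dedup.HasField("within_watermark")

rel = relations_pb2.Relation(deduplicate=dedup)
print(rel)
```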
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 21b9180ccfb..25d722cf58d 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -596,6 +596,17 @@ package object dsl {
.addAllColumnNames(colNames.asJava))
.build()
+ def deduplicateWithinWatermark(colNames: Seq[String]): Relation =
+ Relation
+ .newBuilder()
+ .setDeduplicate(
+ Deduplicate
+ .newBuilder()
+ .setInput(logicalPlan)
+ .addAllColumnNames(colNames.asJava)
+ .setWithinWatermark(true))
+ .build()
+
def distinct(): Relation =
Relation
.newBuilder()
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index ef502e282ee..ba394396077 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser,
ParseException, ParserUtils}
import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType,
LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics,
CommandResult, Deduplicate, DeserializeToObject, Except, Intersect,
LocalRelation, LogicalPlan, MapPartitions, Project, Sample,
SerializeFromObject, Sort, SubqueryAlias, TypedFilter, Union, Unpivot,
UnresolvedHint}
+import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics,
CommandResult, Deduplicate, DeduplicateWithinWatermark, DeserializeToObject,
Except, Intersect, LocalRelation, LogicalPlan, MapPartitions, Project, Sample,
SerializeFromObject, Sort, SubqueryAlias, TypedFilter, Union, Unpivot,
UnresolvedHint}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
CharVarcharUtils}
import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter,
InvalidPlanInput, LiteralValueProtoConverter, StorageLevelProtoConverter,
UdfPacket}
@@ -738,7 +738,8 @@ class SparkConnectPlanner(val session: SparkSession) {
val resolver = session.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
if (rel.getAllColumnsAsKeys) {
- Deduplicate(allColumns, queryExecution.analyzed)
+ if (rel.getWithinWatermark) DeduplicateWithinWatermark(allColumns,
queryExecution.analyzed)
+ else Deduplicate(allColumns, queryExecution.analyzed)
} else {
val toGroupColumnNames = rel.getColumnNamesList.asScala.toSeq
val groupCols = toGroupColumnNames.flatMap { (colName: String) =>
@@ -750,7 +751,8 @@ class SparkConnectPlanner(val session: SparkSession) {
}
cols
}
- Deduplicate(groupCols, queryExecution.analyzed)
+ if (rel.getWithinWatermark) DeduplicateWithinWatermark(groupCols,
queryExecution.analyzed)
+ else Deduplicate(groupCols, queryExecution.analyzed)
}
}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index ec2362d5a56..8dac0b166b6 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -381,6 +381,36 @@ class SparkConnectPlannerSuite extends SparkFunSuite with
SparkConnectPlanTest {
assert(e2.getMessage.contains("either deduplicate on all columns or a
subset of columns"))
}
+ test("Test invalid deduplicateWithinWatermark") {
+ val deduplicateWithinWatermark = proto.Deduplicate
+ .newBuilder()
+ .setInput(readRel)
+ .setAllColumnsAsKeys(true)
+ .addColumnNames("test")
+ .setWithinWatermark(true)
+
+ val e = intercept[InvalidPlanInput] {
+ transform(
+ proto.Relation.newBuilder
+ .setDeduplicate(deduplicateWithinWatermark)
+ .build())
+ }
+ assert(
+ e.getMessage.contains("Cannot deduplicate on both all columns and a
subset of columns"))
+
+ val deduplicateWithinWatermark2 = proto.Deduplicate
+ .newBuilder()
+ .setInput(readRel)
+ .setWithinWatermark(true)
+ val e2 = intercept[InvalidPlanInput] {
+ transform(
+ proto.Relation.newBuilder
+ .setDeduplicate(deduplicateWithinWatermark2)
+ .build())
+ }
+ assert(e2.getMessage.contains("either deduplicate on all columns or a
subset of columns"))
+ }
+
test("Test invalid intersect, except") {
// Except with union_by_name=true
val except = proto.SetOperation
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 824ee7aceb4..96dae647db6 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -348,6 +348,16 @@ class SparkConnectProtoSuite extends PlanTest with
SparkConnectPlanTest {
comparePlans(connectPlan2, sparkPlan2)
}
+ test("Test basic deduplicateWithinWatermark") {
+ val connectPlan = connectTestRelation.distinct()
+ val sparkPlan = sparkTestRelation.distinct()
+ comparePlans(connectPlan, sparkPlan)
+
+ val connectPlan2 =
connectTestRelation.deduplicateWithinWatermark(Seq("id", "name"))
+ val sparkPlan2 = sparkTestRelation.dropDuplicatesWithinWatermark(Seq("id",
"name"))
+ comparePlans(connectPlan2, sparkPlan2)
+ }
+
test("Test union, except, intersect") {
val connectPlan1 = connectTestRelation.except(connectTestRelation, isAll =
false)
val sparkPlan1 = sparkTestRelation.except(sparkTestRelation)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 87eb6df5cba..3b39b82196e 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -379,7 +379,26 @@ class DataFrame:
drop_duplicates = dropDuplicates
def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] =
None) -> "DataFrame":
- raise NotImplementedError("dropDuplicatesWithinWatermark() is not
implemented.")
+ if subset is not None and not isinstance(subset, (list, tuple)):
+ raise PySparkTypeError(
+ error_class="NOT_LIST_OR_TUPLE",
+ message_parameters={"arg_name": "subset", "arg_type":
type(subset).__name__},
+ )
+
+ if subset is None:
+ return DataFrame.withPlan(
+ plan.Deduplicate(child=self._plan, all_columns_as_keys=True,
within_watermark=True),
+ session=self._session,
+ )
+ else:
+ return DataFrame.withPlan(
+ plan.Deduplicate(child=self._plan, column_names=subset,
within_watermark=True),
+ session=self._session,
+ )
+
+ dropDuplicatesWithinWatermark.__doc__ =
PySparkDataFrame.dropDuplicatesWithinWatermark.__doc__
+
+ drop_duplicates_within_watermark = dropDuplicatesWithinWatermark
def distinct(self) -> "DataFrame":
return DataFrame.withPlan(
@@ -595,7 +614,6 @@ class DataFrame:
fraction: Optional[Union[int, float]] = None,
seed: Optional[int] = None,
) -> "DataFrame":
-
# For the cases below:
# sample(True, 0.5 [, seed])
# sample(True, fraction=0.5 [, seed])
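The client-side method above accepts either no subset (deduplicate on all
columns) or a list/tuple of column names; anything else fails fast with
`PySparkTypeError`. A hedged sketch of the call forms (`df` is assumed to be an
existing Spark Connect DataFrame and the column names are illustrative):

```python
# Deduplicate on all columns within the watermark delay.
df.dropDuplicatesWithinWatermark()

# Deduplicate on a subset of columns; a tuple works as well as a list.
df.dropDuplicatesWithinWatermark(["id", "event_time"])

# A bare string is rejected on the client with PySparkTypeError
# (error class NOT_LIST_OR_TUPLE) before anything is sent to the server:
# df.dropDuplicatesWithinWatermark("id")
```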
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 9e221814f12..8b6dda0b1ca 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -609,16 +609,19 @@ class Deduplicate(LogicalPlan):
child: Optional["LogicalPlan"],
all_columns_as_keys: bool = False,
column_names: Optional[List[str]] = None,
+ within_watermark: bool = False,
) -> None:
super().__init__(child)
self.all_columns_as_keys = all_columns_as_keys
self.column_names = column_names
+ self.within_watermark = within_watermark
def plan(self, session: "SparkConnectClient") -> proto.Relation:
assert self._child is not None
plan = self._create_proto_relation()
plan.deduplicate.input.CopyFrom(self._child.plan(session))
plan.deduplicate.all_columns_as_keys = self.all_columns_as_keys
+ plan.deduplicate.within_watermark = self.within_watermark
if self.column_names is not None:
plan.deduplicate.column_names.extend(self.column_names)
return plan
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index 6a4226185e7..b8b84f27743 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catal
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\x99\x16\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\x99\x16\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
)
@@ -806,81 +806,81 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_DROP._serialized_start = 6124
_DROP._serialized_end = 6265
_DEDUPLICATE._serialized_start = 6268
- _DEDUPLICATE._serialized_end = 6439
- _LOCALRELATION._serialized_start = 6441
- _LOCALRELATION._serialized_end = 6530
- _SAMPLE._serialized_start = 6533
- _SAMPLE._serialized_end = 6806
- _RANGE._serialized_start = 6809
- _RANGE._serialized_end = 6954
- _SUBQUERYALIAS._serialized_start = 6956
- _SUBQUERYALIAS._serialized_end = 7070
- _REPARTITION._serialized_start = 7073
- _REPARTITION._serialized_end = 7215
- _SHOWSTRING._serialized_start = 7218
- _SHOWSTRING._serialized_end = 7360
- _HTMLSTRING._serialized_start = 7362
- _HTMLSTRING._serialized_end = 7476
- _STATSUMMARY._serialized_start = 7478
- _STATSUMMARY._serialized_end = 7570
- _STATDESCRIBE._serialized_start = 7572
- _STATDESCRIBE._serialized_end = 7653
- _STATCROSSTAB._serialized_start = 7655
- _STATCROSSTAB._serialized_end = 7756
- _STATCOV._serialized_start = 7758
- _STATCOV._serialized_end = 7854
- _STATCORR._serialized_start = 7857
- _STATCORR._serialized_end = 7994
- _STATAPPROXQUANTILE._serialized_start = 7997
- _STATAPPROXQUANTILE._serialized_end = 8161
- _STATFREQITEMS._serialized_start = 8163
- _STATFREQITEMS._serialized_end = 8288
- _STATSAMPLEBY._serialized_start = 8291
- _STATSAMPLEBY._serialized_end = 8600
- _STATSAMPLEBY_FRACTION._serialized_start = 8492
- _STATSAMPLEBY_FRACTION._serialized_end = 8591
- _NAFILL._serialized_start = 8603
- _NAFILL._serialized_end = 8737
- _NADROP._serialized_start = 8740
- _NADROP._serialized_end = 8874
- _NAREPLACE._serialized_start = 8877
- _NAREPLACE._serialized_end = 9173
- _NAREPLACE_REPLACEMENT._serialized_start = 9032
- _NAREPLACE_REPLACEMENT._serialized_end = 9173
- _TODF._serialized_start = 9175
- _TODF._serialized_end = 9263
- _WITHCOLUMNSRENAMED._serialized_start = 9266
- _WITHCOLUMNSRENAMED._serialized_end = 9505
- _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 9438
- _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 9505
- _WITHCOLUMNS._serialized_start = 9507
- _WITHCOLUMNS._serialized_end = 9626
- _WITHWATERMARK._serialized_start = 9629
- _WITHWATERMARK._serialized_end = 9763
- _HINT._serialized_start = 9766
- _HINT._serialized_end = 9898
- _UNPIVOT._serialized_start = 9901
- _UNPIVOT._serialized_end = 10228
- _UNPIVOT_VALUES._serialized_start = 10158
- _UNPIVOT_VALUES._serialized_end = 10217
- _TOSCHEMA._serialized_start = 10230
- _TOSCHEMA._serialized_end = 10336
- _REPARTITIONBYEXPRESSION._serialized_start = 10339
- _REPARTITIONBYEXPRESSION._serialized_end = 10542
- _MAPPARTITIONS._serialized_start = 10545
- _MAPPARTITIONS._serialized_end = 10726
- _GROUPMAP._serialized_start = 10729
- _GROUPMAP._serialized_end = 10932
- _COGROUPMAP._serialized_start = 10935
- _COGROUPMAP._serialized_end = 11287
- _APPLYINPANDASWITHSTATE._serialized_start = 11290
- _APPLYINPANDASWITHSTATE._serialized_end = 11647
- _COLLECTMETRICS._serialized_start = 11650
- _COLLECTMETRICS._serialized_end = 11786
- _PARSE._serialized_start = 11789
- _PARSE._serialized_end = 12177
+ _DEDUPLICATE._serialized_end = 6508
+ _LOCALRELATION._serialized_start = 6510
+ _LOCALRELATION._serialized_end = 6599
+ _SAMPLE._serialized_start = 6602
+ _SAMPLE._serialized_end = 6875
+ _RANGE._serialized_start = 6878
+ _RANGE._serialized_end = 7023
+ _SUBQUERYALIAS._serialized_start = 7025
+ _SUBQUERYALIAS._serialized_end = 7139
+ _REPARTITION._serialized_start = 7142
+ _REPARTITION._serialized_end = 7284
+ _SHOWSTRING._serialized_start = 7287
+ _SHOWSTRING._serialized_end = 7429
+ _HTMLSTRING._serialized_start = 7431
+ _HTMLSTRING._serialized_end = 7545
+ _STATSUMMARY._serialized_start = 7547
+ _STATSUMMARY._serialized_end = 7639
+ _STATDESCRIBE._serialized_start = 7641
+ _STATDESCRIBE._serialized_end = 7722
+ _STATCROSSTAB._serialized_start = 7724
+ _STATCROSSTAB._serialized_end = 7825
+ _STATCOV._serialized_start = 7827
+ _STATCOV._serialized_end = 7923
+ _STATCORR._serialized_start = 7926
+ _STATCORR._serialized_end = 8063
+ _STATAPPROXQUANTILE._serialized_start = 8066
+ _STATAPPROXQUANTILE._serialized_end = 8230
+ _STATFREQITEMS._serialized_start = 8232
+ _STATFREQITEMS._serialized_end = 8357
+ _STATSAMPLEBY._serialized_start = 8360
+ _STATSAMPLEBY._serialized_end = 8669
+ _STATSAMPLEBY_FRACTION._serialized_start = 8561
+ _STATSAMPLEBY_FRACTION._serialized_end = 8660
+ _NAFILL._serialized_start = 8672
+ _NAFILL._serialized_end = 8806
+ _NADROP._serialized_start = 8809
+ _NADROP._serialized_end = 8943
+ _NAREPLACE._serialized_start = 8946
+ _NAREPLACE._serialized_end = 9242
+ _NAREPLACE_REPLACEMENT._serialized_start = 9101
+ _NAREPLACE_REPLACEMENT._serialized_end = 9242
+ _TODF._serialized_start = 9244
+ _TODF._serialized_end = 9332
+ _WITHCOLUMNSRENAMED._serialized_start = 9335
+ _WITHCOLUMNSRENAMED._serialized_end = 9574
+ _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 9507
+ _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 9574
+ _WITHCOLUMNS._serialized_start = 9576
+ _WITHCOLUMNS._serialized_end = 9695
+ _WITHWATERMARK._serialized_start = 9698
+ _WITHWATERMARK._serialized_end = 9832
+ _HINT._serialized_start = 9835
+ _HINT._serialized_end = 9967
+ _UNPIVOT._serialized_start = 9970
+ _UNPIVOT._serialized_end = 10297
+ _UNPIVOT_VALUES._serialized_start = 10227
+ _UNPIVOT_VALUES._serialized_end = 10286
+ _TOSCHEMA._serialized_start = 10299
+ _TOSCHEMA._serialized_end = 10405
+ _REPARTITIONBYEXPRESSION._serialized_start = 10408
+ _REPARTITIONBYEXPRESSION._serialized_end = 10611
+ _MAPPARTITIONS._serialized_start = 10614
+ _MAPPARTITIONS._serialized_end = 10795
+ _GROUPMAP._serialized_start = 10798
+ _GROUPMAP._serialized_end = 11001
+ _COGROUPMAP._serialized_start = 11004
+ _COGROUPMAP._serialized_end = 11356
+ _APPLYINPANDASWITHSTATE._serialized_start = 11359
+ _APPLYINPANDASWITHSTATE._serialized_end = 11716
+ _COLLECTMETRICS._serialized_start = 11719
+ _COLLECTMETRICS._serialized_end = 11855
+ _PARSE._serialized_start = 11858
+ _PARSE._serialized_end = 12246
_PARSE_OPTIONSENTRY._serialized_start = 3597
_PARSE_OPTIONSENTRY._serialized_end = 3655
- _PARSE_PARSEFORMAT._serialized_start = 12078
- _PARSE_PARSEFORMAT._serialized_end = 12166
+ _PARSE_PARSEFORMAT._serialized_start = 12147
+ _PARSE_PARSEFORMAT._serialized_end = 12235
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index b847378d78b..3dab4f8525e 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -1436,6 +1436,7 @@ class Deduplicate(google.protobuf.message.Message):
INPUT_FIELD_NUMBER: builtins.int
COLUMN_NAMES_FIELD_NUMBER: builtins.int
ALL_COLUMNS_AS_KEYS_FIELD_NUMBER: builtins.int
+ WITHIN_WATERMARK_FIELD_NUMBER: builtins.int
@property
def input(self) -> global___Relation:
"""(Required) Input relation for a Deduplicate."""
@@ -1452,22 +1453,29 @@ class Deduplicate(google.protobuf.message.Message):
This field does not co-use with `column_names`.
"""
+ within_watermark: builtins.bool
+ """(Optional) Deduplicate within the time range of watermark."""
def __init__(
self,
*,
input: global___Relation | None = ...,
column_names: collections.abc.Iterable[builtins.str] | None = ...,
all_columns_as_keys: builtins.bool | None = ...,
+ within_watermark: builtins.bool | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"_all_columns_as_keys",
b"_all_columns_as_keys",
+ "_within_watermark",
+ b"_within_watermark",
"all_columns_as_keys",
b"all_columns_as_keys",
"input",
b"input",
+ "within_watermark",
+ b"within_watermark",
],
) -> builtins.bool: ...
def ClearField(
@@ -1475,18 +1483,27 @@ class Deduplicate(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"_all_columns_as_keys",
b"_all_columns_as_keys",
+ "_within_watermark",
+ b"_within_watermark",
"all_columns_as_keys",
b"all_columns_as_keys",
"column_names",
b"column_names",
"input",
b"input",
+ "within_watermark",
+ b"within_watermark",
],
) -> None: ...
+ @typing.overload
def WhichOneof(
self,
oneof_group: typing_extensions.Literal["_all_columns_as_keys",
b"_all_columns_as_keys"],
) -> typing_extensions.Literal["all_columns_as_keys"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_within_watermark",
b"_within_watermark"]
+ ) -> typing_extensions.Literal["within_watermark"] | None: ...
global___Deduplicate = Deduplicate
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index b316f0f3b4c..d414316c2f9 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -1213,6 +1213,14 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
df.dropDuplicates(["name"]).toPandas(),
df2.dropDuplicates(["name"]).toPandas()
)
+ def test_deduplicate_within_watermark_in_batch(self):
+ df = self.connect.read.table(self.tbl_name)
+ with self.assertRaisesRegex(
+ AnalysisException,
+ "dropDuplicatesWithinWatermark is not supported with batch
DataFrames/DataSets",
+ ):
+ df.dropDuplicatesWithinWatermark().toPandas()
+
def test_first(self):
# SPARK-41002: test `first` API in Python Client
df = self.connect.read.table(self.tbl_name)
@@ -1761,7 +1769,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
self.connect.read.table(self.tbl_name).hint("REPARTITION", "id",
3).toPandas()
def test_join_hint(self):
-
cdf1 = self.connect.createDataFrame([(2, "Alice"), (5, "Bob")],
schema=["age", "name"])
cdf2 = self.connect.createDataFrame(
[Row(height=80, name="Tom"), Row(height=85, name="Bob")]
@@ -2284,7 +2291,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
)
def test_grouped_data(self):
-
query = """
SELECT * FROM VALUES
('James', 'Sales', 3000, 2020),