This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d01f34f670d1 [SPARK-50492][SS] Fix java.util.NoSuchElementException when event time column is dropped after dropDuplicatesWithinWatermark
d01f34f670d1 is described below
commit d01f34f670d1e6ffb6fb6580ffa8ea34e20d07cb
Author: Livia Zhu <[email protected]>
AuthorDate: Fri Dec 6 11:28:55 2024 +0900
[SPARK-50492][SS] Fix java.util.NoSuchElementException when event time column is dropped after dropDuplicatesWithinWatermark
### What changes were proposed in this pull request?
Update `DeduplicateWithinWatermark` references to include all attributes
that could be the watermarking column.
### Why are the changes needed?
Fix `java.util.NoSuchElementException` due to ColumnPruning.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit test
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49065 from liviazhu-db/liviazhu-db/dedup-watermark-fix.
Authored-by: Livia Zhu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 851f5f2ff905636388ff31f349c6fc5064875172)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/catalyst/plans/logical/basicLogicalOperators.scala | 3 +++
.../StreamingDeduplicationWithinWatermarkSuite.scala | 14 ++++++++++++++
2 files changed, 17 insertions(+)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index f76e698a6400..b2ae138a9b0a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1944,6 +1944,9 @@ case class Deduplicate(
}
case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan) extends UnaryNode {
+  // Ensure that references include event time columns so they are not pruned away.
+  override def references: AttributeSet = AttributeSet(keys) ++
+    AttributeSet(child.output.filter(_.metadata.contains(EventTimeWatermark.delayKey)))
override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
index 9a02ab3df7dd..af86e6ec8899 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
@@ -220,4 +220,18 @@ class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest {
)
}
}
+
+  test("SPARK-50492: drop event time column after dropDuplicatesWithinWatermark") {
+ val inputData = MemoryStream[(Int, Int)]
+ val result = inputData.toDS()
+ .withColumn("first", timestamp_seconds($"_1"))
+ .withWatermark("first", "10 seconds")
+ .dropDuplicatesWithinWatermark("_2")
+ .select("_2")
+
+ testStream(result, Append)(
+ AddData(inputData, (1, 2)),
+ CheckAnswer(2)
+ )
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]