This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d01f34f670d1 [SPARK-50492][SS] Fix java.util.NoSuchElementException 
when event time column is dropped after dropDuplicatesWithinWatermark
d01f34f670d1 is described below

commit d01f34f670d1e6ffb6fb6580ffa8ea34e20d07cb
Author: Livia Zhu <[email protected]>
AuthorDate: Fri Dec 6 11:28:55 2024 +0900

    [SPARK-50492][SS] Fix java.util.NoSuchElementException when event time 
column is dropped after dropDuplicatesWithinWatermark
    
    ### What changes were proposed in this pull request?
    
    Update `DeduplicateWithinWatermark` references to include all attributes 
that could be the watermarking column.
    
    ### Why are the changes needed?
    
    Fixes a `java.util.NoSuchElementException` caused by the `ColumnPruning` rule pruning the event time column when it is not part of the deduplication keys.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49065 from liviazhu-db/liviazhu-db/dedup-watermark-fix.
    
    Authored-by: Livia Zhu <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
    (cherry picked from commit 851f5f2ff905636388ff31f349c6fc5064875172)
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala |  3 +++
 .../StreamingDeduplicationWithinWatermarkSuite.scala       | 14 ++++++++++++++
 2 files changed, 17 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index f76e698a6400..b2ae138a9b0a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1944,6 +1944,9 @@ case class Deduplicate(
 }
 
 case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: 
LogicalPlan) extends UnaryNode {
+  // Ensure that references include event time columns so they are not pruned 
away.
+  override def references: AttributeSet = AttributeSet(keys) ++
+    
AttributeSet(child.output.filter(_.metadata.contains(EventTimeWatermark.delayKey)))
   override def maxRows: Option[Long] = child.maxRows
   override def output: Seq[Attribute] = child.output
   final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
index 9a02ab3df7dd..af86e6ec8899 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
@@ -220,4 +220,18 @@ class StreamingDeduplicationWithinWatermarkSuite extends 
StateStoreMetricsTest {
       )
     }
   }
+
+  test("SPARK-50492: drop event time column after 
dropDuplicatesWithinWatermark") {
+    val inputData = MemoryStream[(Int, Int)]
+    val result = inputData.toDS()
+      .withColumn("first", timestamp_seconds($"_1"))
+      .withWatermark("first", "10 seconds")
+      .dropDuplicatesWithinWatermark("_2")
+      .select("_2")
+
+    testStream(result, Append)(
+      AddData(inputData, (1, 2)),
+      CheckAnswer(2)
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to