This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ae20bb9d1ba0 [SPARK-55987][SS] Fix time window join in stream-stream 
join state format V4
ae20bb9d1ba0 is described below

commit ae20bb9d1ba08796d526d81f78d885b74124e673
Author: Nicholas Chew <[email protected]>
AuthorDate: Sat Mar 14 06:50:17 2026 +0900

    [SPARK-55987][SS] Fix time window join in stream-stream join state format V4
    
    ### What changes were proposed in this pull request?
    
    Fixes time window joins in stream-stream join state format V4. The V4 state 
manager was deducing the event time column ordinal in join keys independently 
per side, which didn't work for window structs. It now uses 
`StreamingSymmetricHashJoinHelper.findJoinKeyOrdinalForWatermark`, which 
correctly resolves the watermark ordinal for both sides.
    
    Changes:
    - Expose `findJoinKeyOrdinalForWatermark` as `private[join]` in 
`StreamingSymmetricHashJoinHelper`
    - Compute `joinKeyOrdinalForWatermark` in `StreamingSymmetricHashJoinExec` 
and pass it through `OneSideHashJoiner` to the state manager factory
    - Replace `eventTimeColIdxOptInKey` in `SymmetricHashJoinStateManagerV4` 
with the passed-in `joinKeyOrdinalForWatermark`
    - Fix `append()` to try extracting event time from key first, falling back 
to value
    - Remove window join test from V4 skip list since it now passes
    
    ### Why are the changes needed?
    
    Stream-stream join V4 state format did not support window struct join keys 
because the V4 state manager tried to find the event time column in join keys 
by checking metadata independently, which doesn't carry through for window 
structs. The existing `findJoinKeyOrdinalForWatermark` utility already handles 
this correctly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. V4 state format is in development and gated behind 
`spark.sql.streaming.join.stateFormatV4.enabled`.
    
    ### How was this patch tested?
    
    Existing test `"stream stream inner join on windows - with watermark"` now 
runs in V4 suites (previously skipped).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes.
    
    Closes #54786 from nicholaschew11/SPARK-55987-fix-window-join-v4.
    
    Authored-by: Nicholas Chew <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../join/StreamingSymmetricHashJoinExec.scala      | 12 ++++++++----
 .../join/StreamingSymmetricHashJoinHelper.scala    |  2 +-
 .../join/SymmetricHashJoinStateManager.scala       | 22 ++++++++--------------
 .../spark/sql/streaming/StreamingJoinV4Suite.scala |  3 ---
 4 files changed, 17 insertions(+), 22 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
index 95acff230269..7de013fd12eb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
@@ -363,17 +363,19 @@ case class StreamingSymmetricHashJoinExec(
     // Create left and right side hash joiners and store in the joiner manager.
     // Both sides should use the same store generator if we are re-using the 
same store instance.
     val joinStateManagerStoreGenerator = new JoinStateManagerStoreGenerator()
+    val joinKeyOrdinalForWatermark =
+      
StreamingSymmetricHashJoinHelper.findJoinKeyOrdinalForWatermark(leftKeys, 
rightKeys)
     val joinerManager = OneSideHashJoinerManager(
       new OneSideHashJoiner(
         LeftSide, left.output, leftKeys, leftInputIter,
         condition.leftSideOnly, postJoinFilter, stateWatermarkPredicates.left, 
partitionId,
         checkpointIds.left.keyToNumValues, 
checkpointIds.left.keyWithIndexToValue,
-        skippedNullValueCount, joinStateManagerStoreGenerator),
+        skippedNullValueCount, joinStateManagerStoreGenerator, 
joinKeyOrdinalForWatermark),
       new OneSideHashJoiner(
         RightSide, right.output, rightKeys, rightInputIter,
         condition.rightSideOnly, postJoinFilter, 
stateWatermarkPredicates.right, partitionId,
         checkpointIds.right.keyToNumValues, 
checkpointIds.right.keyWithIndexToValue,
-        skippedNullValueCount, joinStateManagerStoreGenerator))
+        skippedNullValueCount, joinStateManagerStoreGenerator, 
joinKeyOrdinalForWatermark))
 
     //  Join one side input using the other side's buffered/state rows. Here 
is how it is done.
     //
@@ -621,7 +623,8 @@ case class StreamingSymmetricHashJoinExec(
       keyToNumValuesStateStoreCkptId: Option[String],
       keyWithIndexToValueStateStoreCkptId: Option[String],
       skippedNullValueCount: Option[SQLMetric],
-      joinStateManagerStoreGenerator: JoinStateManagerStoreGenerator) {
+      joinStateManagerStoreGenerator: JoinStateManagerStoreGenerator,
+      joinKeyOrdinalForWatermark: Option[Int]) {
 
     // Filter the joined rows based on the given condition.
     val preJoinFilter =
@@ -639,7 +642,8 @@ case class StreamingSymmetricHashJoinExec(
       keyWithIndexToValueStateStoreCkptId = 
keyWithIndexToValueStateStoreCkptId,
       stateFormatVersion = stateFormatVersion,
       skippedNullValueCount = skippedNullValueCount,
-      joinStoreGenerator = joinStateManagerStoreGenerator)
+      joinStoreGenerator = joinStateManagerStoreGenerator,
+      joinKeyOrdinalForWatermark = joinKeyOrdinalForWatermark)
 
     private[this] val keyGenerator = UnsafeProjection.create(joinKeys, 
inputAttributes)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
index a916b0d626d3..cea6398f4e50 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
@@ -242,7 +242,7 @@ object StreamingSymmetricHashJoinHelper extends Logging {
     JoinStateWatermarkPredicates(leftStateWatermarkPredicate, 
rightStateWatermarkPredicate)
   }
 
-  private def findJoinKeyOrdinalForWatermark(
+  private[join] def findJoinKeyOrdinalForWatermark(
       leftKeys: Seq[Expression],
       rightKeys: Seq[Expression]): Option[Int] = {
     // Join keys of both sides generate rows of the same fields, that is, same 
sequence of data
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
index ae18c49472f7..70599f10842b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
@@ -27,8 +27,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{END_INDEX, START_INDEX, 
STATE_STORE_ID}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, JoinedRow, Literal, NamedExpression, 
SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, JoinedRow, Literal, SafeProjection, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.execution.metric.SQLMetric
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
 StatefulOpStateStoreCheckpointInfo, WatermarkSupport}
@@ -189,7 +188,8 @@ class SymmetricHashJoinStateManagerV4(
     skippedNullValueCount: Option[SQLMetric] = None,
     useStateStoreCoordinator: Boolean = true,
     snapshotOptions: Option[SnapshotOptions] = None,
-    joinStoreGenerator: JoinStateManagerStoreGenerator)
+    joinStoreGenerator: JoinStateManagerStoreGenerator,
+    joinKeyOrdinalForWatermark: Option[Int] = None)
   extends SymmetricHashJoinStateManager with SupportsEvictByTimestamp with 
Logging {
 
   // TODO: [SPARK-55729] Once the new state manager is integrated to 
stream-stream join operator,
@@ -230,15 +230,8 @@ class SymmetricHashJoinStateManagerV4(
     }
   }
 
-  private val eventTimeColIdxOptInKey: Option[Int] = {
-    joinKeys.zipWithIndex.collectFirst {
-      case (ne: NamedExpression, index)
-        if ne.metadata.contains(EventTimeWatermark.delayKey) => index
-    }
-  }
-
   private val extractEventTimeFnFromKey: UnsafeRow => Option[Long] = { row =>
-    eventTimeColIdxOptInKey.map { idx =>
+    joinKeyOrdinalForWatermark.map { idx =>
       val attr = keyAttributes(idx)
       if (attr.dataType.isInstanceOf[StructType]) {
         // NOTE: We assume this is window struct, as same as 
WatermarkSupport.watermarkExpression
@@ -314,7 +307,7 @@ class SymmetricHashJoinStateManagerV4(
   private val tsWithKey = new TsWithKeyTypeStore
 
   override def append(key: UnsafeRow, value: UnsafeRow, matched: Boolean): 
Unit = {
-    val eventTime = extractEventTimeFn(value)
+    val eventTime = 
extractEventTimeFnFromKey(key).getOrElse(extractEventTimeFn(value))
     // We always do blind merge for appending new value.
     keyWithTsToValues.append(key, eventTime, value, matched)
     tsWithKey.add(eventTime, key)
@@ -1745,7 +1738,8 @@ object SymmetricHashJoinStateManager {
       skippedNullValueCount: Option[SQLMetric] = None,
       useStateStoreCoordinator: Boolean = true,
       snapshotOptions: Option[SnapshotOptions] = None,
-      joinStoreGenerator: JoinStateManagerStoreGenerator): 
SymmetricHashJoinStateManager = {
+      joinStoreGenerator: JoinStateManagerStoreGenerator,
+      joinKeyOrdinalForWatermark: Option[Int] = None): 
SymmetricHashJoinStateManager = {
     if (stateFormatVersion == 4) {
       
require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED),
         "State format version 4 is under development.")
@@ -1753,7 +1747,7 @@ object SymmetricHashJoinStateManager {
         joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, 
hadoopConf,
         partitionId, keyToNumValuesStateStoreCkptId, 
keyWithIndexToValueStateStoreCkptId,
         stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator, 
snapshotOptions,
-        joinStoreGenerator
+        joinStoreGenerator, joinKeyOrdinalForWatermark
       )
     } else if (stateFormatVersion == 3) {
       new SymmetricHashJoinStateManagerV2(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
index 48ce34f611c7..d9f405b32e92 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
@@ -55,9 +55,6 @@ trait TestWithV4StateFormat extends 
AlsoTestWithVirtualColumnFamilyJoins {
   // Use lazy val because the parent constructor registers tests before
   // subclass vals are initialized.
   private lazy val testsToSkip = Seq(
-    // V4's timestamp-based indexing does not support window structs
-    // in join keys.
-    "stream stream inner join on windows - with watermark",
     // V4 uses 1 store with VCFs instead of V3's 4*partitions layout,
     // so metric assertions about number of state store instances differ.
     "SPARK-35896: metrics in StateOperatorProgress are output correctly",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to