This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ae20bb9d1ba0 [SPARK-55987][SS] Fix time window join in stream-stream
join state format V4
ae20bb9d1ba0 is described below
commit ae20bb9d1ba08796d526d81f78d885b74124e673
Author: Nicholas Chew <[email protected]>
AuthorDate: Sat Mar 14 06:50:17 2026 +0900
[SPARK-55987][SS] Fix time window join in stream-stream join state format V4
### What changes were proposed in this pull request?
Fixes time window joins in stream-stream join state format V4. The V4 state
manager was deducing the event time column ordinal in join keys independently
per side, which didn't work for window structs. Uses
`StreamingSymmetricHashJoinHelper.findJoinKeyOrdinalForWatermark`, which
correctly resolves the watermark ordinal consistently for both sides.
Changes:
- Expose `findJoinKeyOrdinalForWatermark` as `private[join]` in
`StreamingSymmetricHashJoinHelper`
- Compute `joinKeyOrdinalForWatermark` in `StreamingSymmetricHashJoinExec`
and pass it through `OneSideHashJoiner` to the state manager factory
- Replace `eventTimeColIdxOptInKey` in `SymmetricHashJoinStateManagerV4`
with the passed-in `joinKeyOrdinalForWatermark`
- Fix `append()` to try extracting event time from key first, falling back
to value
- Remove window join test from V4 skip list since it now passes
### Why are the changes needed?
Stream-stream join V4 state format did not support window struct join keys
because the V4 state manager tried to find the event time column in join keys
by checking expression metadata independently per side, and that metadata does
not carry through for window structs. The existing
`findJoinKeyOrdinalForWatermark` utility already handles this case correctly.
### Does this PR introduce _any_ user-facing change?
No. V4 state format is in development and gated behind
`spark.sql.streaming.join.stateFormatV4.enabled`.
### How was this patch tested?
Existing test `"stream stream inner join on windows - with watermark"` now
runs in V4 suites (previously skipped).
### Was this patch authored or co-authored using generative AI tooling?
Yes.
Closes #54786 from nicholaschew11/SPARK-55987-fix-window-join-v4.
Authored-by: Nicholas Chew <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../join/StreamingSymmetricHashJoinExec.scala | 12 ++++++++----
.../join/StreamingSymmetricHashJoinHelper.scala | 2 +-
.../join/SymmetricHashJoinStateManager.scala | 22 ++++++++--------------
.../spark/sql/streaming/StreamingJoinV4Suite.scala | 3 ---
4 files changed, 17 insertions(+), 22 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
index 95acff230269..7de013fd12eb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
@@ -363,17 +363,19 @@ case class StreamingSymmetricHashJoinExec(
// Create left and right side hash joiners and store in the joiner manager.
// Both sides should use the same store generator if we are re-using the
same store instance.
val joinStateManagerStoreGenerator = new JoinStateManagerStoreGenerator()
+ val joinKeyOrdinalForWatermark =
+
StreamingSymmetricHashJoinHelper.findJoinKeyOrdinalForWatermark(leftKeys,
rightKeys)
val joinerManager = OneSideHashJoinerManager(
new OneSideHashJoiner(
LeftSide, left.output, leftKeys, leftInputIter,
condition.leftSideOnly, postJoinFilter, stateWatermarkPredicates.left,
partitionId,
checkpointIds.left.keyToNumValues,
checkpointIds.left.keyWithIndexToValue,
- skippedNullValueCount, joinStateManagerStoreGenerator),
+ skippedNullValueCount, joinStateManagerStoreGenerator,
joinKeyOrdinalForWatermark),
new OneSideHashJoiner(
RightSide, right.output, rightKeys, rightInputIter,
condition.rightSideOnly, postJoinFilter,
stateWatermarkPredicates.right, partitionId,
checkpointIds.right.keyToNumValues,
checkpointIds.right.keyWithIndexToValue,
- skippedNullValueCount, joinStateManagerStoreGenerator))
+ skippedNullValueCount, joinStateManagerStoreGenerator,
joinKeyOrdinalForWatermark))
// Join one side input using the other side's buffered/state rows. Here
is how it is done.
//
@@ -621,7 +623,8 @@ case class StreamingSymmetricHashJoinExec(
keyToNumValuesStateStoreCkptId: Option[String],
keyWithIndexToValueStateStoreCkptId: Option[String],
skippedNullValueCount: Option[SQLMetric],
- joinStateManagerStoreGenerator: JoinStateManagerStoreGenerator) {
+ joinStateManagerStoreGenerator: JoinStateManagerStoreGenerator,
+ joinKeyOrdinalForWatermark: Option[Int]) {
// Filter the joined rows based on the given condition.
val preJoinFilter =
@@ -639,7 +642,8 @@ case class StreamingSymmetricHashJoinExec(
keyWithIndexToValueStateStoreCkptId =
keyWithIndexToValueStateStoreCkptId,
stateFormatVersion = stateFormatVersion,
skippedNullValueCount = skippedNullValueCount,
- joinStoreGenerator = joinStateManagerStoreGenerator)
+ joinStoreGenerator = joinStateManagerStoreGenerator,
+ joinKeyOrdinalForWatermark = joinKeyOrdinalForWatermark)
private[this] val keyGenerator = UnsafeProjection.create(joinKeys,
inputAttributes)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
index a916b0d626d3..cea6398f4e50 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinHelper.scala
@@ -242,7 +242,7 @@ object StreamingSymmetricHashJoinHelper extends Logging {
JoinStateWatermarkPredicates(leftStateWatermarkPredicate,
rightStateWatermarkPredicate)
}
- private def findJoinKeyOrdinalForWatermark(
+ private[join] def findJoinKeyOrdinalForWatermark(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression]): Option[Int] = {
// Join keys of both sides generate rows of the same fields, that is, same
sequence of data
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
index ae18c49472f7..70599f10842b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
@@ -27,8 +27,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{END_INDEX, START_INDEX,
STATE_STORE_ID}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, JoinedRow, Literal, NamedExpression,
SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, JoinedRow, Literal, SafeProjection,
SpecificInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.execution.metric.SQLMetric
import
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
StatefulOpStateStoreCheckpointInfo, WatermarkSupport}
@@ -189,7 +188,8 @@ class SymmetricHashJoinStateManagerV4(
skippedNullValueCount: Option[SQLMetric] = None,
useStateStoreCoordinator: Boolean = true,
snapshotOptions: Option[SnapshotOptions] = None,
- joinStoreGenerator: JoinStateManagerStoreGenerator)
+ joinStoreGenerator: JoinStateManagerStoreGenerator,
+ joinKeyOrdinalForWatermark: Option[Int] = None)
extends SymmetricHashJoinStateManager with SupportsEvictByTimestamp with
Logging {
// TODO: [SPARK-55729] Once the new state manager is integrated to
stream-stream join operator,
@@ -230,15 +230,8 @@ class SymmetricHashJoinStateManagerV4(
}
}
- private val eventTimeColIdxOptInKey: Option[Int] = {
- joinKeys.zipWithIndex.collectFirst {
- case (ne: NamedExpression, index)
- if ne.metadata.contains(EventTimeWatermark.delayKey) => index
- }
- }
-
private val extractEventTimeFnFromKey: UnsafeRow => Option[Long] = { row =>
- eventTimeColIdxOptInKey.map { idx =>
+ joinKeyOrdinalForWatermark.map { idx =>
val attr = keyAttributes(idx)
if (attr.dataType.isInstanceOf[StructType]) {
// NOTE: We assume this is window struct, as same as
WatermarkSupport.watermarkExpression
@@ -314,7 +307,7 @@ class SymmetricHashJoinStateManagerV4(
private val tsWithKey = new TsWithKeyTypeStore
override def append(key: UnsafeRow, value: UnsafeRow, matched: Boolean):
Unit = {
- val eventTime = extractEventTimeFn(value)
+ val eventTime =
extractEventTimeFnFromKey(key).getOrElse(extractEventTimeFn(value))
// We always do blind merge for appending new value.
keyWithTsToValues.append(key, eventTime, value, matched)
tsWithKey.add(eventTime, key)
@@ -1745,7 +1738,8 @@ object SymmetricHashJoinStateManager {
skippedNullValueCount: Option[SQLMetric] = None,
useStateStoreCoordinator: Boolean = true,
snapshotOptions: Option[SnapshotOptions] = None,
- joinStoreGenerator: JoinStateManagerStoreGenerator):
SymmetricHashJoinStateManager = {
+ joinStoreGenerator: JoinStateManagerStoreGenerator,
+ joinKeyOrdinalForWatermark: Option[Int] = None):
SymmetricHashJoinStateManager = {
if (stateFormatVersion == 4) {
require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED),
"State format version 4 is under development.")
@@ -1753,7 +1747,7 @@ object SymmetricHashJoinStateManager {
joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf,
hadoopConf,
partitionId, keyToNumValuesStateStoreCkptId,
keyWithIndexToValueStateStoreCkptId,
stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator,
snapshotOptions,
- joinStoreGenerator
+ joinStoreGenerator, joinKeyOrdinalForWatermark
)
} else if (stateFormatVersion == 3) {
new SymmetricHashJoinStateManagerV2(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
index 48ce34f611c7..d9f405b32e92 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
@@ -55,9 +55,6 @@ trait TestWithV4StateFormat extends
AlsoTestWithVirtualColumnFamilyJoins {
// Use lazy val because the parent constructor registers tests before
// subclass vals are initialized.
private lazy val testsToSkip = Seq(
- // V4's timestamp-based indexing does not support window structs
- // in join keys.
- "stream stream inner join on windows - with watermark",
// V4 uses 1 store with VCFs instead of V3's 4*partitions layout,
// so metric assertions about number of state store instances differ.
"SPARK-35896: metrics in StateOperatorProgress are output correctly",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]