This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 8a072ef [SPARK-38684][SS] Fix correctness issue on stream-stream
outer join with RocksDB state store provider
8a072ef is described below
commit 8a072ef6badad69ef5cfdd656d0c068979f6ea76
Author: Jungtaek Lim <[email protected]>
AuthorDate: Fri Apr 1 18:21:48 2022 +0900
[SPARK-38684][SS] Fix correctness issue on stream-stream outer join with
RocksDB state store provider
### What changes were proposed in this pull request?
(Credit to alex-balikov for the inspiration of the root cause observation,
and anishshri-db for looking into the issue together.)
This PR fixes the correctness issue on stream-stream outer join with
RocksDB state store provider, which can occur in certain condition, like below:
* stream-stream time interval outer join
* left outer join has an issue on left side, right outer join has an
issue on right side, full outer join has an issue on both sides
* At batch N, produce non-late row(s) on the problematic side
* At the same batch (batch N), some row(s) on the problematic side are
evicted by the condition of watermark
The root cause is same as
[SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320) - weak read
consistency on iterator, especially with RocksDB state store provider. (Quoting
from SPARK-38320: The problem is due to the StateStore.iterator not reflecting
StateStore changes made after its creation.)
More specifically, if updates are performed during processing input rows
and somehow updates the number of values for grouping key, the update is not
seen in SymmetricHashJoinStateManager.removeByValueCondition, and the method
does the eviction with the number of values in out of sync.
Making it more worse, if the method performs the eviction and updates the
number of values for grouping key, it "overwrites" the number of value,
effectively drop all rows being inserted in the same batch.
Below code blocks are references on understanding the details of the issue.
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223
This PR fixes the outer iterators as late evaluation to ensure all updates
on processing input rows are reflected "before" outer iterators are initialized.
### Why are the changes needed?
The bug is described in above section.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT added.
Closes #36002 from HeartSaVioR/SPARK-38684.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/StreamingSymmetricHashJoinExec.scala | 81 ++++++++++++++++------
.../spark/sql/streaming/StreamingJoinSuite.scala | 63 ++++++++++++++++-
2 files changed, 121 insertions(+), 23 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 81888e0..aa888c1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -324,17 +324,22 @@ case class StreamingSymmetricHashJoinExec(
}
}
+ val initIterFn = { () =>
+ val removedRowIter = leftSideJoiner.removeOldState()
+ removedRowIter.filterNot { kv =>
+ stateFormatVersion match {
+ case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key,
kv.value))
+ case 2 => kv.matched
+ case _ => throwBadStateFormatVersionException()
+ }
+ }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+ }
+
// NOTE: we need to make sure `outerOutputIter` is evaluated "after"
exhausting all of
- // elements in `innerOutputIter`, because evaluation of
`innerOutputIter` may update
- // the match flag which the logic for outer join is relying on.
- val removedRowIter = leftSideJoiner.removeOldState()
- val outerOutputIter = removedRowIter.filterNot { kv =>
- stateFormatVersion match {
- case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key,
kv.value))
- case 2 => kv.matched
- case _ => throwBadStateFormatVersionException()
- }
- }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+ // elements in `hashJoinOutputIter`, otherwise it may lead to out of
sync according to
+ // the interface contract on StateStore.iterator and end up with
correctness issue.
+ // Please refer SPARK-38684 for more details.
+ val outerOutputIter = new
LazilyInitializingJoinedRowIterator(initIterFn)
hashJoinOutputIter ++ outerOutputIter
case RightOuter =>
@@ -344,14 +349,23 @@ case class StreamingSymmetricHashJoinExec(
postJoinFilter(joinedRow.withLeft(leftValue).withRight(rightKeyValue.value))
}
}
- val removedRowIter = rightSideJoiner.removeOldState()
- val outerOutputIter = removedRowIter.filterNot { kv =>
- stateFormatVersion match {
- case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key,
kv.value))
- case 2 => kv.matched
- case _ => throwBadStateFormatVersionException()
- }
- }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+
+ val initIterFn = { () =>
+ val removedRowIter = rightSideJoiner.removeOldState()
+ removedRowIter.filterNot { kv =>
+ stateFormatVersion match {
+ case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key,
kv.value))
+ case 2 => kv.matched
+ case _ => throwBadStateFormatVersionException()
+ }
+ }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+ }
+
+ // NOTE: we need to make sure `outerOutputIter` is evaluated "after"
exhausting all of
+ // elements in `hashJoinOutputIter`, otherwise it may lead to out of
sync according to
+ // the interface contract on StateStore.iterator and end up with
correctness issue.
+ // Please refer SPARK-38684 for more details.
+ val outerOutputIter = new
LazilyInitializingJoinedRowIterator(initIterFn)
hashJoinOutputIter ++ outerOutputIter
case FullOuter =>
@@ -360,10 +374,25 @@ case class StreamingSymmetricHashJoinExec(
case 2 => kv.matched
case _ => throwBadStateFormatVersionException()
}
- val leftSideOutputIter = leftSideJoiner.removeOldState().filterNot(
- isKeyToValuePairMatched).map(pair =>
joinedRow.withLeft(pair.value).withRight(nullRight))
- val rightSideOutputIter = rightSideJoiner.removeOldState().filterNot(
- isKeyToValuePairMatched).map(pair =>
joinedRow.withLeft(nullLeft).withRight(pair.value))
+
+ val leftSideInitIterFn = { () =>
+ val removedRowIter = leftSideJoiner.removeOldState()
+ removedRowIter.filterNot(isKeyToValuePairMatched)
+ .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+ }
+
+ val rightSideInitIterFn = { () =>
+ val removedRowIter = rightSideJoiner.removeOldState()
+ removedRowIter.filterNot(isKeyToValuePairMatched)
+ .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+ }
+
+ // NOTE: we need to make sure both `leftSideOutputIter` and
`rightSideOutputIter` are
+ // evaluated "after" exhausting all of elements in
`hashJoinOutputIter`, otherwise it may
+ // lead to out of sync according to the interface contract on
StateStore.iterator and
+ // end up with correctness issue. Please refer SPARK-38684 for more
details.
+ val leftSideOutputIter = new
LazilyInitializingJoinedRowIterator(leftSideInitIterFn)
+ val rightSideOutputIter = new
LazilyInitializingJoinedRowIterator(rightSideInitIterFn)
hashJoinOutputIter ++ leftSideOutputIter ++ rightSideOutputIter
case _ => throwBadJoinTypeException()
@@ -638,4 +667,12 @@ case class StreamingSymmetricHashJoinExec(
override protected def withNewChildrenInternal(
newLeft: SparkPlan, newRight: SparkPlan): StreamingSymmetricHashJoinExec
=
copy(left = newLeft, right = newRight)
+
+ private class LazilyInitializingJoinedRowIterator(
+ initFn: () => Iterator[JoinedRow]) extends Iterator[JoinedRow] {
+ private lazy val iter: Iterator[JoinedRow] = initFn()
+
+ override def hasNext: Boolean = iter.hasNext
+ override def next(): JoinedRow = iter.next()
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 29caaf7..491b8da 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -33,7 +33,7 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.streaming.{MemoryStream,
StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec,
StreamingSymmetricHashJoinHelper}
-import org.apache.spark.sql.execution.streaming.state.{StateStore,
StateStoreProviderId}
+import
org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider,
StateStore, StateStoreProviderId}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -1353,6 +1353,67 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite
{
).select(Symbol("leftKey1"), Symbol("rightKey1"), Symbol("leftKey2"),
Symbol("rightKey2"),
$"leftWindow.end".cast("long"), Symbol("leftValue"),
Symbol("rightValue"))
}
+
+ test("SPARK-38684: outer join works correctly even if processing input rows
and " +
+ "evicting state rows for same grouping key happens in the same
micro-batch") {
+
+ // The test is to demonstrate the correctness issue in outer join before
SPARK-38684.
+ withSQLConf(
+ SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> "false",
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName) {
+
+ val input1 = MemoryStream[(Timestamp, String, String)]
+ val df1 = input1.toDF
+ .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+ .withWatermark("eventTime", "0 second")
+
+ val input2 = MemoryStream[(Timestamp, String, String)]
+ val df2 = input2.toDF
+ .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+ .withWatermark("eventTime", "0 second")
+
+ val joined = df1.as("left")
+ .join(df2.as("right"),
+ expr("""
+ |left.id = right.id AND left.eventTime BETWEEN
+ | right.eventTime - INTERVAL 30 seconds AND
+ | right.eventTime + INTERVAL 30 seconds
+ """.stripMargin),
+ joinType = "leftOuter")
+
+ testStream(joined)(
+ MultiAddData(
+ (input1, Seq((Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left
in batch 1"))),
+ (input2, Seq((Timestamp.valueOf("2020-01-02 00:01:00"), "abc",
"right in batch 1")))
+ ),
+ CheckNewAnswer(),
+ MultiAddData(
+ (input1, Seq((Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left
in batch 2"))),
+ (input2, Seq((Timestamp.valueOf("2020-01-02 01:01:00"), "abc",
"right in batch 2")))
+ ),
+ // watermark advanced to "2020-01-02 00:00:00"
+ CheckNewAnswer(),
+ AddData(input1, (Timestamp.valueOf("2020-01-02 01:30:00"), "abc",
"left in batch 3")),
+ // watermark advanced to "2020-01-02 01:00:00"
+ CheckNewAnswer(
+ (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left in batch 1",
null, null, null)
+ ),
+ // left side state should still contain "left in batch 2" and "left in
batch 3"
+ // we should see both rows in the left side since
+ // - "left in batch 2" is going to be evicted in this batch
+ // - "left in batch 3" is going to be matched with new row in right
side
+ AddData(input2,
+ (Timestamp.valueOf("2020-01-02 01:30:10"), "abc", "match with left
in batch 3")),
+ // watermark advanced to "2020-01-02 01:01:00"
+ CheckNewAnswer(
+ (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left in batch 2",
+ null, null, null),
+ (Timestamp.valueOf("2020-01-02 01:30:00"), "abc", "left in batch 3",
+ Timestamp.valueOf("2020-01-02 01:30:10"), "abc", "match with left
in batch 3")
+ )
+ )
+ }
+ }
}
class StreamingFullOuterJoinSuite extends StreamingJoinSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]