This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 3e5407dbbfb [SPARK-38809][SS] Implement option to skip null values in
symmetric hash implementation of stream-stream joins
3e5407dbbfb is described below
commit 3e5407dbbfb8ea955e9c44df1893ad24a9449a28
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Fri Apr 8 12:19:50 2022 +0900
[SPARK-38809][SS] Implement option to skip null values in symmetric hash
implementation of stream-stream joins
### What changes were proposed in this pull request?
In the symmetric hash join state manager, we can receive entries with null
values for a key, which can cause the `removeByValue` and get iterators to
fail with a NullPointerException. This is possible if the recovered state
was written by an older Spark version, is corrupted on disk, or is affected
by issues with the iterators. Since we don't have a utility to query this
state, we would like to provide a conf option to skip nulls for the symmetric
hash implementation in stream str [...]
### Why are the changes needed?
Without these changes, if we encounter null values for stream-stream joins,
the executor task will repeatedly fail with NullPointerException and will
terminate the stage and eventually the query as well. This change allows the
user to set a config option to continue iterating by skipping null values for
symmetric hash based implementation of stream-stream joins.
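As a rough illustration, the option could be switched on when building the session, before the streaming query starts. The config key is taken from the `SQLConf` change in this commit (it is marked internal and defaults to `false`); the application name and master below are placeholders, not part of the commit.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical application setup; appName and master are placeholders.
val spark = SparkSession.builder()
  .appName("skip-nulls-example")
  .master("local[2]")
  // Key added by this commit; internal, default is false.
  .config("spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled", "true")
  .getOrCreate()
```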
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests to test the new functionality by adding nulls in between
and forcing the iteration/get calls with nulls in the mix and tested the
behavior with the config disabled as well as enabled.
Sample output:
```
[info] SymmetricHashJoinStateManagerSuite:
15:07:50.627 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
[info] - StreamingJoinStateManager V1 - all operations (588 milliseconds)
[info] - StreamingJoinStateManager V2 - all operations (251 milliseconds)
15:07:52.669 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=3 and endIndex=4.
15:07:52.671 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=3 and endIndex=3.
15:07:52.672 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=1 and endIndex=3.
15:07:52.672 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=1 and endIndex=1.
[info] - StreamingJoinStateManager V1 - all operations with nulls (252
milliseconds)
15:07:52.896 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=3 and endIndex=4.
15:07:52.897 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=3 and endIndex=3.
15:07:52.898 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=1 and endIndex=3.
15:07:52.898 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=1 and endIndex=1.
[info] - StreamingJoinStateManager V2 - all operations with nulls (221
milliseconds)
15:07:53.114 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=5 and endIndex=6.
15:07:53.116 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=3 and endIndex=6.
15:07:53.331 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=5 and endIndex=6.
15:07:53.331 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=3 and endIndex=3.
[info] - StreamingJoinStateManager V1 - all operations with nulls in middle
(435 milliseconds)
15:07:53.549 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=5 and endIndex=6.
15:07:53.551 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=3 and endIndex=6.
15:07:53.785 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=5 and endIndex=6.
15:07:53.785 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager:
`keyWithIndexToValue` returns a null value for indices with range from
startIndex=3 and endIndex=3.
[info] - StreamingJoinStateManager V2 - all operations with nulls in middle
(456 milliseconds)
[info] - SPARK-35689: StreamingJoinStateManager V1 - printable key of
keyWithIndexToValue (390 milliseconds)
[info] - SPARK-35689: StreamingJoinStateManager V2 - printable key of
keyWithIndexToValue (216 milliseconds)
15:07:54.640 WARN
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite:
===== POSSIBLE THREAD LEAK IN SUITE
o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite,
threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
[info] Run completed in 5 seconds, 714 milliseconds.
[info] Total number of tests run: 8
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
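The skip-null behavior exercised by these tests can be sketched without Spark. This is a simplified model, not the code from the commit: `store` stands in for the state store, `numValues` for the recorded value count of a key, and `skipNulls` for `storeConf.skipNullsForStreamStreamJoins`; all names in the block are illustrative.

```scala
// Simplified, Spark-free model of the lookup loop (illustrative names only).
object SkipNullsSketch {
  def getAll(
      store: Map[Long, String],
      numValues: Long,
      skipNulls: Boolean): Iterator[(Long, String)] = new Iterator[(Long, String)] {
    private var index = 0L
    private var nextItem: Option[(Long, String)] = None

    // Move forward until an item is buffered or the index range is exhausted.
    private def advance(): Unit = {
      while (nextItem.isEmpty && index < numValues) {
        val value = store.getOrElse(index, null) // a missing entry models a null value
        if (value == null && skipNulls) {
          index += 1 // skip the hole and keep scanning
        } else {
          nextItem = Some((index, value)) // may carry a null when skipNulls is false
          index += 1
        }
      }
    }

    override def hasNext: Boolean = { advance(); nextItem.isDefined }

    override def next(): (Long, String) = {
      advance()
      val item = nextItem.getOrElse(throw new NoSuchElementException("no more values"))
      nextItem = None
      item
    }
  }

  def main(args: Array[String]): Unit = {
    val store = Map(0L -> "a", 2L -> "c") // indices 1 and 3 are holes
    println(getAll(store, 4L, skipNulls = true).toList)
    // prints: List((0,a), (2,c))
    println(getAll(store, 4L, skipNulls = false).toList)
    // prints: List((0,a), (1,null), (2,c), (3,null))
  }
}
```

With the flag off, the nulls reach the caller, which is where a dereference such as `valuePair.value` would fail in the real state manager.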
Closes #36090 from anishshri-db/bfix/SPARK-38809.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 61c489ea7ef51d7d0217f770ec358ed7a7b76b42)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++
.../execution/streaming/state/StateStoreConf.scala | 3 +
.../state/SymmetricHashJoinStateManager.scala | 38 ++++++---
.../state/SymmetricHashJoinStateManagerSuite.scala | 91 ++++++++++++++++++----
4 files changed, 123 insertions(+), 25 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fbd56968c1d..9b7d4aee745 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1897,6 +1897,19 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ /**
+ * SPARK-38809 - Config option to allow skipping null values for hash based stream-stream joins.
+ * Its possible for us to see nulls if state was written with an older version of Spark,
+ * the state was corrupted on disk or if we had an issue with the state iterators.
+ */
+ val STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS =
+ buildConf("spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled")
+ .internal()
+ .doc("When true, this config will skip null values in hash based stream-stream joins.")
+ .version("3.3.0")
+ .booleanConf
+ .createWithDefault(false)
+
val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like `${var}`,
`${system:var}`, " +
@@ -3866,6 +3879,9 @@ class SQLConf extends Serializable with Logging {
def stateStoreFormatValidationEnabled: Boolean =
getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)
+ def stateStoreSkipNullsForStreamStreamJoins: Boolean =
+ getConf(STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS)
+
def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
def isUnsupportedOperationCheckEnabled: Boolean =
getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 58af8272d1c..529db2609cd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -52,6 +52,9 @@ class StateStoreConf(
val formatValidationCheckValue: Boolean =
extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG,
"true") == "true"
+ /** Whether to skip null values for hash based stream-stream joins. */
+ val skipNullsForStreamStreamJoins: Boolean =
sqlConf.stateStoreSkipNullsForStreamStreamJoins
+
/** The compression codec used to compress delta and snapshot files. */
val compressionCodec: String = sqlConf.stateStoreCompressionCodec
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 56c47d564a3..d17c6e8e862 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -222,8 +222,12 @@ class SymmetricHashJoinStateManager(
valueRemoved = false
}
- // Find the next value satisfying the condition, updating `currentKey` and `numValues` if
- // needed. Returns null when no value can be found.
+ /**
+ * Find the next value satisfying the condition, updating `currentKey` and `numValues` if
+ * needed. Returns null when no value can be found.
+ * Note that we will skip nulls explicitly if config setting for the same is
+ * set to true via STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.
+ */
private def findNextValueForIndex(): ValueAndMatchPair = {
// Loop across all values for the current key, and then all other
keys, until we find a
// value satisfying the removal condition.
@@ -233,7 +237,9 @@ class SymmetricHashJoinStateManager(
if (hasMoreValuesForCurrentKey) {
// First search the values for the current key.
val valuePair = keyWithIndexToValue.get(currentKey, index)
- if (removalCondition(valuePair.value)) {
+ if (valuePair == null && storeConf.skipNullsForStreamStreamJoins) {
+ index += 1
+ } else if (removalCondition(valuePair.value)) {
return valuePair
} else {
index += 1
@@ -597,22 +603,30 @@ class SymmetricHashJoinStateManager(
/**
* Get all values and indices for the provided key.
* Should not return null.
+ * Note that we will skip nulls explicitly if config setting for the same
is
+ * set to true via STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.
*/
def getAll(key: UnsafeRow, numValues: Long):
Iterator[KeyWithIndexAndValue] = {
- val keyWithIndexAndValue = new KeyWithIndexAndValue()
- var index = 0
new NextIterator[KeyWithIndexAndValue] {
+ private val keyWithIndexAndValue = new KeyWithIndexAndValue()
+ private var index: Long = 0L
+
+ private def hasMoreValues = index < numValues
override protected def getNext(): KeyWithIndexAndValue = {
- if (index >= numValues) {
- finished = true
- null
- } else {
+ while (hasMoreValues) {
val keyWithIndex = keyWithIndexRow(key, index)
val valuePair =
valueRowConverter.convertValue(stateStore.get(keyWithIndex))
- keyWithIndexAndValue.withNew(key, index, valuePair)
- index += 1
- keyWithIndexAndValue
+ if (valuePair == null && storeConf.skipNullsForStreamStreamJoins) {
+ index += 1
+ } else {
+ keyWithIndexAndValue.withNew(key, index, valuePair)
+ index += 1
+ return keyWithIndexAndValue
+ }
}
+
+ finished = true
+ return null
}
override protected def close(): Unit = {}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
index deeebe1fc42..30d39ebcc4a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
@@ -29,6 +29,7 @@ import
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
import
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +53,12 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest
with BeforeAndAfter
}
}
+ SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
+ test(s"StreamingJoinStateManager V${version} - all operations with nulls in middle") {
+ testAllOperationsWithNullsInMiddle(version)
+ }
+ }
+
SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
test(s"SPARK-35689: StreamingJoinStateManager V${version} - " +
"printable key of keyWithIndexToValue") {
@@ -167,6 +174,55 @@ class SymmetricHashJoinStateManagerSuite extends
StreamTest with BeforeAndAfter
}
}
+ /* Test removeByValue with nulls in middle simulated by updating numValues on the state manager */
+ private def testAllOperationsWithNullsInMiddle(stateFormatVersion: Int): Unit = {
+ // Test with skipNullsForStreamStreamJoins set to false which would throw a
+ // NullPointerException while iterating and also return null values as part of get
+ withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager =>
+ implicit val mgr = manager
+
+ val ex = intercept[Exception] {
+ appendAndTest(40, 50, 200, 300)
+ assert(numRows === 3)
+ updateNumValues(40, 4) // create a null at the end
+ append(40, 400)
+ updateNumValues(40, 7) // create nulls in between and end
+ removeByValue(50)
+ }
+ assert(ex.isInstanceOf[NullPointerException])
+ assert(getNumValues(40) === 7) // we should get 7 with no nulls skipped
+
+ removeByValue(300)
+ assert(getNumValues(40) === 1) // only 400 should remain
+ assert(get(40) === Seq(400))
+ removeByValue(400)
+ assert(get(40) === Seq.empty)
+ assert(numRows === 0) // ensure all elements removed
+ }
+
+ // Test with skipNullsForStreamStreamJoins set to true which would skip nulls
+ // and continue iterating as part of removeByValue as well as get
+ withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion, true) { manager =>
+ implicit val mgr = manager
+
+ appendAndTest(40, 50, 200, 300)
+ assert(numRows === 3)
+ updateNumValues(40, 4) // create a null at the end
+ append(40, 400)
+ updateNumValues(40, 7) // create nulls in between and end
+
+ removeByValue(50)
+ assert(getNumValues(40) === 3) // we should now get (400, 200, 300) with nulls skipped
+
+ removeByValue(300)
+ assert(getNumValues(40) === 1) // only 400 should remain
+ assert(get(40) === Seq(400))
+ removeByValue(400)
+ assert(get(40) === Seq.empty)
+ assert(numRows === 0) // ensure all elements removed
+ }
+ }
+
val watermarkMetadata = new
MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10).build()
val inputValueSchema = new StructType()
.add(StructField("time", IntegerType, metadata = watermarkMetadata))
@@ -205,6 +261,11 @@ class SymmetricHashJoinStateManagerSuite extends
StreamTest with BeforeAndAfter
manager.updateNumValuesTestOnly(toJoinKeyRow(key), numValues)
}
+ def getNumValues(key: Int)
+ (implicit manager: SymmetricHashJoinStateManager): Int = {
+ manager.get(toJoinKeyRow(key)).size
+ }
+
def get(key: Int)(implicit manager: SymmetricHashJoinStateManager): Seq[Int]
= {
manager.get(toJoinKeyRow(key)).map(toValueInt).toSeq.sorted
}
@@ -232,22 +293,26 @@ class SymmetricHashJoinStateManagerSuite extends
StreamTest with BeforeAndAfter
manager.metrics.numKeys
}
-
def withJoinStateManager(
- inputValueAttribs: Seq[Attribute],
- joinKeyExprs: Seq[Expression],
- stateFormatVersion: Int)(f: SymmetricHashJoinStateManager => Unit): Unit =
{
+ inputValueAttribs: Seq[Attribute],
+ joinKeyExprs: Seq[Expression],
+ stateFormatVersion: Int,
+ skipNullsForStreamStreamJoins: Boolean = false)
+ (f: SymmetricHashJoinStateManager => Unit): Unit = {
withTempDir { file =>
- val storeConf = new StateStoreConf()
- val stateInfo = StatefulOperatorStateInfo(file.getAbsolutePath,
UUID.randomUUID, 0, 0, 5)
- val manager = new SymmetricHashJoinStateManager(
- LeftSide, inputValueAttribs, joinKeyExprs, Some(stateInfo), storeConf,
new Configuration,
- partitionId = 0, stateFormatVersion)
- try {
- f(manager)
- } finally {
- manager.abortIfNeeded()
+ withSQLConf(SQLConf.STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.key ->
+ skipNullsForStreamStreamJoins.toString) {
+ val storeConf = new StateStoreConf(spark.sqlContext.conf)
+ val stateInfo = StatefulOperatorStateInfo(file.getAbsolutePath,
UUID.randomUUID, 0, 0, 5)
+ val manager = new SymmetricHashJoinStateManager(
+ LeftSide, inputValueAttribs, joinKeyExprs, Some(stateInfo),
storeConf, new Configuration,
+ partitionId = 0, stateFormatVersion)
+ try {
+ f(manager)
+ } finally {
+ manager.abortIfNeeded()
+ }
}
}
StateStore.stop()