This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 06674bab1db3 [SPARK-53942][SS] Support changing stateless shuffle partitions upon restart of streaming query
06674bab1db3 is described below
commit 06674bab1db367fd9fb770fddc95b629f239291a
Author: Jungtaek Lim <[email protected]>
AuthorDate: Sat Oct 25 17:57:36 2025 +0900
[SPARK-53942][SS] Support changing stateless shuffle partitions upon restart of streaming query
### What changes were proposed in this pull request?
This PR proposes to support changing the number of stateless shuffle partitions upon restart of a streaming query.
We don't introduce a new user-facing config; users can simply do the following to change the number of shuffle partitions:
* stop the query
* change the value of `spark.sql.shuffle.partitions`
* restart the query for the change to take effect
Note that state partitions are still fixed and remain unchanged by this. That is, the value of `spark.sql.shuffle.partitions` at batch 0 becomes the number of state partitions, and it does not change even if the value of the config is changed upon restart.
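For illustration, below is a minimal sketch of this restart flow. The rate source, console sink, stream-static join, and checkpoint path are placeholder choices for the example, not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SPARK-53942-example").getOrCreate()
import spark.implicits._

// A stream-static join: the join's shuffle is stateless, so its parallelism
// can now be changed across restarts (assuming the join is not broadcast).
val static = Seq((0L, "even"), (1L, "odd")).toDF("mod", "label")
def startQuery() = spark.readStream.format("rate").load()
  .withColumn("mod", $"value" % 2)
  .join(static, "mod")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint-spark-53942")
  .start()

spark.conf.set("spark.sql.shuffle.partitions", "200")
val query = startQuery()

// 1. Stop the query.
query.stop()
// 2. Change the value of spark.sql.shuffle.partitions.
spark.conf.set("spark.sql.shuffle.partitions", "50")
// 3. Restart from the same checkpoint: stateless shuffles now run with 50
//    partitions, while stateful operators keep the partition count recorded
//    at batch 0 of the checkpoint.
val restarted = startQuery()
```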
As an implementation detail, this PR adds a new "internal" SQL config `spark.sql.streaming.internal.stateStore.partitions` to distinguish stateless shuffle partitions from stateful shuffle partitions. Unlike other internal configs that we still expect someone (an admin?) to use, this config is NOT meant to be a user-facing one, and no one should set it directly. We add this config to implement a trick for compatibility, nothing else. We don't support compatibility of this config [...]
That said, the value of the new config is expected to be inherited from `spark.sql.shuffle.partitions`, assuming no one sets it directly.
To support compatibility, we employ a trick in the offset log: for stateful shuffle partitions, we refer to `spark.sql.streaming.internal.stateStore.partitions` in the session config, while we keep using `spark.sql.shuffle.partitions` in the offset log. We handle rebinding between the two configs so that the persistence layer is left unchanged. This way the query can be both upgraded and downgraded.
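Conceptually, the rebinding behaves like the sketch below. The two maps mirror `rebindSQLConfsSessionToOffsetLog` and `rebindSQLConfsOffsetLogToSession` in the diff; the string-keyed maps and helper functions here are illustrative scaffolding, not the actual implementation:

```scala
object RebindSketch {
  val StatefulShufflePartitionsInternal = "spark.sql.streaming.internal.stateStore.partitions"
  val ShufflePartitions = "spark.sql.shuffle.partitions"

  // Session config key -> key used when persisting into the offset log.
  val sessionToOffsetLog: Map[String, String] =
    Map(StatefulShufflePartitionsInternal -> ShufflePartitions)

  // Reversed index, used when restoring the session config from the offset log.
  val offsetLogToSession: Map[String, String] = sessionToOffsetLog.map(_.swap)

  // Writing batch metadata: the internal config is persisted under the legacy
  // key, so the offset log stays readable by older Spark versions (downgrade).
  def toOffsetLog(sessionConf: Map[String, String]): Map[String, String] =
    sessionConf.map { case (k, v) => sessionToOffsetLog.getOrElse(k, k) -> v }

  // Reading batch metadata: the legacy key is rebound to the internal config,
  // leaving spark.sql.shuffle.partitions in the session free to change.
  def fromOffsetLog(logConf: Map[String, String]): Map[String, String] =
    logConf.map { case (k, v) => offsetLogToSession.getOrElse(k, k) -> v }
}
```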
### Why are the changes needed?
Whenever there is a need to change the parallelism of the processing, e.g. the input volume changing over time, the size of a static table changing over time, or skew in a stream-static join (though AQE may help resolve this a bit), the only official approach to deal with this has been to discard the checkpoint and start a new query, implying users have to do a full backfill. (For workloads with a foreachBatch (FEB) sink, advanced (and adventurous) users could change the config in their user function, but that's arguably [...]
### Does this PR introduce _any_ user-facing change?
Yes, users can change shuffle partitions for stateless operators upon restart by changing the config `spark.sql.shuffle.partitions`.
### How was this patch tested?
New UTs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52645 from HeartSaVioR/WIP-change-stateless-shuffle-partitions-in-streaming-query.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 20 ++-
.../streaming/checkpointing/OffsetSeq.scala | 90 ++++++++---
.../streaming/runtime/IncrementalExecution.scala | 3 +-
.../streaming/runtime/StreamExecution.scala | 12 +-
.../commits/0 | 2 +
.../metadata | 1 +
.../offsets/0 | 3 +
.../state/0/0/1.delta | Bin 0 -> 46 bytes
.../state/0/0/_metadata/schema | Bin 0 -> 265 bytes
.../state/0/1/1.delta | Bin 0 -> 87 bytes
.../state/0/2/1.delta | Bin 0 -> 75 bytes
.../state/0/3/1.delta | Bin 0 -> 46 bytes
.../state/0/4/1.delta | Bin 0 -> 75 bytes
.../state/0/_metadata/metadata | 2 +
.../spark/sql/streaming/StreamingQuerySuite.scala | 179 +++++++++++++++++++++
15 files changed, 289 insertions(+), 23 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d4ba066b2730..d29100851e57 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -877,9 +877,7 @@ object SQLConf {
     .createOptional

   val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
-    .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
-      "Note: For structured streaming, this configuration cannot be changed between query " +
-      "restarts from the same checkpoint location.")
+    .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
     .version("1.1.0")
     .intConf
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
@@ -2627,6 +2625,22 @@ object SQLConf {
       .checkValue(k => k >= 0, "Must be greater than or equal to 0")
       .createWithDefault(5)

+  val STATEFUL_SHUFFLE_PARTITIONS_INTERNAL =
+    buildConf("spark.sql.streaming.internal.stateStore.partitions")
+      .doc("WARN: This config is used internally and is not intended to be user-facing. This " +
+        "config can be removed without support of compatibility in any time. " +
+        "DO NOT USE THIS CONFIG DIRECTLY AND USE THE CONFIG `spark.sql.shuffle.partitions`. " +
+        "The default number of partitions to use when shuffling data for stateful operations. " +
+        "If not specified, this config picks up the value of `spark.sql.shuffle.partitions`. " +
+        "Note: For structured streaming, this configuration cannot be changed between query " +
+        "restarts from the same checkpoint location.")
+      .internal()
+      .intConf
+      .checkValue(_ > 0,
+        "The value of spark.sql.streaming.internal.stateStore.partitions must be a positive " +
+          "integer.")
+      .createOptional
+
   val FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION =
     buildConf("spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
index c1c3c379719a..62c903cb689a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
@@ -17,11 +17,14 @@

 package org.apache.spark.sql.execution.streaming.checkpointing

+import scala.language.existentials
+
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization

 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{CONFIG, DEFAULT_VALUE, NEW_VALUE, OLD_VALUE, TIP}
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream}
@@ -85,6 +88,11 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  *                          Time unit: milliseconds
  * @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions.
+ *              CAVEAT: This does not apply the logic we handle in [[OffsetSeqMetadata]] object, e.g.
+ *              deducing the default value of SQL config if the entry does not exist in the offset log,
+ *              resolving the re-bind of config key (the config key in offset log is not same with the
+ *              actual key in session), etc. If you need to get the value with applying the logic, use
+ *              [[OffsetSeqMetadata.readValue()]].
  */
 case class OffsetSeqMetadata(
     batchWatermarkMs: Long = 0,
@@ -101,13 +109,35 @@ object OffsetSeqMetadata extends Logging {
    * log in the checkpoint position.
    */
   private val relevantSQLConfs = Seq(
-    SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
+    STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
     FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
     STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
     STATE_STORE_ROCKSDB_FORMAT_VERSION, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
     PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT
   )

+  /**
+   * This is an extension of `relevantSQLConfs`. The characteristic is the same, but we persist the
+   * value of config A as config B in offset log. This exists for compatibility purpose e.g. if
+   * user upgrades Spark and runs a streaming query, but has to downgrade due to some issues.
+   *
+   * A config should be only bound to either `relevantSQLConfs` or `rebindSQLConfs` (key or value).
+   */
+  private val rebindSQLConfsSessionToOffsetLog: Map[ConfigEntry[_], ConfigEntry[_]] = {
+    Map(
+      // TODO: The proper way to handle this is to make the number of partitions in the state
+      //  metadata as the source of truth, but it requires major changes if we want to take care
+      //  of compatibility.
+      STATEFUL_SHUFFLE_PARTITIONS_INTERNAL -> SHUFFLE_PARTITIONS
+    )
+  }
+
+  /**
+   * Reversed index of `rebindSQLConfsSessionToOffsetLog`.
+   */
+  private val rebindSQLConfsOffsetLogToSession: Map[ConfigEntry[_], ConfigEntry[_]] =
+    rebindSQLConfsSessionToOffsetLog.map { case (k, v) => (v, k) }.toMap
+
   /**
    * Default values of relevant configurations that are used for backward compatibility.
    * As new configurations are added to the metadata, existing checkpoints may not have those
@@ -132,6 +162,20 @@ object OffsetSeqMetadata extends Logging {
     STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow"
   )

+  def readValue[T](metadataLog: OffsetSeqMetadata, confKey: ConfigEntry[T]): String = {
+    readValueOpt(metadataLog, confKey).getOrElse(confKey.defaultValueString)
+  }
+
+  def readValueOpt[T](
+      metadataLog: OffsetSeqMetadata,
+      confKey: ConfigEntry[T]): Option[String] = {
+    val actualKey = if (rebindSQLConfsSessionToOffsetLog.contains(confKey)) {
+      rebindSQLConfsSessionToOffsetLog(confKey)
+    } else confKey
+
+    metadataLog.conf.get(actualKey.key).orElse(relevantSQLConfDefaultValues.get(actualKey.key))
+  }
+
   def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)

   def apply(
@@ -139,49 +183,59 @@ object OffsetSeqMetadata extends Logging {
       batchTimestampMs: Long,
       sessionConf: RuntimeConfig): OffsetSeqMetadata = {
     val confs = relevantSQLConfs.map { conf => conf.key -> sessionConf.get(conf.key) }.toMap
-    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs)
+    val confsFromRebind = rebindSQLConfsSessionToOffsetLog.map {
+      case (confInSession, confInOffsetLog) =>
+        confInOffsetLog.key -> sessionConf.get(confInSession.key)
+    }.toMap
+    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs ++ confsFromRebind)
   }

   /** Set the SparkSession configuration with the values in the metadata */
   def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: SQLConf): Unit = {
-    val configs = sessionConf.getAllConfs
-    OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
-
-      metadata.conf.get(confKey) match {
+    def setOneSessionConf(confKeyInOffsetLog: String, confKeyInSession: String): Unit = {
+      metadata.conf.get(confKeyInOffsetLog) match {
         case Some(valueInMetadata) =>
           // Config value exists in the metadata, update the session config with this value
-          val optionalValueInSession = sessionConf.getConfString(confKey, null)
+          val optionalValueInSession = sessionConf.getConfString(confKeyInSession, null)
           if (optionalValueInSession != null && optionalValueInSession != valueInMetadata) {
-            logWarning(log"Updating the value of conf '${MDC(CONFIG, confKey)}' in current " +
-              log"session from '${MDC(OLD_VALUE, optionalValueInSession)}' " +
+            logWarning(log"Updating the value of conf '${MDC(CONFIG, confKeyInSession)}' in " +
+              log"current session from '${MDC(OLD_VALUE, optionalValueInSession)}' " +
               log"to '${MDC(NEW_VALUE, valueInMetadata)}'.")
           }
-          sessionConf.setConfString(confKey, valueInMetadata)
+          sessionConf.setConfString(confKeyInSession, valueInMetadata)

         case None =>
           // For backward compatibility, if a config was not recorded in the offset log,
           // then either inject a default value (if specified in `relevantSQLConfDefaultValues`) or
           // let the existing conf value in SparkSession prevail.
-          relevantSQLConfDefaultValues.get(confKey) match {
+          relevantSQLConfDefaultValues.get(confKeyInOffsetLog) match {

             case Some(defaultValue) =>
-              sessionConf.setConfString(confKey, defaultValue)
-              logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in the offset log, " +
-                log"using default value '${MDC(DEFAULT_VALUE, defaultValue)}'")
+              sessionConf.setConfString(confKeyInSession, defaultValue)
+              logWarning(log"Conf '${MDC(CONFIG, confKeyInSession)}' was not found in the offset " +
+                log"log, using default value '${MDC(DEFAULT_VALUE, defaultValue)}'")

             case None =>
-              val value = sessionConf.getConfString(confKey, null)
+              val value = sessionConf.getConfString(confKeyInSession, null)
               val valueStr = if (value != null) {
                 s" Using existing session conf value '$value'."
               } else {
                 " No value set in session conf."
               }
-              logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in the offset log. " +
-                log"${MDC(TIP, valueStr)}")
-
+              logWarning(log"Conf '${MDC(CONFIG, confKeyInSession)}' was not found in the " +
+                log"offset log. ${MDC(TIP, valueStr)}")
           }
       }
     }
+
+    OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
+      setOneSessionConf(confKey, confKey)
+    }
+
+    rebindSQLConfsOffsetLogToSession.foreach {
+      case (confInOffsetLog, confInSession) =>
+        setOneSessionConf(confInOffsetLog.key, confInSession.key)
+    }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
index 4f41e8a8be06..cf0c297efbf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
@@ -105,7 +105,8 @@ class IncrementalExecution(

   private lazy val hadoopConf = sparkSession.sessionState.newHadoopConf()

-  private[sql] val numStateStores = offsetSeqMetadata.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)
+  private[sql] val numStateStores = OffsetSeqMetadata.readValueOpt(offsetSeqMetadata,
+    SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL)
     .map(SQLConf.SHUFFLE_PARTITIONS.valueConverter)
     .getOrElse(sparkSession.sessionState.conf.numShufflePartitions)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
index b5e51bc8b54d..56ed0de1fcdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
@@ -155,7 +155,7 @@ abstract class StreamExecution(
   protected def sources: Seq[SparkDataStream]

   /** Isolated spark session to run the batches with. */
-  protected val sparkSessionForStream: SparkSession = sparkSession.cloneSession()
+  protected[sql] val sparkSessionForStream: SparkSession = sparkSession.cloneSession()

   /**
    * Manages the metadata from this checkpoint location.
@@ -320,6 +320,16 @@ abstract class StreamExecution(
       sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
     }

+    sparkSessionForStream.conf.get(SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL) match {
+      case Some(_) => // no-op
+      case None =>
+        // Take the default value of `spark.sql.shuffle.partitions`.
+        val shufflePartitionValue = sparkSessionForStream.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+        sparkSessionForStream.conf.set(
+          SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL.key,
+          shufflePartitionValue)
+    }
+
     getLatestExecutionContext().updateStatusMessage("Initializing sources")
     // force initialization of the logical plan so that the sources can be created
     logicalPlan
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/commits/0
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/metadata
new file mode 100644
index 000000000000..53635b08905f
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/metadata
@@ -0,0 +1 @@
+{"id":"295ee44f-dd99-45cf-a21d-9a760b439c45"}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/offsets/0
new file mode 100644
index 000000000000..dcacc16cdc23
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1760948082021,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"unsaferow","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"mi [...]
+0
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/_metadata/schema b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/_metadata/schema
new file mode 100644
index 000000000000..864aad4c83e5
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/_metadata/schema differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/1/1.delta
new file mode 100644
index 000000000000..5acc883909ca
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/1/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/2/1.delta
new file mode 100644
index 000000000000..00c03b0f2aaa
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/2/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/3/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/3/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/4/1.delta
new file mode 100644
index 000000000000..0a0f74c94403
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/4/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/_metadata/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/_metadata/metadata
new file mode 100644
index 000000000000..39ce28c9b4aa
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/_metadata/metadata
@@ -0,0 +1,2 @@
+v1
+{"operatorInfo":{"operatorId":0,"operatorName":"dedupe"},"stateStoreInfo":[{"storeName":"default","numColsPrefixKey":0,"numPartitions":5}]}
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b0de21a6d9e8..7adf98b79204 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLi
 import org.apache.spark.sql.execution.exchange.{REQUIRED_BY_STATEFUL_OPERATOR, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqMetadata}
+import org.apache.spark.sql.execution.streaming.operators.stateful.StateStoreSaveExec
 import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, MemoryStream, MetricsReporter, StreamExecution, StreamingExecutionRelation, StreamingQueryWrapper}
 import org.apache.spark.sql.execution.streaming.sources.{MemorySink, TestForeachWriter}
 import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStoreCheckpointLocationNotEmpty}
@@ -1479,6 +1480,184 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }

+  test("SPARK-53942: changing the number of stateful shuffle partitions via config") {
+    val stream = MemoryStream[Int]
+
+    val df = stream.toDF()
+      .groupBy("value")
+      .count()
+
+    withTempDir { checkpointDir =>
+      withSQLConf(SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL.key -> 10.toString) {
+        assert(
+          spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+            != spark.conf.get(SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL).get
+        )
+
+        testStream(df, OutputMode.Update())(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(stream, 1, 2, 3),
+          ProcessAllAvailable(),
+          AssertOnQuery { q =>
+            // This also proves the path of downgrade; we use the same entry name to persist the
+            // stateful shuffle partitions, hence it is compatible with older Spark versions.
+            assert(
+              q.offsetLog.offsetSeqMetadataForBatchId(0).get.conf
+                .get(SQLConf.SHUFFLE_PARTITIONS.key) === Some("10"))
+
+            val stateOps = q.lastExecution.executedPlan.collect {
+              case s: StateStoreSaveExec => s
+            }
+
+            val stateStoreSave = stateOps.head
+            assert(stateStoreSave.stateInfo.get.numPartitions === 10)
+            true
+          }
+        )
+      }
+
+      // Trying to change the number of stateful shuffle partitions, which should be ignored.
+      withSQLConf(SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL.key -> 3.toString) {
+        testStream(df, OutputMode.Update())(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(stream, 4, 5),
+          ProcessAllAvailable(),
+          Execute { q =>
+            assert(
+              q.offsetLog.offsetSeqMetadataForBatchId(1).get.conf
+                .get(SQLConf.SHUFFLE_PARTITIONS.key) === Some("10"))
+
+            val stateOps = q.lastExecution.executedPlan.collect {
+              case s: StateStoreSaveExec => s
+            }
+
+            val stateStoreSave = stateOps.head
+            // This shouldn't change to 3.
+            assert(stateStoreSave.stateInfo.get.numPartitions === 10)
+          }
+        )
+      }
+    }
+  }
+
+  test("SPARK-53942: changing the number of stateless shuffle partitions via config") {
+    val inputData = MemoryStream[(String, Int)]
+    val dfStream = inputData.toDF()
+      .select($"_1".as("key"), $"_2".as("value"))
+
+    val dfBatch = spark.createDataFrame(Seq(("a", "aux1"), ("b", "aux2"), ("c", "aux3")))
+      .toDF("key", "aux")
+
+    val joined = dfStream.join(dfBatch, "key")
+
+    withTempDir { checkpointDir =>
+      withSQLConf(
+        SQLConf.SHUFFLE_PARTITIONS.key -> 1.toString,
+        // We should disable AQE to have deterministic number of shuffle partitions.
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+        // Also disable broadcast hash join.
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        testStream(joined)(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(inputData, ("a", 1), ("b", 2)),
+          ProcessAllAvailable(),
+          Execute { q =>
+            // The value of stateful shuffle partitions in offset log follows the
+            // stateless shuffle partitions if it's not specified explicitly.
+            assert(
+              q.sparkSessionForStream.conf.get(
+                SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL) === Some(1))
+            assert(
+              q.offsetLog.offsetSeqMetadataForBatchId(0).get.conf
+                .get(SQLConf.SHUFFLE_PARTITIONS.key) === Some("1"))
+
+            val shuffles = q.lastExecution.executedPlan.collect {
+              case s: ShuffleExchangeExec => s
+            }
+
+            val shuffle = shuffles.head
+            assert(shuffle.numPartitions === 1)
+          }
+        )
+      }
+
+      // Trying to change the number of stateless shuffle partitions, which should be honored.
+      withSQLConf(
+        SQLConf.SHUFFLE_PARTITIONS.key -> 5.toString,
+        // We should disable AQE to have deterministic number of shuffle partitions.
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+        // Also disable broadcast hash join.
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        testStream(joined)(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(inputData, ("c", 3)),
+          ProcessAllAvailable(),
+          Execute { q =>
+            // Changing the number of stateless shuffle partitions should not change the number
+            // of stateful shuffle partitions if it's available in offset log.
+            assert(
+              q.sparkSessionForStream.conf.get(
+                SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL) === Some(1))
+            assert(
+              q.offsetLog.offsetSeqMetadataForBatchId(1).get.conf
+                .get(SQLConf.SHUFFLE_PARTITIONS.key) === Some("1"))
+
+            val shuffles = q.lastExecution.executedPlan.collect {
+              case s: ShuffleExchangeExec => s
+            }
+
+            val shuffle = shuffles.head
+            assert(shuffle.numPartitions === 5)
+          }
+        )
+      }
+    }
+  }
+
+  test("SPARK-53942: stateful shuffle partitions are retained from old checkpoint") {
+    val input = MemoryStream[Int]
+    val df1 = input.toDF()
+      .select($"value" as Symbol("key1"), $"value" * 2 as Symbol("key2"),
+        $"value" * 3 as Symbol("value"))
+    val dedup = df1.dropDuplicates("key1", "key2")
+
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+    Utils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    input.addData(1, 1, 2, 3, 4)
+
+    // Trying to change the number of stateless shuffle partitions, which should be no op
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      testStream(dedup)(
+        // scalastyle:off line.size.limit
+        /*
+          Note: The checkpoint was generated using the following input in Spark version 4.0.1, with
+          shuffle partitions = 5
+          AddData(inputData, 1, 1, 2, 3, 4),
+          CheckAnswer((1, 2, 3), (2, 4, 6), (3, 6, 9), (4, 8, 12))
+         */
+        // scalastyle:on line.size.limit
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(input, 2, 3, 3, 4, 5),
+        CheckAnswer((5, 10, 15)),
+        Execute { q =>
+          val shuffles = q.lastExecution.executedPlan.collect {
+            case s: ShuffleExchangeExec => s
+          }
+
+          val shuffle = shuffles.head
+          assert(shuffle.numPartitions === 5)
+        },
+        StopStream
+      )
+    }
+  }
+
   private val TEST_PROVIDERS = Seq(
     classOf[HDFSBackedStateStoreProvider].getName,
     classOf[RocksDBStateStoreProvider].getName
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]