This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a50b30d7a02b [SPARK-48687][SS] Add change to perform state schema
validation and update on driver in planning phase for stateful queries
a50b30d7a02b is described below
commit a50b30d7a02bf45f1ddb8db0be6779b1441e1d4d
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Wed Jun 26 22:09:42 2024 +0900
[SPARK-48687][SS] Add change to perform state schema validation and update
on driver in planning phase for stateful queries
### What changes were proposed in this pull request?
Add a change to perform state schema validation and updates on the driver, during the planning phase, for stateful queries.
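For context, a minimal, self-contained Scala sketch of the driver-side flow described above is shown below. The types (`StateSchema`, `StatefulOp`, `DedupOp`) and the in-memory map standing in for the checkpointed schema file are hypothetical simplifications for illustration only, not Spark's actual classes; the real hook added here is `StatefulOperator.validateAndMaybeEvolveStateSchema`, invoked from a planning rule in `IncrementalExecution` (see the diff below).
```scala
// Simplified sketch: each stateful operator exposes a driver-side hook that records its
// state schema on the first run and validates later runs against it, before any task runs.
object StateSchemaValidationSketch {

  // Stand-in for the key/value schemas persisted in the checkpoint's schema file.
  final case class StateSchema(keyFields: Seq[String], valueFields: Seq[String])

  // Stand-in for a stateful physical operator with the driver-side validation hook.
  trait StatefulOp {
    def stateSchema: StateSchema

    def validateAndMaybeEvolveStateSchema(
        store: collection.mutable.Map[String, StateSchema], opId: String): Unit = {
      store.get(opId) match {
        case None =>
          // first run: record the schema (the real code writes a schema file in the checkpoint dir)
          store.put(opId, stateSchema)
        case Some(existing) if existing != stateSchema =>
          // later run with an incompatible schema: fail on the driver during planning
          throw new IllegalStateException(
            s"Incompatible state schema for operator $opId: stored=$existing, new=$stateSchema")
        case _ => // compatible, nothing to do
      }
    }
  }

  final case class DedupOp(stateSchema: StateSchema) extends StatefulOp

  def main(args: Array[String]): Unit = {
    val checkpoint = collection.mutable.Map.empty[String, StateSchema]
    val op = DedupOp(StateSchema(Seq("key"), Seq("value")))

    // "Planning phase" on the driver: run the hook before launching any executor task.
    op.validateAndMaybeEvolveStateSchema(checkpoint, opId = "0") // records schema
    op.validateAndMaybeEvolveStateSchema(checkpoint, opId = "0") // validates, passes

    val changed = DedupOp(StateSchema(Seq("key", "extra"), Seq("value")))
    try changed.validateAndMaybeEvolveStateSchema(checkpoint, opId = "0")
    catch {
      case e: IllegalStateException => println(s"Rejected on driver: ${e.getMessage}")
    }
  }
}
```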
### Why are the changes needed?
Avoids making partition-specific checks on the executors. It also makes it easier to version the state schema in the future.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests and ran existing tests
```
[info] Run completed in 10 seconds, 500 milliseconds.
[info] Total number of tests run: 29
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 29, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 35 s, completed Jun 21, 2024, 3:58:12 PM
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47035 from anishshri-db/task/schema-changes-on-driver.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/FlatMapGroupsWithStateExec.scala | 7 +
.../execution/streaming/IncrementalExecution.scala | 17 ++-
.../streaming/StreamingSymmetricHashJoinExec.scala | 20 +++
.../streaming/TransformWithStateExec.scala | 9 ++
.../state/StateSchemaCompatibilityChecker.scala | 141 +++++++++++++++++----
.../sql/execution/streaming/state/StateStore.scala | 49 +------
.../state/SymmetricHashJoinStateManager.scala | 29 +++++
.../execution/streaming/statefulOperators.scala | 36 ++++++
.../sql/execution/streaming/streamingLimits.scala | 9 +-
.../.metadata.crc | Bin 0 -> 12 bytes
.../commits/.0.crc | Bin 0 -> 12 bytes
.../commits/.1.crc | Bin 0 -> 12 bytes
.../commits/0 | 2 +
.../commits/1 | 2 +
.../metadata | 1 +
.../offsets/.0.crc | Bin 0 -> 16 bytes
.../offsets/.1.crc | Bin 0 -> 16 bytes
.../offsets/0 | 3 +
.../offsets/1 | 3 +
.../state/0/0/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/0/.2.delta.crc | Bin 0 -> 12 bytes
.../state/0/0/1.delta | Bin 0 -> 77 bytes
.../state/0/0/2.delta | Bin 0 -> 46 bytes
.../state/0/0/_metadata/.schema.crc | Bin 0 -> 12 bytes
.../state/0/0/_metadata/schema | Bin 0 -> 197 bytes
.../state/0/1/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/1/.2.delta.crc | Bin 0 -> 12 bytes
.../state/0/1/1.delta | Bin 0 -> 46 bytes
.../state/0/1/2.delta | Bin 0 -> 77 bytes
.../state/0/2/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/2/.2.delta.crc | Bin 0 -> 12 bytes
.../state/0/2/1.delta | Bin 0 -> 46 bytes
.../state/0/2/2.delta | Bin 0 -> 46 bytes
.../state/0/3/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/3/.2.delta.crc | Bin 0 -> 12 bytes
.../state/0/3/1.delta | Bin 0 -> 46 bytes
.../state/0/3/2.delta | Bin 0 -> 46 bytes
.../state/0/4/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/4/.2.delta.crc | Bin 0 -> 12 bytes
.../state/0/4/1.delta | Bin 0 -> 46 bytes
.../state/0/4/2.delta | Bin 0 -> 46 bytes
.../.metadata.crc | Bin 0 -> 12 bytes
.../commits/.0.crc | Bin 0 -> 12 bytes
.../commits/0 | 2 +
.../metadata | 1 +
.../offsets/.0.crc | Bin 0 -> 16 bytes
.../offsets/0 | 4 +
.../state/0/0/left-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/0/left-keyToNumValues/1.delta | Bin 0 -> 46 bytes
.../0/0/left-keyToNumValues/_metadata/.schema.crc | Bin 0 -> 12 bytes
.../state/0/0/left-keyToNumValues/_metadata/schema | Bin 0 -> 199 bytes
.../0/0/left-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/0/left-keyWithIndexToValue/1.delta | Bin 0 -> 46 bytes
.../left-keyWithIndexToValue/_metadata/.schema.crc | Bin 0 -> 12 bytes
.../0/0/left-keyWithIndexToValue/_metadata/schema | Bin 0 -> 430 bytes
.../state/0/0/right-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/0/right-keyToNumValues/1.delta | Bin 0 -> 46 bytes
.../0/0/right-keyToNumValues/_metadata/.schema.crc | Bin 0 -> 12 bytes
.../0/0/right-keyToNumValues/_metadata/schema | Bin 0 -> 199 bytes
.../0/0/right-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/0/right-keyWithIndexToValue/1.delta | Bin 0 -> 46 bytes
.../_metadata/.schema.crc | Bin 0 -> 12 bytes
.../0/0/right-keyWithIndexToValue/_metadata/schema | Bin 0 -> 432 bytes
.../state/0/1/left-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/1/left-keyToNumValues/1.delta | Bin 0 -> 46 bytes
.../0/1/left-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/1/left-keyWithIndexToValue/1.delta | Bin 0 -> 46 bytes
.../state/0/1/right-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/1/right-keyToNumValues/1.delta | Bin 0 -> 46 bytes
.../0/1/right-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/1/right-keyWithIndexToValue/1.delta | Bin 0 -> 46 bytes
.../state/0/2/left-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/2/left-keyToNumValues/1.delta | Bin 0 -> 70 bytes
.../0/2/left-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/2/left-keyWithIndexToValue/1.delta | Bin 0 -> 89 bytes
.../state/0/2/right-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/2/right-keyToNumValues/1.delta | Bin 0 -> 70 bytes
.../0/2/right-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/2/right-keyWithIndexToValue/1.delta | Bin 0 -> 82 bytes
.../state/0/3/left-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/3/left-keyToNumValues/1.delta | Bin 0 -> 46 bytes
.../0/3/left-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/3/left-keyWithIndexToValue/1.delta | Bin 0 -> 46 bytes
.../state/0/3/right-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/3/right-keyToNumValues/1.delta | Bin 0 -> 46 bytes
.../0/3/right-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/3/right-keyWithIndexToValue/1.delta | Bin 0 -> 46 bytes
.../state/0/4/left-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/4/left-keyToNumValues/1.delta | Bin 0 -> 70 bytes
.../0/4/left-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/4/left-keyWithIndexToValue/1.delta | Bin 0 -> 90 bytes
.../state/0/4/right-keyToNumValues/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/4/right-keyToNumValues/1.delta | Bin 0 -> 70 bytes
.../0/4/right-keyWithIndexToValue/.1.delta.crc | Bin 0 -> 12 bytes
.../state/0/4/right-keyWithIndexToValue/1.delta | Bin 0 -> 83 bytes
.../StateSchemaCompatibilityCheckerSuite.scala | 84 ++++++------
.../streaming/StreamingDeduplicationSuite.scala | 90 +++++++++----
.../spark/sql/streaming/StreamingJoinSuite.scala | 141 +++++++++++++++++++++
98 files changed, 514 insertions(+), 136 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 01b16b63fa27..2fa744a0f89b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.TimeUnit.NANOSECONDS
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -187,6 +189,11 @@ trait FlatMapGroupsWithStateExecBase
})
}
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      groupingAttributes.toStructType, stateManager.stateSchema, session.sqlContext.sessionState)
+  }
+
override protected def doExecute(): RDD[InternalRow] = {
stateManager // force lazy init at driver
metrics // force lazy init at driver
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index f72e2eb407f8..f7f3534ea408 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -198,6 +198,16 @@ class IncrementalExecution(
}
}
+  // Planning rule used to record the state schema for the first run and validate state schema
+  // changes across query runs.
+  object StateSchemaValidationRule extends SparkPlanPartialRule {
+    override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      case statefulOp: StatefulOperator if isFirstBatch =>
+        statefulOp.validateAndMaybeEvolveStateSchema(hadoopConf)
+        statefulOp
+    }
+  }
+
object StateOpIdRule extends SparkPlanPartialRule {
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case StateStoreSaveExec(keys, None, None, None, None, stateFormatVersion,
@@ -471,9 +481,12 @@ class IncrementalExecution(
if (isFirstBatch && currentBatchId != 0) {
checkOperatorValidWithMetadata(planWithStateOpId)
}
-      // The rule doesn't change the plan but cause the side effect that metadata is written
-      // in the checkpoint directory of stateful operator.
+
+      // The two rules below don't change the plan but can cause the side effect that
+      // metadata/schema is written in the checkpoint directory of stateful operator.
+      planWithStateOpId transform StateSchemaValidationRule.rule
       planWithStateOpId transform WriteStatefulOperatorMetadataRule.rule
+
simulateWatermarkPropagation(planWithStateOpId)
planWithStateOpId transform WatermarkPropagationRule.rule
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 20a05a100033..28365eac6e81 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.TimeUnit.NANOSECONDS
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
GenericInternalRow, JoinedRow, Literal, Predicate, UnsafeProjection, UnsafeRow}
@@ -31,6 +33,7 @@ import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.KeyToValuePair
import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}
@@ -243,6 +246,23 @@ case class StreamingSymmetricHashJoinExec(
watermarkUsedForStateCleanup && watermarkHasChanged
}
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    var result: Map[String, (StructType, StructType)] = Map.empty
+    // get state schema for state stores on left side of the join
+    result ++= SymmetricHashJoinStateManager.getSchemaForStateStores(LeftSide,
+      left.output, leftKeys, stateFormatVersion)
+
+    // get state schema for state stores on right side of the join
+    result ++= SymmetricHashJoinStateManager.getSchemaForStateStores(RightSide,
+      right.output, rightKeys, stateFormatVersion)
+
+    // validate and maybe evolve schema for all state stores across both sides of the join
+    result.foreach { case (stateStoreName, (keySchema, valueSchema)) =>
+      StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+        keySchema, valueSchema, session.sessionState, storeName = stateStoreName)
+    }
+  }
+
protected override def doExecute(): RDD[InternalRow] = {
val stateStoreCoord =
session.sessionState.streamingQueryManager.stateStoreCoordinator
val stateStoreNames =
SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index fbd062acfb5d..c5b22c89e82f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
import java.util.UUID
import java.util.concurrent.TimeUnit.NANOSECONDS
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -338,6 +340,13 @@ case class TransformWithStateExec(
)
}
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    // TODO: transformWithState is special because we don't have the schema of the state directly
+    // within the passed args. We need to gather this after running the init function
+    // within the stateful processor on the driver. This also requires a schema format change
+    // when recording this information persistently.
+  }
+
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 374ab52b9110..e365cc6371f2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -17,12 +17,17 @@
package org.apache.spark.sql.execution.streaming.state
+import scala.util.Try
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.internal.{Logging, LogKeys, MDC}
-import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, StatefulOperatorStateInfo}
 import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
+import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.types.{DataType, StructType}
class StateSchemaCompatibilityChecker(
@@ -37,30 +42,30 @@ class StateSchemaCompatibilityChecker(
fm.mkdirs(schemaFileLocation.getParent)
-  def check(keySchema: StructType, valueSchema: StructType): Unit = {
-    check(keySchema, valueSchema, ignoreValueSchema = false)
-  }
+  /**
+   * Function to check if new state store schema is compatible with the existing schema.
+   * @param oldSchema - old state schema
+   * @param newSchema - new state schema
+   * @param ignoreValueSchema - whether to ignore value schema or not
+   */
+  private def check(
+      oldSchema: (StructType, StructType),
+      newSchema: (StructType, StructType),
+      ignoreValueSchema: Boolean) : Unit = {
+    val (storedKeySchema, storedValueSchema) = oldSchema
+    val (keySchema, valueSchema) = newSchema
-  def check(keySchema: StructType, valueSchema: StructType, ignoreValueSchema: Boolean): Unit = {
-    if (fm.exists(schemaFileLocation)) {
-      logDebug(s"Schema file for provider $providerId exists. Comparing with provided schema.")
-      val (storedKeySchema, storedValueSchema) = readSchemaFile()
-      if (storedKeySchema.equals(keySchema) &&
-        (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
-        // schema is exactly same
-      } else if (!schemasCompatible(storedKeySchema, keySchema)) {
-        throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
-          keySchema.toString)
-      } else if (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema)) {
-        throw StateStoreErrors.stateStoreValueSchemaNotCompatible(storedValueSchema.toString,
-          valueSchema.toString)
-      } else {
-        logInfo("Detected schema change which is compatible. Allowing to put rows.")
-      }
+    if (storedKeySchema.equals(keySchema) &&
+      (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
+      // schema is exactly same
+    } else if (!schemasCompatible(storedKeySchema, keySchema)) {
+      throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
+        keySchema.toString)
+    } else if (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema)) {
+      throw StateStoreErrors.stateStoreValueSchemaNotCompatible(storedValueSchema.toString,
+        valueSchema.toString)
     } else {
-      // schema doesn't exist, create one now
-      logDebug(s"Schema file for provider $providerId doesn't exist. Creating one.")
-      createSchemaFile(keySchema, valueSchema)
+      logInfo("Detected schema change which is compatible. Allowing to put rows.")
     }
   }
@@ -82,7 +87,20 @@ class StateSchemaCompatibilityChecker(
}
}
-  def createSchemaFile(keySchema: StructType, valueSchema: StructType): Unit = {
+  /**
+   * Function to read and return the existing key and value schema from the schema file, if it
+   * exists
+   * @return - Option of (keySchema, valueSchema) if the schema file exists, None otherwise
+   */
+  private def getExistingKeyAndValueSchema(): Option[(StructType, StructType)] = {
+    if (fm.exists(schemaFileLocation)) {
+      Some(readSchemaFile())
+    } else {
+      None
+    }
+  }
+
+  private def createSchemaFile(keySchema: StructType, valueSchema: StructType): Unit = {
     createSchemaFile(keySchema, valueSchema, schemaWriter)
   }
@@ -103,10 +121,85 @@ class StateSchemaCompatibilityChecker(
}
}
+  def validateAndMaybeEvolveStateSchema(
+      newKeySchema: StructType,
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean): Unit = {
+    val existingSchema = getExistingKeyAndValueSchema()
+    if (existingSchema.isEmpty) {
+      // write the schema file if it doesn't exist
+      createSchemaFile(newKeySchema, newValueSchema)
+    } else {
+      // validate if the new schema is compatible with the existing schema
+      check(existingSchema.get, (newKeySchema, newValueSchema), ignoreValueSchema)
+    }
+  }
+
private def schemaFile(storeCpLocation: Path): Path =
new Path(new Path(storeCpLocation, "_metadata"), "schema")
}
object StateSchemaCompatibilityChecker {
val VERSION = 2
+
+ private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
+ if (!UnsafeRowUtils.isBinaryStable(schema)) {
+ throw new SparkUnsupportedOperationException(
+ errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
+ messageParameters = Map("schema" -> schema.json)
+ )
+ }
+ }
+
+  /**
+   * Function to validate the schema of the state store and maybe evolve it if needed.
+   * We also verify for binary inequality columns in the schema and disallow them. We then perform
+   * key and value schema validation. Depending on the passed configs, a warning might be logged
+   * or an exception might be thrown if the schema is not compatible.
+   *
+   * @param stateInfo - StatefulOperatorStateInfo containing the state store information
+   * @param hadoopConf - Hadoop configuration
+   * @param newKeySchema - New key schema
+   * @param newValueSchema - New value schema
+   * @param sessionState - session state used to retrieve session config
+   * @param extraOptions - any extra options to be passed for StateStoreConf creation
+   * @param storeName - optional state store name
+   */
+  def validateAndMaybeEvolveStateSchema(
+      stateInfo: StatefulOperatorStateInfo,
+      hadoopConf: Configuration,
+      newKeySchema: StructType,
+      newValueSchema: StructType,
+      sessionState: SessionState,
+      extraOptions: Map[String, String] = Map.empty,
+      storeName: String = StateStoreId.DEFAULT_STORE_NAME): Unit = {
+    // SPARK-47776: collation introduces the concept of binary (in)equality, which means
+    // in some collation we no longer be able to just compare the binary format of two
+    // UnsafeRows to determine equality. For example, 'aaa' and 'AAA' can be "semantically"
+    // same in case insensitive collation.
+    // State store is basically key-value storage, and the most provider implementations
+    // rely on the fact that all the columns in the key schema support binary equality.
+    // We need to disallow using binary inequality column in the key schema, before we
+    // could support this in majority of state store providers (or high-level of state
+    // store.)
+    disallowBinaryInequalityColumn(newKeySchema)
+
+    val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+    val providerId = StateStoreProviderId(StateStoreId(stateInfo.checkpointLocation,
+      stateInfo.operatorId, 0, storeName), stateInfo.queryRunId)
+    val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf)
+    // regardless of configuration, we check compatibility to at least write schema file
+    // if necessary
+    // if the format validation for value schema is disabled, we also disable the schema
+    // compatibility checker for value schema as well.
+    val result = Try(
+      checker.validateAndMaybeEvolveStateSchema(newKeySchema, newValueSchema,
+        ignoreValueSchema = !storeConf.formatValidationCheckValue)
+    ).toEither.fold(Some(_), _ => None)
+
+    // if schema validation is enabled and an exception is thrown, we re-throw it and fail the query
+    if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
+      throw result.get
+    }
+  }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 2f9ce2c236f4..b94f5d6cecd1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -23,13 +23,12 @@ import java.util.concurrent.atomic.AtomicReference
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
-import scala.util.Try
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
@@ -524,9 +523,6 @@ object StateStore extends Logging {
@GuardedBy("loadedProviders")
   private val loadedProviders = new mutable.HashMap[StateStoreProviderId, StateStoreProvider]()
-  @GuardedBy("loadedProviders")
-  private val schemaValidated = new mutable.HashMap[StateStoreProviderId, Option[Throwable]]()
-
private val maintenanceThreadPoolLock = new Object
// Shared exception between threads in thread pool that the scheduling thread
@@ -649,15 +645,6 @@ object StateStore extends Logging {
storeProvider.getStore(version)
}
- private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
- if (!UnsafeRowUtils.isBinaryStable(schema)) {
- throw new SparkUnsupportedOperationException(
- errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
- messageParameters = Map("schema" -> schema.json)
- )
- }
- }
-
private def getStateStoreProvider(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
@@ -670,40 +657,6 @@ object StateStore extends Logging {
loadedProviders.synchronized {
startMaintenanceIfNeeded(storeConf)
-      if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) {
-        val result = schemaValidated.getOrElseUpdate(storeProviderId, {
-          // SPARK-47776: collation introduces the concept of binary (in)equality, which means
-          // in some collation we no longer be able to just compare the binary format of two
-          // UnsafeRows to determine equality. For example, 'aaa' and 'AAA' can be "semantically"
-          // same in case insensitive collation.
-          // State store is basically key-value storage, and the most provider implementations
-          // rely on the fact that all the columns in the key schema support binary equality.
-          // We need to disallow using binary inequality column in the key schema, before we
-          // could support this in majority of state store providers (or high-level of state
-          // store.)
-          disallowBinaryInequalityColumn(keySchema)
-
-          val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf)
-          // regardless of configuration, we check compatibility to at least write schema file
-          // if necessary
-          // if the format validation for value schema is disabled, we also disable the schema
-          // compatibility checker for value schema as well.
-          val ret = Try(
-            checker.check(keySchema, valueSchema,
-              ignoreValueSchema = !storeConf.formatValidationCheckValue)
-          ).toEither.fold(Some(_), _ => None)
-          if (storeConf.stateSchemaCheckEnabled) {
-            ret
-          } else {
-            None
-          }
-        })
-
-        if (result.isDefined) {
-          throw result.get
-        }
-      }
-
       // SPARK-42567 - Track load time for state store provider and log warning if takes longer
       // than 2s.
       val (provider, loadTimeMs) = Utils.timeTakenMs {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 43ffdf9829b4..9dbc9bf0c320 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -767,6 +767,35 @@ object SymmetricHashJoinStateManager {
}
}
+  def getSchemaForStateStores(
+      joinSide: JoinSide,
+      inputValueAttributes: Seq[Attribute],
+      joinKeys: Seq[Expression],
+      stateFormatVersion: Int): Map[String, (StructType, StructType)] = {
+    var result: Map[String, (StructType, StructType)] = Map.empty
+
+    // get the key and value schema for the KeyToNumValues state store
+    val keySchema = StructType(
+      joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i", k.dataType, k.nullable) })
+    val longValueSchema = new StructType().add("value", "long")
+    result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema, longValueSchema))
+
+    // get the key and value schema for the KeyWithIndexToValue state store
+    val keyWithIndexSchema = keySchema.add("index", LongType)
+    val valueSchema = if (stateFormatVersion == 1) {
+      inputValueAttributes
+    } else if (stateFormatVersion == 2) {
+      inputValueAttributes :+ AttributeReference("matched", BooleanType)()
+    } else {
+      throw new IllegalArgumentException("Incorrect state format version! " +
+        s"version=$stateFormatVersion")
+    }
+    result += (getStateStoreName(joinSide, KeyWithIndexToValueType) ->
+      (keyWithIndexSchema, valueSchema.toStructType))
+
+    result
+  }
+
private sealed trait StateStoreType
private case object KeyToNumValuesType extends StateStoreType {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 9add574e01fc..3d5c418c7f54 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit._
import scala.collection.mutable
import scala.jdk.CollectionConverters._
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.AnalysisException
@@ -70,6 +72,10 @@ trait StatefulOperator extends SparkPlan {
throw new IllegalStateException("State location not present for
execution")
}
}
+
+ // Function used to record state schema for the first time and validate it
against proposed
+ // schema changes in the future. Runs as part of a planning rule on the
driver.
+ def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit
}
/**
@@ -424,6 +430,11 @@ case class StateStoreRestoreExec(
   private[sql] val stateManager = StreamingAggregationStateManager.createStateManager(
     keyExpressions, child.output, stateFormatVersion)
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keyExpressions.toStructType, stateManager.getStateValueSchema, session.sessionState)
+  }
+
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
@@ -486,6 +497,11 @@ case class StateStoreSaveExec(
   private[sql] val stateManager = StreamingAggregationStateManager.createStateManager(
     keyExpressions, child.output, stateFormatVersion)
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keyExpressions.toStructType, stateManager.getStateValueSchema, session.sessionState)
+  }
+
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
assert(outputMode.nonEmpty,
@@ -690,6 +706,11 @@ case class SessionWindowStateStoreRestoreExec(
   private val stateManager = StreamingSessionWindowStateManager.createStateManager(
     keyWithoutSessionExpressions, sessionExpression, child.output, stateFormatVersion)
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      stateManager.getStateKeySchema, stateManager.getStateValueSchema, session.sessionState)
+  }
+
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
@@ -772,6 +793,11 @@ case class SessionWindowStateStoreSaveExec(
   private val stateManager = StreamingSessionWindowStateManager.createStateManager(
     keyWithoutSessionExpressions, sessionExpression, child.output, stateFormatVersion)
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      stateManager.getStateKeySchema, stateManager.getStateValueSchema, session.sessionState)
+  }
+
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
assert(outputMode.nonEmpty,
@@ -1079,6 +1105,11 @@ case class StreamingDeduplicateExec(
   override protected def withNewChildInternal(newChild: SparkPlan): StreamingDeduplicateExec =
     copy(child = newChild)
+
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keyExpressions.toStructType, schemaForValueRow, session.sessionState, extraOptionOnStateStore)
+  }
}
object StreamingDeduplicateExec {
@@ -1150,6 +1181,11 @@ case class StreamingDeduplicateWithinWatermarkExec(
override def shortName: String = "dedupeWithinWatermark"
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keyExpressions.toStructType, schemaForValueRow, session.sessionState, extraOptionOnStateStore)
+  }
+
override protected def withNewChildInternal(
newChild: SparkPlan): StreamingDeduplicateWithinWatermarkExec =
copy(child = newChild)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
index e0e3ee582bef..a3cb66f91496 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
@@ -18,12 +18,14 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.TimeUnit.NANOSECONDS
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute,
GenericInternalRow, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution,
Partitioning}
import org.apache.spark.sql.execution.{LimitExec, SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStoreOps}
+import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateSchemaCompatibilityChecker, StateStoreOps}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
import org.apache.spark.util.{CompletionIterator, NextIterator}
@@ -45,6 +47,11 @@ case class StreamingGlobalLimitExec(
private val keySchema = StructType(Array(StructField("key", NullType)))
private val valueSchema = StructType(Array(StructField("value", LongType)))
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keySchema, valueSchema, session.sessionState)
+  }
+
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/.metadata.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/.metadata.crc
new file mode 100644
index 000000000000..6177f01d501b
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/.metadata.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.0.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.0.crc
new file mode 100644
index 000000000000..1aee7033161e
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.0.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.1.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.1.crc
new file mode 100644
index 000000000000..1aee7033161e
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.1.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/0
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/0
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/1
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/1
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/1
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/metadata
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/metadata
new file mode 100644
index 000000000000..c8acdedc074b
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/metadata
@@ -0,0 +1 @@
+{"id":"a13717f3-7485-421b-b55a-21625123b680"}
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.0.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.0.crc
new file mode 100644
index 000000000000..121286161cb6
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.0.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.1.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.1.crc
new file mode 100644
index 000000000000..89d73c77c55c
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.1.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/0
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/0
new file mode 100644
index 000000000000..7a7b38628a15
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1719278977158,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s
[...]
+0
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/1
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/1
new file mode 100644
index 000000000000..589d400395e1
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1719278978807,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s
[...]
+1
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.1.delta.crc
new file mode 100644
index 000000000000..1992982c58ff
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.2.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.2.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.2.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/1.delta
new file mode 100644
index 000000000000..fec40e83a547
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/2.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/2.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/2.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/.schema.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/.schema.crc
new file mode 100644
index 000000000000..97b2fbbd4cdf
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/.schema.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/schema
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/schema
new file mode 100644
index 000000000000..c35505fa363f
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/schema
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.2.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.2.delta.crc
new file mode 100644
index 000000000000..d18b77b93aff
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.2.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/2.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/2.delta
new file mode 100644
index 000000000000..fcbf8df80f5f
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/2.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.2.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.2.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.2.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/2.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/2.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/2.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.2.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.2.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.2.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/2.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/2.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/2.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.2.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.2.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.2.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/2.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/2.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/2.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/.metadata.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/.metadata.crc
new file mode 100644
index 000000000000..a0afa9cbeabb
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/.metadata.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/.0.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/.0.crc
new file mode 100644
index 000000000000..1aee7033161e
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/.0.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/0
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/0
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/metadata
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/metadata
new file mode 100644
index 000000000000..9f8d6f4d5cf5
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/metadata
@@ -0,0 +1 @@
+{"id":"e41911da-47c9-4560-a95d-e2ab97f2bc85"}
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/.0.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/.0.crc
new file mode 100644
index 000000000000..6cd2a4731154
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/.0.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/0
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/0
new file mode 100644
index 000000000000..a45ae3899e0b
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/0
@@ -0,0 +1,4 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1719343083746,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s
[...]
+0
+0
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/.schema.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/.schema.crc
new file mode 100644
index 000000000000..0b5ede766024
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/.schema.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/schema
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/schema
new file mode 100644
index 000000000000..4da637d14349
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/schema
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/.schema.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/.schema.crc
new file mode 100644
index 000000000000..3f303a9e7b03
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/.schema.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/schema
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/schema
new file mode 100644
index 000000000000..42448b3b584c
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/schema
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/.schema.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/.schema.crc
new file mode 100644
index 000000000000..0b5ede766024
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/.schema.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/schema
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/schema
new file mode 100644
index 000000000000..4da637d14349
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/schema
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/.schema.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/.schema.crc
new file mode 100644
index 000000000000..bcc7311689f0
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/.schema.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/schema
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/schema
new file mode 100644
index 000000000000..8fa8f1675bc8
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/schema
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..d73ee1ba16c2
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..4e421cd377fb
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..8dee4c86270f
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..7e6dce9cc108
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..d73ee1ba16c2
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..4e421cd377fb
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..d0e2f40c18ad
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..2ec494e6a636
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..20918a4ffe6f
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..0bdaf341003b
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cb8ce356ad7f
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..56b52ab974a3
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..20918a4ffe6f
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..0bdaf341003b
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/1.delta
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/.1.delta.crc
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..a874ad31b740
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/.1.delta.crc
differ
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/1.delta
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..0ed4feb1bd9b
Binary files /dev/null and
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/1.delta
differ
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
index 84348f7da67f..feab7a5fa3b0 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.streaming.state
import java.util.UUID
-import scala.util.Random
+import scala.util.{Random, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
import
org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper.newDir
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -254,9 +255,9 @@ class StateSchemaCompatibilityCheckerSuite extends
SharedSparkSession {
test("SPARK-47776: checking for compatibility with collation change in key")
{
verifyException(keySchema, valueSchema, keySchemaWithCollation,
valueSchema,
- ignoreValueSchema = false)
+ ignoreValueSchema = false, keyCollationChecks = true)
verifyException(keySchemaWithCollation, valueSchema, keySchema,
valueSchema,
- ignoreValueSchema = false)
+ ignoreValueSchema = false, keyCollationChecks = true)
}
test("SPARK-47776: checking for compatibility with collation change in
value") {
@@ -288,45 +289,48 @@ class StateSchemaCompatibilityCheckerSuite extends
SharedSparkSession {
StructType(newFields)
}
- private def runSchemaChecker(
- dir: String,
- queryId: UUID,
- newKeySchema: StructType,
- newValueSchema: StructType,
- ignoreValueSchema: Boolean): Unit = {
- // in fact, Spark doesn't support online state schema change, so need to
check
- // schema only once for each running of JVM
- val providerId = StateStoreProviderId(
- StateStoreId(dir, opId, partitionId), queryId)
-
- new StateSchemaCompatibilityChecker(providerId, hadoopConf)
- .check(newKeySchema, newValueSchema, ignoreValueSchema =
ignoreValueSchema)
- }
-
private def verifyException(
oldKeySchema: StructType,
oldValueSchema: StructType,
newKeySchema: StructType,
newValueSchema: StructType,
- ignoreValueSchema: Boolean = false): Unit = {
+ ignoreValueSchema: Boolean = false,
+ keyCollationChecks: Boolean = false): Unit = {
val dir = newDir()
- val queryId = UUID.randomUUID()
- runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
- ignoreValueSchema = ignoreValueSchema)
-
- val e = intercept[SparkUnsupportedOperationException] {
- runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
- ignoreValueSchema = ignoreValueSchema)
+ val runId = UUID.randomUUID()
+ val stateInfo = StatefulOperatorStateInfo(dir, runId, opId, 0, 200)
+ val formatValidationForValue = !ignoreValueSchema
+ val extraOptions = Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG
+ -> formatValidationForValue.toString)
+
+ val result = Try(
+
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo,
hadoopConf,
+ oldKeySchema, oldValueSchema, spark.sessionState, extraOptions)
+ ).toEither.fold(Some(_), _ => None)
+
+ val ex = if (result.isDefined) {
+ result.get.asInstanceOf[SparkUnsupportedOperationException]
+ } else {
+ intercept[SparkUnsupportedOperationException] {
+
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo,
hadoopConf,
+ newKeySchema, newValueSchema, spark.sessionState, extraOptions)
+ }
}
- // if value schema is ignored, the mismatch has to be on the key schema
- if (ignoreValueSchema) {
- assert(e.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE")
+ // collation checks are also performed in this path. so we need to check
for them explicitly.
+ if (keyCollationChecks) {
+ assert(ex.getMessage.contains("Binary inequality column is not
supported"))
+ assert(ex.getErrorClass ===
"STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY")
} else {
- assert(e.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" ||
- e.getErrorClass === "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE")
+ if (ignoreValueSchema) {
+ // if value schema is ignored, the mismatch has to be on the key schema
+ assert(ex.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE")
+ } else {
+ assert(ex.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" ||
+ ex.getErrorClass === "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE")
+ }
+ assert(ex.getMessage.contains("does not match existing"))
}
- assert(e.getMessage.contains("does not match existing"))
}
private def verifySuccess(
@@ -336,10 +340,16 @@ class StateSchemaCompatibilityCheckerSuite extends
SharedSparkSession {
newValueSchema: StructType,
ignoreValueSchema: Boolean = false): Unit = {
val dir = newDir()
- val queryId = UUID.randomUUID()
- runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
- ignoreValueSchema = ignoreValueSchema)
- runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
- ignoreValueSchema = ignoreValueSchema)
+ val runId = UUID.randomUUID()
+ val stateInfo = StatefulOperatorStateInfo(dir, runId, opId, 0, 200)
+ val formatValidationForValue = !ignoreValueSchema
+ val extraOptions = Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG
+ -> formatValidationForValue.toString)
+
+
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo,
hadoopConf,
+ oldKeySchema, oldValueSchema, spark.sessionState, extraOptions)
+
+
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo,
hadoopConf,
+ newKeySchema, newValueSchema, spark.sessionState, extraOptions)
}
}
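For reference, a minimal sketch (not part of this patch) of how the driver-side entry point exercised by the updated test helpers above can be invoked. The schemas, checkpoint path, the trailing `0`/`200` arguments, and the active `spark` session are assumptions for illustration only; the method and config names are the ones shown in the diff.
```
import java.util.UUID

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStoreConf}
import org.apache.spark.sql.types._

// Hypothetical key/value schemas; the suite uses its own keySchema/valueSchema fixtures.
val keySchema = new StructType().add("key", StringType)
val valueSchema = new StructType().add("count", LongType)

// Checkpoint root, run id and operator id are per-query values; the trailing 0 and 200
// mirror the values passed by the test helpers above.
val stateInfo = StatefulOperatorStateInfo("/tmp/checkpoint", UUID.randomUUID(), 0L, 0L, 200)
val extraOptions = Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "true")

// Validates the given schemas against any schema file already persisted under the
// checkpoint and writes/updates it when the check passes; on an incompatible change
// it throws a SparkUnsupportedOperationException, as the tests above assert.
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(
  stateInfo, new Configuration(), keySchema, valueSchema, spark.sessionState, extraOptions)
```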
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index 984279158944..854893b1f033 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -453,28 +453,29 @@ class StreamingDeduplicationSuite extends
StateStoreMetricsTest {
}
}
- test("SPARK-39650: recovery from checkpoint having all columns as value
schema") {
- // NOTE: We are also changing the schema of input compared to the
checkpoint. In the checkpoint
- // we define the input schema as (String, Int).
- val inputData = MemoryStream[(String, Int, String)]
- val dedupe = inputData.toDS().dropDuplicates("_1")
+ Seq("3.3.0", "3.5.1").foreach { sparkVersion =>
+ test("SPARK-39650: recovery from checkpoint having all columns as value
schema " +
+ s"with sparkVersion=$sparkVersion") {
+ // NOTE: We are also changing the schema of input compared to the
checkpoint.
+ // In the checkpoint we define the input schema as (String, Int).
+ val inputData = MemoryStream[(String, Int, String)]
+ val dedupe = inputData.toDS().dropDuplicates("_1")
+
+ val resourcePath = "/structured-streaming/checkpoint-version-" +
sparkVersion +
+ "-streaming-deduplication/"
+ val resourceUri = this.getClass.getResource(resourcePath).toURI
+
+ val checkpointDir = Utils.createTempDir().getCanonicalFile
+ // Copy the checkpoint to a temp dir to prevent changes to the original.
+ // Not doing this will lead to the test passing on the first run, but
fail subsequent runs.
+ FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+ inputData.addData(("a", 1, "dummy"))
+ inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy"))
- // The fix will land after Spark 3.3.0, hence we can check backward
compatibility with
- // checkpoint being built from Spark 3.3.0.
- val resourceUri = this.getClass.getResource(
-
"/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/").toURI
-
- val checkpointDir = Utils.createTempDir().getCanonicalFile
- // Copy the checkpoint to a temp dir to prevent changes to the original.
- // Not doing this will lead to the test passing on the first run, but fail
subsequent runs.
- FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
-
- inputData.addData(("a", 1, "dummy"))
- inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy"))
-
- testStream(dedupe, Append)(
- StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
- /*
+ testStream(dedupe, Append)(
+ StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+ /*
Note: The checkpoint was generated using the following input in Spark
version 3.3.0
AddData(inputData, ("a", 1)),
CheckLastBatch(("a", 1)),
@@ -482,9 +483,50 @@ class StreamingDeduplicationSuite extends
StateStoreMetricsTest {
CheckLastBatch(("b", 3))
*/
- AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
- CheckLastBatch(("c", 9, "c"))
- )
+ AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+ CheckLastBatch(("c", 9, "c"))
+ )
+ }
+ }
+
+ Seq("3.3.0", "3.5.1").foreach { sparkVersion =>
+ test("SPARK-39650: recovery from checkpoint with changes on key schema " +
+ s"are not allowed with sparkVersion=$sparkVersion") {
+ // NOTE: We are also changing the schema of input compared to the
checkpoint.
+ // In the checkpoint we define the input schema as (String, Int).
+ val inputData = MemoryStream[(String, Int, String)]
+ val dedupe = inputData.toDS().dropDuplicates("_1", "_2")
+
+ val resourcePath = "/structured-streaming/checkpoint-version-" +
sparkVersion +
+ "-streaming-deduplication/"
+ val resourceUri = this.getClass.getResource(resourcePath).toURI
+
+ val checkpointDir = Utils.createTempDir().getCanonicalFile
+ // Copy the checkpoint to a temp dir to prevent changes to the original.
+ // Not doing this will lead to the test passing on the first run, but
fail subsequent runs.
+ FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+ inputData.addData(("a", 1, "dummy"))
+ inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy"))
+
+ // trying to evolve the key schema is not allowed and should throw an
exception
+ val ex = intercept[StreamingQueryException] {
+ testStream(dedupe, Append)(
+ StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+ AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+ CheckLastBatch(("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c"))
+ )
+ }
+
+ // verify that the key schema not compatible error is thrown
+ checkError(
+ ex.getCause.asInstanceOf[SparkUnsupportedOperationException],
+ errorClass = "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE",
+ parameters = Map("storedKeySchema" -> ".*",
+ "newKeySchema" -> ".*"),
+ matchPVals = true
+ )
+ }
}
test("collation aware deduplication") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index e05cb4d3c35c..5e9bdad8fd82 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -27,6 +27,7 @@ import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression}
@@ -688,6 +689,146 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
)
}
+ test("SPARK-48687 - restore the stream-stream inner join query from Spark
3.5 and " +
+ "changing the join condition (key schema) should fail the query") {
+ // NOTE: We are also changing the schema of input compared to the
checkpoint.
+ // In the checkpoint we define the input schema as (Int, Long), which does
not have name
+ // in both left and right.
+ val inputStream = MemoryStream[(Int, Long, String)]
+ val df = inputStream.toDS()
+ .select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"),
+ col("_3").as("name"))
+
+ val leftStream = df.select(col("value").as("leftId"),
+ col("timestamp").as("leftTime"), col("name").as("leftName"))
+
+ val rightStream = df
+ // Introduce misses for ease of debugging
+ .where(col("value") % 2 === 0)
+ .select(col("value").as("rightId"),
+ col("timestamp").as("rightTime"), col("name").as("rightName"))
+
+ val query = leftStream
+ .withWatermark("leftTime", "5 seconds")
+ .join(
+ rightStream.withWatermark("rightTime", "5 seconds"),
+ expr("rightId = leftId AND leftName = rightName AND rightTime >=
leftTime AND " +
+ "rightTime <= leftTime + interval 5 seconds"),
+ joinType = "inner")
+ .select(col("leftId"), col("leftTime").cast("int"),
+ col("leftName"),
+ col("rightId"), col("rightTime").cast("int"),
+ col("rightName"))
+
+ val resourceUri = this.getClass.getResource(
+ "/structured-streaming/checkpoint-version-3.5.1-streaming-join/").toURI
+ val checkpointDir = Utils.createTempDir().getCanonicalFile
+ // Copy the checkpoint to a temp dir to prevent changes to the original.
+ // Not doing this will lead to the test passing on the first run, but fail
subsequent runs.
+ FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+ inputStream.addData((1, 1L, "a"), (2, 2L, "b"), (3, 3L, "c"), (4, 4L,
"d"), (5, 5L, "e"))
+
+ val ex = intercept[StreamingQueryException] {
+ testStream(query)(
+ StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+ /*
+ Note: The checkpoint was generated using the following input in Spark
version 3.5.1
+ The base query is different because it does not use the
leftName/rightName columns
+ as part of the join keys/condition that is used as part of the key
schema.
+
+ AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+ // batch 1 - global watermark = 0
+ // states
+ // left: (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)
+ // right: (2, 2L), (4, 4L)
+ CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
+ */
+ AddData(inputStream, (6, 6L, "a"), (7, 7L, "a"), (8, 8L, "a"), (9, 9L,
"a"),
+ (10, 10L, "a")),
+ CheckNewAnswer((6, 6L, "a", 6, 6L, "a"), (8, 8L, "a", 8, 8L, "a"),
+ (10, 10L, "a", 10, 10L, "a"))
+ )
+ }
+
+ checkError(
+ ex.getCause.asInstanceOf[SparkUnsupportedOperationException],
+ errorClass = "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE",
+ parameters = Map("storedKeySchema" -> ".*",
+ "newKeySchema" -> ".*"),
+ matchPVals = true
+ )
+ }
+
+ test("SPARK-48687 - restore the stream-stream inner join query from Spark
3.5 and " +
+ "changing the value schema should fail the query") {
+ // NOTE: We are also changing the schema of input compared to the
checkpoint.
+ // In the checkpoint we define the input schema as (Int, Long), which does
not have name
+ // in both left and right.
+ val inputStream = MemoryStream[(Int, Long, String)]
+ val df = inputStream.toDS()
+ .select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"),
+ col("_3").as("name"))
+
+ val leftStream = df.select(col("value").as("leftId"),
+ col("timestamp").as("leftTime"), col("name").as("leftName"))
+
+ val rightStream = df
+ // Introduce misses for ease of debugging
+ .where(col("value") % 2 === 0)
+ .select(col("value").as("rightId"),
+ col("timestamp").as("rightTime"), col("name").as("rightName"))
+
+ val query = leftStream
+ .withWatermark("leftTime", "5 seconds")
+ .join(
+ rightStream.withWatermark("rightTime", "5 seconds"),
+ expr("rightId = leftId AND rightTime >= leftTime AND " +
+ "rightTime <= leftTime + interval 5 seconds"),
+ joinType = "inner")
+ .select(col("leftId"), col("leftTime").cast("int"),
+ col("leftName"),
+ col("rightId"), col("rightTime").cast("int"),
+ col("rightName"))
+
+ val resourceUri = this.getClass.getResource(
+ "/structured-streaming/checkpoint-version-3.5.1-streaming-join/").toURI
+ val checkpointDir = Utils.createTempDir().getCanonicalFile
+ // Copy the checkpoint to a temp dir to prevent changes to the original.
+ // Not doing this will lead to the test passing on the first run, but fail
subsequent runs.
+ FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+ inputStream.addData((1, 1L, "a"), (2, 2L, "b"), (3, 3L, "c"), (4, 4L,
"d"), (5, 5L, "e"))
+
+ val ex = intercept[StreamingQueryException] {
+ testStream(query)(
+ StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+ /*
+ Note: The checkpoint was generated using the following input in Spark
version 3.5.1
+ The base query is different because it does not use the
leftName/rightName columns
+ as part of the generated output that is used as part of the value
schema.
+
+ AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+ // batch 1 - global watermark = 0
+ // states
+ // left: (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)
+ // right: (2, 2L), (4, 4L)
+ CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
+ */
+ AddData(inputStream, (6, 6L, "a"), (7, 7L, "a"), (8, 8L, "a"), (9, 9L,
"a"),
+ (10, 10L, "a")),
+ CheckNewAnswer((6, 6L, "a", 6, 6L, "a"), (8, 8L, "a", 8, 8L, "a"),
+ (10, 10L, "a", 10, 10L, "a"))
+ )
+ }
+
+ checkError(
+ ex.getCause.asInstanceOf[SparkUnsupportedOperationException],
+ errorClass = "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE",
+ parameters = Map("storedValueSchema" -> ".*",
+ "newValueSchema" -> ".*"),
+ matchPVals = true
+ )
+ }
+
test("SPARK-35896: metrics in StateOperatorProgress are output correctly") {
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]