This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a50b30d7a02b [SPARK-48687][SS] Add change to perform state schema validation and update on driver in planning phase for stateful queries
a50b30d7a02b is described below

commit a50b30d7a02bf45f1ddb8db0be6779b1441e1d4d
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Wed Jun 26 22:09:42 2024 +0900

    [SPARK-48687][SS] Add change to perform state schema validation and update on driver in planning phase for stateful queries
    
    ### What changes were proposed in this pull request?
    Add a change to perform state schema validation and update on the driver for stateful queries.
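
    As a rough sketch of the approach (a simplified excerpt of the rule added to IncrementalExecution.scala in this change; `isFirstBatch` and `hadoopConf` are existing fields of IncrementalExecution), the driver asks every stateful operator to validate, and if needed record, its state schema while planning the first batch of a run:

    ```
    // Runs on the driver during planning; no executor-side partition check is needed anymore.
    object StateSchemaValidationRule extends SparkPlanPartialRule {
      override val rule: PartialFunction[SparkPlan, SparkPlan] = {
        case statefulOp: StatefulOperator if isFirstBatch =>
          statefulOp.validateAndMaybeEvolveStateSchema(hadoopConf)
          statefulOp
      }
    }
    ```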
    
    ### Why are the changes needed?
    Avoids making partition-specific checks on the executor. Also makes it easier to support versioning in the future.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests and ran existing tests
    
    ```
    [info] Run completed in 10 seconds, 500 milliseconds.
    [info] Total number of tests run: 29
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 29, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    [success] Total time: 35 s, completed Jun 21, 2024, 3:58:12 PM
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47035 from anishshri-db/task/schema-changes-on-driver.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../streaming/FlatMapGroupsWithStateExec.scala     |   7 +
 .../execution/streaming/IncrementalExecution.scala |  17 ++-
 .../streaming/StreamingSymmetricHashJoinExec.scala |  20 +++
 .../streaming/TransformWithStateExec.scala         |   9 ++
 .../state/StateSchemaCompatibilityChecker.scala    | 141 +++++++++++++++++----
 .../sql/execution/streaming/state/StateStore.scala |  49 +------
 .../state/SymmetricHashJoinStateManager.scala      |  29 +++++
 .../execution/streaming/statefulOperators.scala    |  36 ++++++
 .../sql/execution/streaming/streamingLimits.scala  |   9 +-
 .../.metadata.crc                                  | Bin 0 -> 12 bytes
 .../commits/.0.crc                                 | Bin 0 -> 12 bytes
 .../commits/.1.crc                                 | Bin 0 -> 12 bytes
 .../commits/0                                      |   2 +
 .../commits/1                                      |   2 +
 .../metadata                                       |   1 +
 .../offsets/.0.crc                                 | Bin 0 -> 16 bytes
 .../offsets/.1.crc                                 | Bin 0 -> 16 bytes
 .../offsets/0                                      |   3 +
 .../offsets/1                                      |   3 +
 .../state/0/0/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/0/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/0/1.delta                              | Bin 0 -> 77 bytes
 .../state/0/0/2.delta                              | Bin 0 -> 46 bytes
 .../state/0/0/_metadata/.schema.crc                | Bin 0 -> 12 bytes
 .../state/0/0/_metadata/schema                     | Bin 0 -> 197 bytes
 .../state/0/1/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/1/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/1/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/1/2.delta                              | Bin 0 -> 77 bytes
 .../state/0/2/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/2/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/2/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/2/2.delta                              | Bin 0 -> 46 bytes
 .../state/0/3/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/3/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/3/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/3/2.delta                              | Bin 0 -> 46 bytes
 .../state/0/4/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/4/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/4/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/4/2.delta                              | Bin 0 -> 46 bytes
 .../.metadata.crc                                  | Bin 0 -> 12 bytes
 .../commits/.0.crc                                 | Bin 0 -> 12 bytes
 .../commits/0                                      |   2 +
 .../metadata                                       |   1 +
 .../offsets/.0.crc                                 | Bin 0 -> 16 bytes
 .../offsets/0                                      |   4 +
 .../state/0/0/left-keyToNumValues/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/0/left-keyToNumValues/1.delta          | Bin 0 -> 46 bytes
 .../0/0/left-keyToNumValues/_metadata/.schema.crc  | Bin 0 -> 12 bytes
 .../state/0/0/left-keyToNumValues/_metadata/schema | Bin 0 -> 199 bytes
 .../0/0/left-keyWithIndexToValue/.1.delta.crc      | Bin 0 -> 12 bytes
 .../state/0/0/left-keyWithIndexToValue/1.delta     | Bin 0 -> 46 bytes
 .../left-keyWithIndexToValue/_metadata/.schema.crc | Bin 0 -> 12 bytes
 .../0/0/left-keyWithIndexToValue/_metadata/schema  | Bin 0 -> 430 bytes
 .../state/0/0/right-keyToNumValues/.1.delta.crc    | Bin 0 -> 12 bytes
 .../state/0/0/right-keyToNumValues/1.delta         | Bin 0 -> 46 bytes
 .../0/0/right-keyToNumValues/_metadata/.schema.crc | Bin 0 -> 12 bytes
 .../0/0/right-keyToNumValues/_metadata/schema      | Bin 0 -> 199 bytes
 .../0/0/right-keyWithIndexToValue/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/0/right-keyWithIndexToValue/1.delta    | Bin 0 -> 46 bytes
 .../_metadata/.schema.crc                          | Bin 0 -> 12 bytes
 .../0/0/right-keyWithIndexToValue/_metadata/schema | Bin 0 -> 432 bytes
 .../state/0/1/left-keyToNumValues/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/1/left-keyToNumValues/1.delta          | Bin 0 -> 46 bytes
 .../0/1/left-keyWithIndexToValue/.1.delta.crc      | Bin 0 -> 12 bytes
 .../state/0/1/left-keyWithIndexToValue/1.delta     | Bin 0 -> 46 bytes
 .../state/0/1/right-keyToNumValues/.1.delta.crc    | Bin 0 -> 12 bytes
 .../state/0/1/right-keyToNumValues/1.delta         | Bin 0 -> 46 bytes
 .../0/1/right-keyWithIndexToValue/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/1/right-keyWithIndexToValue/1.delta    | Bin 0 -> 46 bytes
 .../state/0/2/left-keyToNumValues/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/2/left-keyToNumValues/1.delta          | Bin 0 -> 70 bytes
 .../0/2/left-keyWithIndexToValue/.1.delta.crc      | Bin 0 -> 12 bytes
 .../state/0/2/left-keyWithIndexToValue/1.delta     | Bin 0 -> 89 bytes
 .../state/0/2/right-keyToNumValues/.1.delta.crc    | Bin 0 -> 12 bytes
 .../state/0/2/right-keyToNumValues/1.delta         | Bin 0 -> 70 bytes
 .../0/2/right-keyWithIndexToValue/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/2/right-keyWithIndexToValue/1.delta    | Bin 0 -> 82 bytes
 .../state/0/3/left-keyToNumValues/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/3/left-keyToNumValues/1.delta          | Bin 0 -> 46 bytes
 .../0/3/left-keyWithIndexToValue/.1.delta.crc      | Bin 0 -> 12 bytes
 .../state/0/3/left-keyWithIndexToValue/1.delta     | Bin 0 -> 46 bytes
 .../state/0/3/right-keyToNumValues/.1.delta.crc    | Bin 0 -> 12 bytes
 .../state/0/3/right-keyToNumValues/1.delta         | Bin 0 -> 46 bytes
 .../0/3/right-keyWithIndexToValue/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/3/right-keyWithIndexToValue/1.delta    | Bin 0 -> 46 bytes
 .../state/0/4/left-keyToNumValues/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/4/left-keyToNumValues/1.delta          | Bin 0 -> 70 bytes
 .../0/4/left-keyWithIndexToValue/.1.delta.crc      | Bin 0 -> 12 bytes
 .../state/0/4/left-keyWithIndexToValue/1.delta     | Bin 0 -> 90 bytes
 .../state/0/4/right-keyToNumValues/.1.delta.crc    | Bin 0 -> 12 bytes
 .../state/0/4/right-keyToNumValues/1.delta         | Bin 0 -> 70 bytes
 .../0/4/right-keyWithIndexToValue/.1.delta.crc     | Bin 0 -> 12 bytes
 .../state/0/4/right-keyWithIndexToValue/1.delta    | Bin 0 -> 83 bytes
 .../StateSchemaCompatibilityCheckerSuite.scala     |  84 ++++++------
 .../streaming/StreamingDeduplicationSuite.scala    |  90 +++++++++----
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 141 +++++++++++++++++++++
 98 files changed, 514 insertions(+), 136 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 01b16b63fa27..2fa744a0f89b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -187,6 +189,11 @@ trait FlatMapGroupsWithStateExecBase
     })
   }
 
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      groupingAttributes.toStructType, stateManager.stateSchema, session.sqlContext.sessionState)
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     stateManager // force lazy init at driver
     metrics // force lazy init at driver
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index f72e2eb407f8..f7f3534ea408 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -198,6 +198,16 @@ class IncrementalExecution(
     }
   }
 
+  // Planning rule used to record the state schema for the first run and validate state schema
+  // changes across query runs.
+  object StateSchemaValidationRule extends SparkPlanPartialRule {
+    override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      case statefulOp: StatefulOperator if isFirstBatch =>
+        statefulOp.validateAndMaybeEvolveStateSchema(hadoopConf)
+        statefulOp
+    }
+  }
+
   object StateOpIdRule extends SparkPlanPartialRule {
     override val rule: PartialFunction[SparkPlan, SparkPlan] = {
       case StateStoreSaveExec(keys, None, None, None, None, stateFormatVersion,
@@ -471,9 +481,12 @@ class IncrementalExecution(
       if (isFirstBatch && currentBatchId != 0) {
         checkOperatorValidWithMetadata(planWithStateOpId)
       }
-      // The rule doesn't change the plan but cause the side effect that metadata is written
-      // in the checkpoint directory of stateful operator.
+
+      // The two rules below don't change the plan but can cause the side effect that
+      // metadata/schema is written in the checkpoint directory of stateful operator.
+      planWithStateOpId transform StateSchemaValidationRule.rule
       planWithStateOpId transform WriteStatefulOperatorMetadataRule.rule
+
       simulateWatermarkPropagation(planWithStateOpId)
       planWithStateOpId transform WatermarkPropagationRule.rule
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 20a05a100033..28365eac6e81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, JoinedRow, Literal, Predicate, UnsafeProjection, UnsafeRow}
@@ -31,6 +33,7 @@ import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.KeyToValuePair
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}
 
 
@@ -243,6 +246,23 @@ case class StreamingSymmetricHashJoinExec(
     watermarkUsedForStateCleanup && watermarkHasChanged
   }
 
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    var result: Map[String, (StructType, StructType)] = Map.empty
+    // get state schema for state stores on left side of the join
+    result ++= SymmetricHashJoinStateManager.getSchemaForStateStores(LeftSide,
+      left.output, leftKeys, stateFormatVersion)
+
+    // get state schema for state stores on right side of the join
+    result ++= SymmetricHashJoinStateManager.getSchemaForStateStores(RightSide,
+      right.output, rightKeys, stateFormatVersion)
+
+    // validate and maybe evolve schema for all state stores across both sides of the join
+    result.foreach { case (stateStoreName, (keySchema, valueSchema)) =>
+      StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+        keySchema, valueSchema, session.sessionState, storeName = stateStoreName)
+    }
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
     val stateStoreCoord = session.sessionState.streamingQueryManager.stateStoreCoordinator
     val stateStoreNames = SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index fbd062acfb5d..c5b22c89e82f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
 import java.util.UUID
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -338,6 +340,13 @@ case class TransformWithStateExec(
     )
   }
 
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    // TODO: transformWithState is special because we don't have the schema of the state directly
+    // within the passed args. We need to gather this after running the init function
+    // within the stateful processor on the driver. This also requires a schema format change
+    // when recording this information persistently.
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 374ab52b9110..e365cc6371f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -17,12 +17,17 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
+import scala.util.Try
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
-import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, StatefulOperatorStateInfo}
 import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
+import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.{DataType, StructType}
 
 class StateSchemaCompatibilityChecker(
@@ -37,30 +42,30 @@ class StateSchemaCompatibilityChecker(
 
   fm.mkdirs(schemaFileLocation.getParent)
 
-  def check(keySchema: StructType, valueSchema: StructType): Unit = {
-    check(keySchema, valueSchema, ignoreValueSchema = false)
-  }
+  /**
+   * Function to check if new state store schema is compatible with the existing schema.
+   * @param oldSchema - old state schema
+   * @param newSchema - new state schema
+   * @param ignoreValueSchema - whether to ignore value schema or not
+   */
+  private def check(
+      oldSchema: (StructType, StructType),
+      newSchema: (StructType, StructType),
+      ignoreValueSchema: Boolean) : Unit = {
+    val (storedKeySchema, storedValueSchema) = oldSchema
+    val (keySchema, valueSchema) = newSchema
 
-  def check(keySchema: StructType, valueSchema: StructType, ignoreValueSchema: Boolean): Unit = {
-    if (fm.exists(schemaFileLocation)) {
-      logDebug(s"Schema file for provider $providerId exists. Comparing with 
provided schema.")
-      val (storedKeySchema, storedValueSchema) = readSchemaFile()
-      if (storedKeySchema.equals(keySchema) &&
-        (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
-        // schema is exactly same
-      } else if (!schemasCompatible(storedKeySchema, keySchema)) {
-        throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
-          keySchema.toString)
-      } else if (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema)) {
-        throw StateStoreErrors.stateStoreValueSchemaNotCompatible(storedValueSchema.toString,
-          valueSchema.toString)
-      } else {
-        logInfo("Detected schema change which is compatible. Allowing to put 
rows.")
-      }
+    if (storedKeySchema.equals(keySchema) &&
+      (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
+      // schema is exactly same
+    } else if (!schemasCompatible(storedKeySchema, keySchema)) {
+      throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
+        keySchema.toString)
+    } else if (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema)) {
+      throw StateStoreErrors.stateStoreValueSchemaNotCompatible(storedValueSchema.toString,
+        valueSchema.toString)
     } else {
-      // schema doesn't exist, create one now
-      logDebug(s"Schema file for provider $providerId doesn't exist. Creating 
one.")
-      createSchemaFile(keySchema, valueSchema)
+      logInfo("Detected schema change which is compatible. Allowing to put 
rows.")
     }
   }
 
@@ -82,7 +87,20 @@ class StateSchemaCompatibilityChecker(
     }
   }
 
-  def createSchemaFile(keySchema: StructType, valueSchema: StructType): Unit = {
+  /**
+   * Function to read and return the existing key and value schema from the schema file, if it
+   * exists
+   * @return - Option of (keySchema, valueSchema) if the schema file exists, None otherwise
+   */
+  private def getExistingKeyAndValueSchema(): Option[(StructType, StructType)] = {
+    if (fm.exists(schemaFileLocation)) {
+      Some(readSchemaFile())
+    } else {
+      None
+    }
+  }
+
+  private def createSchemaFile(keySchema: StructType, valueSchema: StructType): Unit = {
     createSchemaFile(keySchema, valueSchema, schemaWriter)
   }
 
@@ -103,10 +121,85 @@ class StateSchemaCompatibilityChecker(
     }
   }
 
+  def validateAndMaybeEvolveStateSchema(
+      newKeySchema: StructType,
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean): Unit = {
+    val existingSchema = getExistingKeyAndValueSchema()
+    if (existingSchema.isEmpty) {
+      // write the schema file if it doesn't exist
+      createSchemaFile(newKeySchema, newValueSchema)
+    } else {
+      // validate if the new schema is compatible with the existing schema
+      check(existingSchema.get, (newKeySchema, newValueSchema), ignoreValueSchema)
+    }
+  }
+
   private def schemaFile(storeCpLocation: Path): Path =
     new Path(new Path(storeCpLocation, "_metadata"), "schema")
 }
 
 object StateSchemaCompatibilityChecker {
   val VERSION = 2
+
+  private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
+    if (!UnsafeRowUtils.isBinaryStable(schema)) {
+      throw new SparkUnsupportedOperationException(
+        errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
+        messageParameters = Map("schema" -> schema.json)
+      )
+    }
+  }
+
+  /**
+   * Function to validate the schema of the state store and maybe evolve it if needed.
+   * We also verify for binary inequality columns in the schema and disallow them. We then perform
+   * key and value schema validation. Depending on the passed configs, a warning might be logged
+   * or an exception might be thrown if the schema is not compatible.
+   *
+   * @param stateInfo - StatefulOperatorStateInfo containing the state store information
+   * @param hadoopConf - Hadoop configuration
+   * @param newKeySchema - New key schema
+   * @param newValueSchema - New value schema
+   * @param sessionState - session state used to retrieve session config
+   * @param extraOptions - any extra options to be passed for StateStoreConf creation
+   * @param storeName - optional state store name
+   */
+  def validateAndMaybeEvolveStateSchema(
+      stateInfo: StatefulOperatorStateInfo,
+      hadoopConf: Configuration,
+      newKeySchema: StructType,
+      newValueSchema: StructType,
+      sessionState: SessionState,
+      extraOptions: Map[String, String] = Map.empty,
+      storeName: String = StateStoreId.DEFAULT_STORE_NAME): Unit = {
+    // SPARK-47776: collation introduces the concept of binary (in)equality, which means
+    // in some collation we no longer be able to just compare the binary format of two
+    // UnsafeRows to determine equality. For example, 'aaa' and 'AAA' can be "semantically"
+    // same in case insensitive collation.
+    // State store is basically key-value storage, and the most provider implementations
+    // rely on the fact that all the columns in the key schema support binary equality.
+    // We need to disallow using binary inequality column in the key schema, before we
+    // could support this in majority of state store providers (or high-level of state
+    // store.)
+    disallowBinaryInequalityColumn(newKeySchema)
+
+    val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+    val providerId = StateStoreProviderId(StateStoreId(stateInfo.checkpointLocation,
+      stateInfo.operatorId, 0, storeName), stateInfo.queryRunId)
+    val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf)
+    // regardless of configuration, we check compatibility to at least write schema file
+    // if necessary
+    // if the format validation for value schema is disabled, we also disable the schema
+    // compatibility checker for value schema as well.
+    val result = Try(
+      checker.validateAndMaybeEvolveStateSchema(newKeySchema, newValueSchema,
+        ignoreValueSchema = !storeConf.formatValidationCheckValue)
+    ).toEither.fold(Some(_), _ => None)
+
+    // if schema validation is enabled and an exception is thrown, we re-throw it and fail the query
+    if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
+      throw result.get
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 2f9ce2c236f4..b94f5d6cecd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -23,13 +23,12 @@ import java.util.concurrent.atomic.AtomicReference
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
-import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
@@ -524,9 +523,6 @@ object StateStore extends Logging {
   @GuardedBy("loadedProviders")
   private val loadedProviders = new mutable.HashMap[StateStoreProviderId, StateStoreProvider]()
 
-  @GuardedBy("loadedProviders")
-  private val schemaValidated = new mutable.HashMap[StateStoreProviderId, Option[Throwable]]()
-
   private val maintenanceThreadPoolLock = new Object
 
   // Shared exception between threads in thread pool that the scheduling thread
@@ -649,15 +645,6 @@ object StateStore extends Logging {
     storeProvider.getStore(version)
   }
 
-  private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
-    if (!UnsafeRowUtils.isBinaryStable(schema)) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
-        messageParameters = Map("schema" -> schema.json)
-      )
-    }
-  }
-
   private def getStateStoreProvider(
       storeProviderId: StateStoreProviderId,
       keySchema: StructType,
@@ -670,40 +657,6 @@ object StateStore extends Logging {
     loadedProviders.synchronized {
       startMaintenanceIfNeeded(storeConf)
 
-      if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) {
-        val result = schemaValidated.getOrElseUpdate(storeProviderId, {
-          // SPARK-47776: collation introduces the concept of binary (in)equality, which means
-          // in some collation we no longer be able to just compare the binary format of two
-          // UnsafeRows to determine equality. For example, 'aaa' and 'AAA' can be "semantically"
-          // same in case insensitive collation.
-          // State store is basically key-value storage, and the most provider implementations
-          // rely on the fact that all the columns in the key schema support binary equality.
-          // We need to disallow using binary inequality column in the key schema, before we
-          // could support this in majority of state store providers (or high-level of state
-          // store.)
-          disallowBinaryInequalityColumn(keySchema)
-
-          val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf)
-          // regardless of configuration, we check compatibility to at least write schema file
-          // if necessary
-          // if the format validation for value schema is disabled, we also disable the schema
-          // compatibility checker for value schema as well.
-          val ret = Try(
-            checker.check(keySchema, valueSchema,
-              ignoreValueSchema = !storeConf.formatValidationCheckValue)
-          ).toEither.fold(Some(_), _ => None)
-          if (storeConf.stateSchemaCheckEnabled) {
-            ret
-          } else {
-            None
-          }
-        })
-
-        if (result.isDefined) {
-          throw result.get
-        }
-      }
-
       // SPARK-42567 - Track load time for state store provider and log warning if takes longer
       // than 2s.
       val (provider, loadTimeMs) = Utils.timeTakenMs {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 43ffdf9829b4..9dbc9bf0c320 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -767,6 +767,35 @@ object SymmetricHashJoinStateManager {
     }
   }
 
+  def getSchemaForStateStores(
+      joinSide: JoinSide,
+      inputValueAttributes: Seq[Attribute],
+      joinKeys: Seq[Expression],
+      stateFormatVersion: Int): Map[String, (StructType, StructType)] = {
+    var result: Map[String, (StructType, StructType)] = Map.empty
+
+    // get the key and value schema for the KeyToNumValues state store
+    val keySchema = StructType(
+      joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i", k.dataType, k.nullable) })
+    val longValueSchema = new StructType().add("value", "long")
+    result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema, longValueSchema))
+
+    // get the key and value schema for the KeyWithIndexToValue state store
+    val keyWithIndexSchema = keySchema.add("index", LongType)
+    val valueSchema = if (stateFormatVersion == 1) {
+      inputValueAttributes
+    } else if (stateFormatVersion == 2) {
+      inputValueAttributes :+ AttributeReference("matched", BooleanType)()
+    } else {
+      throw new IllegalArgumentException("Incorrect state format version! " +
+        s"version=$stateFormatVersion")
+    }
+    result += (getStateStoreName(joinSide, KeyWithIndexToValueType) ->
+      (keyWithIndexSchema, valueSchema.toStructType))
+
+    result
+  }
+
   private sealed trait StateStoreType
 
   private case object KeyToNumValuesType extends StateStoreType {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 9add574e01fc..3d5c418c7f54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit._
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.AnalysisException
@@ -70,6 +72,10 @@ trait StatefulOperator extends SparkPlan {
       throw new IllegalStateException("State location not present for execution")
     }
   }
+
+  // Function used to record state schema for the first time and validate it against proposed
+  // schema changes in the future. Runs as part of a planning rule on the driver.
+  def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit
 }
 
 /**
@@ -424,6 +430,11 @@ case class StateStoreRestoreExec(
   private[sql] val stateManager = StreamingAggregationStateManager.createStateManager(
     keyExpressions, child.output, stateFormatVersion)
 
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keyExpressions.toStructType, stateManager.getStateValueSchema, session.sessionState)
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
 
@@ -486,6 +497,11 @@ case class StateStoreSaveExec(
   private[sql] val stateManager = StreamingAggregationStateManager.createStateManager(
     keyExpressions, child.output, stateFormatVersion)
 
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keyExpressions.toStructType, stateManager.getStateValueSchema, session.sessionState)
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
     assert(outputMode.nonEmpty,
@@ -690,6 +706,11 @@ case class SessionWindowStateStoreRestoreExec(
   private val stateManager = StreamingSessionWindowStateManager.createStateManager(
     keyWithoutSessionExpressions, sessionExpression, child.output, stateFormatVersion)
 
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      stateManager.getStateKeySchema, stateManager.getStateValueSchema, session.sessionState)
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
 
@@ -772,6 +793,11 @@ case class SessionWindowStateStoreSaveExec(
   private val stateManager = StreamingSessionWindowStateManager.createStateManager(
     keyWithoutSessionExpressions, sessionExpression, child.output, stateFormatVersion)
 
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      stateManager.getStateKeySchema, stateManager.getStateValueSchema, session.sessionState)
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
     assert(outputMode.nonEmpty,
@@ -1079,6 +1105,11 @@ case class StreamingDeduplicateExec(
 
   override protected def withNewChildInternal(newChild: SparkPlan): StreamingDeduplicateExec =
     copy(child = newChild)
+
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keyExpressions.toStructType, schemaForValueRow, session.sessionState, extraOptionOnStateStore)
+  }
 }
 
 object StreamingDeduplicateExec {
@@ -1150,6 +1181,11 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   override def shortName: String = "dedupeWithinWatermark"
 
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keyExpressions.toStructType, schemaForValueRow, session.sessionState, extraOptionOnStateStore)
+  }
+
   override protected def withNewChildInternal(
       newChild: SparkPlan): StreamingDeduplicateWithinWatermarkExec = copy(child = newChild)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
index e0e3ee582bef..a3cb66f91496 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
@@ -18,12 +18,14 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, SortOrder, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
 import org.apache.spark.sql.execution.{LimitExec, SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStoreOps}
+import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateSchemaCompatibilityChecker, StateStoreOps}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
 import org.apache.spark.util.{CompletionIterator, NextIterator}
@@ -45,6 +47,11 @@ case class StreamingGlobalLimitExec(
   private val keySchema = StructType(Array(StructField("key", NullType)))
   private val valueSchema = StructType(Array(StructField("value", LongType)))
 
+  override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
+    StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
+      keySchema, valueSchema, session.sessionState)
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/.metadata.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/.metadata.crc
new file mode 100644
index 000000000000..6177f01d501b
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/.metadata.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.0.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.0.crc
new file mode 100644
index 000000000000..1aee7033161e
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.0.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.1.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.1.crc
new file mode 100644
index 000000000000..1aee7033161e
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/.1.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/0
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/1
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/1
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/commits/1
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/metadata
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/metadata
new file mode 100644
index 000000000000..c8acdedc074b
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/metadata
@@ -0,0 +1 @@
+{"id":"a13717f3-7485-421b-b55a-21625123b680"}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.0.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.0.crc
new file mode 100644
index 000000000000..121286161cb6
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.0.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.1.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.1.crc
new file mode 100644
index 000000000000..89d73c77c55c
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/.1.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/0
new file mode 100644
index 000000000000..7a7b38628a15
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1719278977158,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s
 [...]
+0
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/1
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/1
new file mode 100644
index 000000000000..589d400395e1
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1719278978807,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s
 [...]
+1
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.1.delta.crc
new file mode 100644
index 000000000000..1992982c58ff
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.2.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/1.delta
new file mode 100644
index 000000000000..fec40e83a547
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/2.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/2.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/.schema.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/.schema.crc
new file mode 100644
index 000000000000..97b2fbbd4cdf
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/.schema.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/schema
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/schema
new file mode 100644
index 000000000000..c35505fa363f
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/0/_metadata/schema
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.2.delta.crc
new file mode 100644
index 000000000000..d18b77b93aff
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/2.delta
new file mode 100644
index 000000000000..fcbf8df80f5f
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/1/2.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.2.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/2.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/2/2.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.2.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/2.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/3/2.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.2.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/2.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/state/0/4/2.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/.metadata.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/.metadata.crc
new file mode 100644
index 000000000000..a0afa9cbeabb
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/.metadata.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/.0.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/.0.crc
new file mode 100644
index 000000000000..1aee7033161e
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/.0.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/0
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/metadata
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/metadata
new file mode 100644
index 000000000000..9f8d6f4d5cf5
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/metadata
@@ -0,0 +1 @@
+{"id":"e41911da-47c9-4560-a95d-e2ab97f2bc85"}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/.0.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/.0.crc
new file mode 100644
index 000000000000..6cd2a4731154
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/.0.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/0
new file mode 100644
index 000000000000..a45ae3899e0b
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/offsets/0
@@ -0,0 +1,4 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1719343083746,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s
 [...]
+0
+0
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/.schema.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/.schema.crc
new file mode 100644
index 000000000000..0b5ede766024
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/.schema.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/schema
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/schema
new file mode 100644
index 000000000000..4da637d14349
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyToNumValues/_metadata/schema
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/.schema.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/.schema.crc
new file mode 100644
index 000000000000..3f303a9e7b03
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/.schema.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/schema
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/schema
new file mode 100644
index 000000000000..42448b3b584c
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/left-keyWithIndexToValue/_metadata/schema
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/.schema.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/.schema.crc
new file mode 100644
index 000000000000..0b5ede766024
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/.schema.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/schema
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/schema
new file mode 100644
index 000000000000..4da637d14349
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyToNumValues/_metadata/schema
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/.schema.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/.schema.crc
new file mode 100644
index 000000000000..bcc7311689f0
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/.schema.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/schema
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/schema
new file mode 100644
index 000000000000..8fa8f1675bc8
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/0/right-keyWithIndexToValue/_metadata/schema
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/left-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/1/right-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..d73ee1ba16c2
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..4e421cd377fb
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..8dee4c86270f
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..7e6dce9cc108
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/left-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..d73ee1ba16c2
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..4e421cd377fb
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..d0e2f40c18ad
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..2ec494e6a636
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/2/right-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/left-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cf1d68e2acee
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/3/right-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..20918a4ffe6f
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/1.delta
new file mode 100644
index 000000000000..0bdaf341003b
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..cb8ce356ad7f
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..56b52ab974a3
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/left-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/.1.delta.crc
new file mode 100644
index 000000000000..20918a4ffe6f
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/1.delta
new file mode 100644
index 000000000000..0bdaf341003b
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyToNumValues/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/.1.delta.crc
new file mode 100644
index 000000000000..a874ad31b740
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/1.delta
new file mode 100644
index 000000000000..0ed4feb1bd9b
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.1-streaming-join/state/0/4/right-keyWithIndexToValue/1.delta
 differ
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
index 84348f7da67f..feab7a5fa3b0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.util.UUID
 
-import scala.util.Random
+import scala.util.{Random, Try}
 
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import 
org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper.newDir
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -254,9 +255,9 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
 
   test("SPARK-47776: checking for compatibility with collation change in key") 
{
     verifyException(keySchema, valueSchema, keySchemaWithCollation, 
valueSchema,
-      ignoreValueSchema = false)
+      ignoreValueSchema = false, keyCollationChecks = true)
     verifyException(keySchemaWithCollation, valueSchema, keySchema, 
valueSchema,
-      ignoreValueSchema = false)
+      ignoreValueSchema = false, keyCollationChecks = true)
   }
 
   test("SPARK-47776: checking for compatibility with collation change in 
value") {
@@ -288,45 +289,48 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
     StructType(newFields)
   }
 
-  private def runSchemaChecker(
-      dir: String,
-      queryId: UUID,
-      newKeySchema: StructType,
-      newValueSchema: StructType,
-      ignoreValueSchema: Boolean): Unit = {
-    // in fact, Spark doesn't support online state schema change, so need to 
check
-    // schema only once for each running of JVM
-    val providerId = StateStoreProviderId(
-      StateStoreId(dir, opId, partitionId), queryId)
-
-    new StateSchemaCompatibilityChecker(providerId, hadoopConf)
-      .check(newKeySchema, newValueSchema, ignoreValueSchema = 
ignoreValueSchema)
-  }
-
   private def verifyException(
       oldKeySchema: StructType,
       oldValueSchema: StructType,
       newKeySchema: StructType,
       newValueSchema: StructType,
-      ignoreValueSchema: Boolean = false): Unit = {
+      ignoreValueSchema: Boolean = false,
+      keyCollationChecks: Boolean = false): Unit = {
     val dir = newDir()
-    val queryId = UUID.randomUUID()
-    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
-      ignoreValueSchema = ignoreValueSchema)
-
-    val e = intercept[SparkUnsupportedOperationException] {
-      runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
-        ignoreValueSchema = ignoreValueSchema)
+    val runId = UUID.randomUUID()
+    val stateInfo = StatefulOperatorStateInfo(dir, runId, opId, 0, 200)
+    val formatValidationForValue = !ignoreValueSchema
+    val extraOptions = Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG
+      -> formatValidationForValue.toString)
+
+    val result = Try(
+      
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo, 
hadoopConf,
+        oldKeySchema, oldValueSchema, spark.sessionState, extraOptions)
+    ).toEither.fold(Some(_), _ => None)
+
+    val ex = if (result.isDefined) {
+      result.get.asInstanceOf[SparkUnsupportedOperationException]
+    } else {
+      intercept[SparkUnsupportedOperationException] {
+        
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo, 
hadoopConf,
+          newKeySchema, newValueSchema, spark.sessionState, extraOptions)
+      }
     }
 
-    // if value schema is ignored, the mismatch has to be on the key schema
-    if (ignoreValueSchema) {
-      assert(e.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE")
+    // collation checks are also performed in this path. so we need to check 
for them explicitly.
+    if (keyCollationChecks) {
+      assert(ex.getMessage.contains("Binary inequality column is not 
supported"))
+      assert(ex.getErrorClass === 
"STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY")
     } else {
-      assert(e.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" ||
-        e.getErrorClass === "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE")
+      if (ignoreValueSchema) {
+        // if value schema is ignored, the mismatch has to be on the key schema
+        assert(ex.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE")
+      } else {
+        assert(ex.getErrorClass === "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" ||
+          ex.getErrorClass === "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE")
+      }
+      assert(ex.getMessage.contains("does not match existing"))
     }
-    assert(e.getMessage.contains("does not match existing"))
   }
 
   private def verifySuccess(
@@ -336,10 +340,16 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
       newValueSchema: StructType,
       ignoreValueSchema: Boolean = false): Unit = {
     val dir = newDir()
-    val queryId = UUID.randomUUID()
-    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
-      ignoreValueSchema = ignoreValueSchema)
-    runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
-      ignoreValueSchema = ignoreValueSchema)
+    val runId = UUID.randomUUID()
+    val stateInfo = StatefulOperatorStateInfo(dir, runId, opId, 0, 200)
+    val formatValidationForValue = !ignoreValueSchema
+    val extraOptions = Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG
+      -> formatValidationForValue.toString)
+
+    
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo, 
hadoopConf,
+      oldKeySchema, oldValueSchema, spark.sessionState, extraOptions)
+
+    
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(stateInfo, 
hadoopConf,
+      newKeySchema, newValueSchema, spark.sessionState, extraOptions)
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index 984279158944..854893b1f033 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -453,28 +453,29 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
     }
   }
 
-  test("SPARK-39650: recovery from checkpoint having all columns as value 
schema") {
-    // NOTE: We are also changing the schema of input compared to the 
checkpoint. In the checkpoint
-    // we define the input schema as (String, Int).
-    val inputData = MemoryStream[(String, Int, String)]
-    val dedupe = inputData.toDS().dropDuplicates("_1")
+  Seq("3.3.0", "3.5.1").foreach { sparkVersion =>
+    test("SPARK-39650: recovery from checkpoint having all columns as value 
schema " +
+      s"with sparkVersion=$sparkVersion") {
+      // NOTE: We are also changing the schema of input compared to the 
checkpoint.
+      // In the checkpoint we define the input schema as (String, Int).
+      val inputData = MemoryStream[(String, Int, String)]
+      val dedupe = inputData.toDS().dropDuplicates("_1")
+
+      val resourcePath = "/structured-streaming/checkpoint-version-" + 
sparkVersion +
+        "-streaming-deduplication/"
+      val resourceUri = this.getClass.getResource(resourcePath).toURI
+
+      val checkpointDir = Utils.createTempDir().getCanonicalFile
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but 
fail subsequent runs.
+      FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+      inputData.addData(("a", 1, "dummy"))
+      inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy"))
 
-    // The fix will land after Spark 3.3.0, hence we can check backward 
compatibility with
-    // checkpoint being built from Spark 3.3.0.
-    val resourceUri = this.getClass.getResource(
-      
"/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/").toURI
-
-    val checkpointDir = Utils.createTempDir().getCanonicalFile
-    // Copy the checkpoint to a temp dir to prevent changes to the original.
-    // Not doing this will lead to the test passing on the first run, but fail 
subsequent runs.
-    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
-
-    inputData.addData(("a", 1, "dummy"))
-    inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy"))
-
-    testStream(dedupe, Append)(
-      StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
-      /*
+      testStream(dedupe, Append)(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        /*
         Note: The checkpoint was generated using the following input in Spark 
version 3.3.0
         AddData(inputData, ("a", 1)),
         CheckLastBatch(("a", 1)),
@@ -482,9 +483,50 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
         CheckLastBatch(("b", 3))
        */
 
-      AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
-      CheckLastBatch(("c", 9, "c"))
-    )
+        AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+        CheckLastBatch(("c", 9, "c"))
+      )
+    }
+  }
+
+  Seq("3.3.0", "3.5.1").foreach { sparkVersion =>
+    test("SPARK-39650: recovery from checkpoint with changes on key schema " +
+      s"are not allowed with sparkVersion=$sparkVersion") {
+      // NOTE: We are also changing the schema of input compared to the 
checkpoint.
+      // In the checkpoint we define the input schema as (String, Int).
+      val inputData = MemoryStream[(String, Int, String)]
+      val dedupe = inputData.toDS().dropDuplicates("_1", "_2")
+
+      val resourcePath = "/structured-streaming/checkpoint-version-" + 
sparkVersion +
+        "-streaming-deduplication/"
+      val resourceUri = this.getClass.getResource(resourcePath).toURI
+
+      val checkpointDir = Utils.createTempDir().getCanonicalFile
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but 
fail subsequent runs.
+      FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+      inputData.addData(("a", 1, "dummy"))
+      inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy"))
+
+      // trying to evolve the key schema is not allowed and should throw an 
exception
+      val ex = intercept[StreamingQueryException] {
+        testStream(dedupe, Append)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+          CheckLastBatch(("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c"))
+        )
+      }
+
+      // verify that the key schema not compatible error is thrown
+      checkError(
+        ex.getCause.asInstanceOf[SparkUnsupportedOperationException],
+        errorClass = "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE",
+        parameters = Map("storedKeySchema" -> ".*",
+          "newKeySchema" -> ".*"),
+        matchPVals = true
+      )
+    }
   }
 
   test("collation aware deduplication") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index e05cb4d3c35c..5e9bdad8fd82 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -27,6 +27,7 @@ import scala.util.Random
 import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
@@ -688,6 +689,146 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
     )
   }
 
+  test("SPARK-48687 - restore the stream-stream inner join query from Spark 
3.5 and " +
+   "changing the join condition (key schema) should fail the query") {
+    // NOTE: We are also changing the schema of input compared to the 
checkpoint.
+    // In the checkpoint we define the input schema as (Int, Long), which does 
not have name
+    // in both left and right.
+    val inputStream = MemoryStream[(Int, Long, String)]
+    val df = inputStream.toDS()
+      .select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"),
+        col("_3").as("name"))
+
+    val leftStream = df.select(col("value").as("leftId"),
+      col("timestamp").as("leftTime"), col("name").as("leftName"))
+
+    val rightStream = df
+      // Introduce misses for ease of debugging
+      .where(col("value") % 2 === 0)
+      .select(col("value").as("rightId"),
+        col("timestamp").as("rightTime"), col("name").as("rightName"))
+
+    val query = leftStream
+      .withWatermark("leftTime", "5 seconds")
+      .join(
+        rightStream.withWatermark("rightTime", "5 seconds"),
+        expr("rightId = leftId AND leftName = rightName AND rightTime >= 
leftTime AND " +
+          "rightTime <= leftTime + interval 5 seconds"),
+        joinType = "inner")
+      .select(col("leftId"), col("leftTime").cast("int"),
+        col("leftName"),
+        col("rightId"), col("rightTime").cast("int"),
+        col("rightName"))
+
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-version-3.5.1-streaming-join/").toURI
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail 
subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+    inputStream.addData((1, 1L, "a"), (2, 2L, "b"), (3, 3L, "c"), (4, 4L, 
"d"), (5, 5L, "e"))
+
+    val ex = intercept[StreamingQueryException] {
+      testStream(query)(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        /*
+        Note: The checkpoint was generated using the following input in Spark 
version 3.5.1
+        The base query is different because it does not use the 
leftName/rightName columns
+        as part of the join keys/condition that is used as part of the key 
schema.
+
+        AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+        // batch 1 - global watermark = 0
+        // states
+        // left: (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)
+        // right: (2, 2L), (4, 4L)
+        CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
+        */
+        AddData(inputStream, (6, 6L, "a"), (7, 7L, "a"), (8, 8L, "a"), (9, 9L, 
"a"),
+          (10, 10L, "a")),
+        CheckNewAnswer((6, 6L, "a", 6, 6L, "a"), (8, 8L, "a", 8, 8L, "a"),
+          (10, 10L, "a", 10, 10L, "a"))
+      )
+    }
+
+    checkError(
+      ex.getCause.asInstanceOf[SparkUnsupportedOperationException],
+      errorClass = "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE",
+      parameters = Map("storedKeySchema" -> ".*",
+        "newKeySchema" -> ".*"),
+      matchPVals = true
+    )
+  }
+
+  test("SPARK-48687 - restore the stream-stream inner join query from Spark 
3.5 and " +
+   "changing the value schema should fail the query") {
+    // NOTE: We are also changing the schema of input compared to the 
checkpoint.
+    // In the checkpoint we define the input schema as (Int, Long), which does 
not have name
+    // in both left and right.
+    val inputStream = MemoryStream[(Int, Long, String)]
+    val df = inputStream.toDS()
+      .select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"),
+        col("_3").as("name"))
+
+    val leftStream = df.select(col("value").as("leftId"),
+      col("timestamp").as("leftTime"), col("name").as("leftName"))
+
+    val rightStream = df
+      // Introduce misses for ease of debugging
+      .where(col("value") % 2 === 0)
+      .select(col("value").as("rightId"),
+        col("timestamp").as("rightTime"), col("name").as("rightName"))
+
+    val query = leftStream
+      .withWatermark("leftTime", "5 seconds")
+      .join(
+        rightStream.withWatermark("rightTime", "5 seconds"),
+        expr("rightId = leftId AND rightTime >= leftTime AND " +
+          "rightTime <= leftTime + interval 5 seconds"),
+        joinType = "inner")
+      .select(col("leftId"), col("leftTime").cast("int"),
+        col("leftName"),
+        col("rightId"), col("rightTime").cast("int"),
+        col("rightName"))
+
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-version-3.5.1-streaming-join/").toURI
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail 
subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+    inputStream.addData((1, 1L, "a"), (2, 2L, "b"), (3, 3L, "c"), (4, 4L, 
"d"), (5, 5L, "e"))
+
+    val ex = intercept[StreamingQueryException] {
+      testStream(query)(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        /*
+        Note: The checkpoint was generated using the following input in Spark 
version 3.5.1
+        The base query is different because it does not use the 
leftName/rightName columns
+        as part of the generated output that is used as part of the value 
schema.
+
+        AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+        // batch 1 - global watermark = 0
+        // states
+        // left: (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)
+        // right: (2, 2L), (4, 4L)
+        CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
+        */
+        AddData(inputStream, (6, 6L, "a"), (7, 7L, "a"), (8, 8L, "a"), (9, 9L, 
"a"),
+          (10, 10L, "a")),
+        CheckNewAnswer((6, 6L, "a", 6, 6L, "a"), (8, 8L, "a", 8, 8L, "a"),
+          (10, 10L, "a", 10, 10L, "a"))
+      )
+    }
+
+    checkError(
+      ex.getCause.asInstanceOf[SparkUnsupportedOperationException],
+      errorClass = "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE",
+      parameters = Map("storedValueSchema" -> ".*",
+        "newValueSchema" -> ".*"),
+      matchPVals = true
+    )
+  }
+
   test("SPARK-35896: metrics in StateOperatorProgress are output correctly") {
     val input1 = MemoryStream[Int]
     val input2 = MemoryStream[Int]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
