This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8023504e69fd [SPARK-49594][SS] Adding check on whether columnFamilies 
were added or removed to write StateSchemaV3 file
8023504e69fd is described below

commit 8023504e69fdd037dea002e961b960fd9fa662ba
Author: Eric Marnadi <[email protected]>
AuthorDate: Thu Sep 12 12:01:08 2024 +0900

    [SPARK-49594][SS] Adding check on whether columnFamilies were added or 
removed to write StateSchemaV3 file
    
    ### What changes were proposed in this pull request?
    
    In the [PR](https://github.com/apache/spark/pull/47880) that enabled 
deleteIfExists, we changed the condition under which we throw an error. However, 
in doing so, we stopped writing schema files whenever column families are added 
or removed, which is functionally incorrect.
    Additionally, we were previously always writing the newSchemaFilePath to 
the OperatorStateMetadata on every new query run, when we should only do this 
if the schema changes.
    ### Why are the changes needed?
    
    These changes are needed because we want to write out a schema file every 
time column families are added or removed. We also want the current metadata 
file to point to the old schema file if the schema has not changed between this 
run and the last one, as opposed to populating the metadata with a new schema 
file path on every run, even when that new schema file is never actually 
created.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Amended unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48067 from ericm-db/add-remove-cf.
    
    Authored-by: Eric Marnadi <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../state/StateSchemaCompatibilityChecker.scala    |  40 +++-
 .../sql/streaming/TransformWithStateSuite.scala    | 219 ++++++++++++++++++++-
 2 files changed, 250 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 90eb634689b2..3a1793f71794 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
 import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, 
StatefulOperatorStateInfo}
 import 
org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, 
SchemaWriter}
+import 
org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker.SCHEMA_FORMAT_V3
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -95,7 +96,7 @@ class StateSchemaCompatibilityChecker(
       stateStoreColFamilySchema: List[StateStoreColFamilySchema],
       stateSchemaVersion: Int): Unit = {
     // Ensure that schema file path is passed explicitly for schema version 3
-    if (stateSchemaVersion == 3 && newSchemaFilePath.isEmpty) {
+    if (stateSchemaVersion == SCHEMA_FORMAT_V3 && newSchemaFilePath.isEmpty) {
       throw new IllegalStateException("Schema file path is required for schema 
version 3")
     }
 
@@ -186,8 +187,13 @@ class StateSchemaCompatibilityChecker(
           check(existingStateSchema, newSchema, ignoreValueSchema)
         }
       }
+      val colFamiliesAddedOrRemoved =
+        newStateSchemaList.map(_.colFamilyName) != 
existingStateSchemaList.map(_.colFamilyName)
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) 
{
+        createSchemaFile(newStateSchemaList, stateSchemaVersion)
+      }
       // TODO: [SPARK-49535] Write Schema files after schema has changed for 
StateSchemaV3
-      false
+      colFamiliesAddedOrRemoved
     }
   }
 
@@ -196,6 +202,9 @@ class StateSchemaCompatibilityChecker(
 }
 
 object StateSchemaCompatibilityChecker {
+
+  val SCHEMA_FORMAT_V3: Int = 3
+
   private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
     if (!UnsafeRowUtils.isBinaryStable(schema)) {
       throw new SparkUnsupportedOperationException(
@@ -275,10 +284,31 @@ object StateSchemaCompatibilityChecker {
     if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
       throw result.get
     }
-    val schemaFileLocation = newSchemaFilePath match {
-      case Some(path) => path.toString
-      case None => checker.schemaFileLocation.toString
+    val schemaFileLocation = if (evolvedSchema) {
+      // if we are using the state schema v3, and we have
+      // evolved schema, this newSchemaFilePath should be defined
+      // and we want to populate the metadata with this file
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
+        newSchemaFilePath.get.toString
+      } else {
+        // if we are using any version less than v3, we have written
+        // the schema to this static location, which we will return
+        checker.schemaFileLocation.toString
+      }
+    } else {
+      // if we have not evolved schema (there has been a previous schema)
+      // and we are using state schema v3, this file path would be defined
+      // so we would just populate the next run's metadata file with this
+      // file path
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
+        oldSchemaFilePath.get.toString
+      } else {
+        // if we are using any version less than v3, we have written
+        // the schema to this static location, which we will return
+        checker.schemaFileLocation.toString
+      }
     }
+
     StateSchemaValidationResult(evolvedSchema, schemaFileLocation)
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index a17f3847323d..d0e255bb3049 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -1448,6 +1448,10 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
         TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
       SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
       withTempDir { chkptDir =>
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, 
"state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = 
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
         // in this test case, we are changing the state spec back and forth
         // to trigger the writing of the schema and metadata files
         val inputData = MemoryStream[(String, String)]
@@ -1483,6 +1487,11 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
           },
           StopStream
         )
+        // assert that a metadata and schema file has been written for each run
+        // as state variables have been deleted
+        assert(getFiles(metadataPath).length == 2)
+        assert(getFiles(stateSchemaPath).length == 2)
+
         val result3 = inputData.toDS()
           .groupByKey(x => x._1)
           .transformWithState(new RunningCountMostRecentStatefulProcessor(),
@@ -1512,10 +1521,6 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
           },
           StopStream
         )
-        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, 
"state"), "0")
-        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
-
-        val metadataPath = 
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
         // by the end of the test, there have been 4 batches,
         // so the metadata and schema logs, and commitLog has been purged
         // for batches 0 and 1 so metadata and schema files exist for batches 
0, 1, 2, 3
@@ -1527,6 +1532,116 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
     }
   }
 
+  test("transformWithState - verify that schema file is kept after metadata is 
purged") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+      withTempDir { chkptDir =>
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, 
"state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = 
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+        // in this test case, we are changing the state spec back and forth
+        // to trigger the writing of the schema and metadata files
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "2", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str2")),
+          CheckNewAnswer(("a", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        assert(getFiles(metadataPath).length == 3)
+        assert(getFiles(stateSchemaPath).length == 2)
+
+        val result3 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str3")),
+          CheckNewAnswer(("a", "1", "str2")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        // metadata files should be kept for batches 1, 2, 3
+        // schema files should be kept for batches 0, 2, 3
+        assert(getFiles(metadataPath).length == 3)
+        assert(getFiles(stateSchemaPath).length == 3)
+        // we want to ensure that we can read batch 1 even though the
+        // metadata file for batch 0 was removed
+        val batch1Df = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .option(StateSourceOptions.BATCH_ID, 1)
+          .load()
+
+        val batch1AnsDf = batch1Df.selectExpr(
+          "key.value AS groupingKey",
+          "single_value.value AS valueId")
+
+        checkAnswer(batch1AnsDf, Seq(Row("a", 2L)))
+
+        val batch3Df = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .option(StateSourceOptions.BATCH_ID, 3)
+          .load()
+
+        val batch3AnsDf = batch3Df.selectExpr(
+          "key.value AS groupingKey",
+          "single_value.value AS valueId")
+        checkAnswer(batch3AnsDf, Seq(Row("a", 1L)))
+      }
+    }
+  }
+
   test("state data source integration - value state supports time travel") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
@@ -1708,6 +1823,102 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
       }
     }
   }
+
+  test("transformWithState - verify that no metadata and schema logs are 
purged after" +
+    " removing column family") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "3") {
+      withTempDir { chkptDir =>
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "2", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "1", "")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "2", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "3", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "4", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "5", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "6", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "7", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "8", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "9", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "10", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "11", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "12", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("b", "str2")),
+          CheckNewAnswer(("b", "str1")),
+          AddData(inputData, ("b", "str3")),
+          CheckNewAnswer(("b", "str2")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, 
"state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = 
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+
+        // Metadata files are written for batches 0, 2, and 14.
+        // Schema files are written for 0, 14
+        // At the beginning of the last query run, the thresholdBatchId is 11.
+        // However, we would need both schema files to be preserved, if we 
want to
+        // be able to read from batch 11 onwards.
+        assert(getFiles(metadataPath).length == 2)
+        assert(getFiles(stateSchemaPath).length == 2)
+      }
+    }
+  }
 }
 
 class TransformWithStateValidationSuite extends StateStoreMetricsTest {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to