This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8023504e69fd [SPARK-49594][SS] Adding check on whether columnFamilies
were added or removed to write StateSchemaV3 file
8023504e69fd is described below
commit 8023504e69fdd037dea002e961b960fd9fa662ba
Author: Eric Marnadi <[email protected]>
AuthorDate: Thu Sep 12 12:01:08 2024 +0900
[SPARK-49594][SS] Adding check on whether columnFamilies were added or
removed to write StateSchemaV3 file
### What changes were proposed in this pull request?
In the [PR](https://github.com/apache/spark/pull/47880) that enabled
deleteIfExists, we changed the condition under which we throw an error. However,
as a result, we no longer write schema files when column families are added or
removed, which is functionally incorrect.
Additionally, we were previously writing the newSchemaFilePath to the
OperatorStateMetadata on every new query run, when we should only do this if
the schema changes.
### Why are the changes needed?
These changes are needed because we want to write out a schema file every
time column families are added or removed. We also want the current metadata
file to point to the old schema file if the schema has not changed between
this run and the last one, as opposed to populating the metadata with a new
schema file path on every run, even when that new schema file is never
created.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Amended unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48067 from ericm-db/add-remove-cf.
Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../state/StateSchemaCompatibilityChecker.scala | 40 +++-
.../sql/streaming/TransformWithStateSuite.scala | 219 ++++++++++++++++++++-
2 files changed, 250 insertions(+), 9 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 90eb634689b2..3a1793f71794 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager,
StatefulOperatorStateInfo}
import
org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader,
SchemaWriter}
+import
org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker.SCHEMA_FORMAT_V3
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.types.{DataType, StructType}
@@ -95,7 +96,7 @@ class StateSchemaCompatibilityChecker(
stateStoreColFamilySchema: List[StateStoreColFamilySchema],
stateSchemaVersion: Int): Unit = {
// Ensure that schema file path is passed explicitly for schema version 3
- if (stateSchemaVersion == 3 && newSchemaFilePath.isEmpty) {
+ if (stateSchemaVersion == SCHEMA_FORMAT_V3 && newSchemaFilePath.isEmpty) {
throw new IllegalStateException("Schema file path is required for schema
version 3")
}
@@ -186,8 +187,13 @@ class StateSchemaCompatibilityChecker(
check(existingStateSchema, newSchema, ignoreValueSchema)
}
}
+ val colFamiliesAddedOrRemoved =
+ newStateSchemaList.map(_.colFamilyName) !=
existingStateSchemaList.map(_.colFamilyName)
+ if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved)
{
+ createSchemaFile(newStateSchemaList, stateSchemaVersion)
+ }
// TODO: [SPARK-49535] Write Schema files after schema has changed for
StateSchemaV3
- false
+ colFamiliesAddedOrRemoved
}
}
@@ -196,6 +202,9 @@ class StateSchemaCompatibilityChecker(
}
object StateSchemaCompatibilityChecker {
+
+ val SCHEMA_FORMAT_V3: Int = 3
+
private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
if (!UnsafeRowUtils.isBinaryStable(schema)) {
throw new SparkUnsupportedOperationException(
@@ -275,10 +284,31 @@ object StateSchemaCompatibilityChecker {
if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
throw result.get
}
- val schemaFileLocation = newSchemaFilePath match {
- case Some(path) => path.toString
- case None => checker.schemaFileLocation.toString
+ val schemaFileLocation = if (evolvedSchema) {
+ // if we are using the state schema v3, and we have
+ // evolved schema, this newSchemaFilePath should be defined
+ // and we want to populate the metadata with this file
+ if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
+ newSchemaFilePath.get.toString
+ } else {
+ // if we are using any version less than v3, we have written
+ // the schema to this static location, which we will return
+ checker.schemaFileLocation.toString
+ }
+ } else {
+ // if we have not evolved schema (there has been a previous schema)
+ // and we are using state schema v3, this file path would be defined
+ // so we would just populate the next run's metadata file with this
+ // file path
+ if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
+ oldSchemaFilePath.get.toString
+ } else {
+ // if we are using any version less than v3, we have written
+ // the schema to this static location, which we will return
+ checker.schemaFileLocation.toString
+ }
}
+
StateSchemaValidationResult(evolvedSchema, schemaFileLocation)
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index a17f3847323d..d0e255bb3049 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -1448,6 +1448,10 @@ class TransformWithStateSuite extends
StateStoreMetricsTest
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
withTempDir { chkptDir =>
+ val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath,
"state"), "0")
+ val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+ val metadataPath =
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
// in this test case, we are changing the state spec back and forth
// to trigger the writing of the schema and metadata files
val inputData = MemoryStream[(String, String)]
@@ -1483,6 +1487,11 @@ class TransformWithStateSuite extends
StateStoreMetricsTest
},
StopStream
)
+ // assert that a metadata file and a schema file have been written for each run
+ // as state variables have been deleted
+ assert(getFiles(metadataPath).length == 2)
+ assert(getFiles(stateSchemaPath).length == 2)
+
val result3 = inputData.toDS()
.groupByKey(x => x._1)
.transformWithState(new RunningCountMostRecentStatefulProcessor(),
@@ -1512,10 +1521,6 @@ class TransformWithStateSuite extends
StateStoreMetricsTest
},
StopStream
)
- val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath,
"state"), "0")
- val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
-
- val metadataPath =
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
// by the end of the test, there have been 4 batches,
// so the metadata and schema logs, and commitLog has been purged
// for batches 0 and 1 so metadata and schema files exist for batches
0, 1, 2, 3
@@ -1527,6 +1532,116 @@ class TransformWithStateSuite extends
StateStoreMetricsTest
}
}
+ test("transformWithState - verify that schema file is kept after metadata is
purged") {
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
+ TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+ SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+ withTempDir { chkptDir =>
+ val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath,
"state"), "0")
+ val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+ val metadataPath =
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+ // in this test case, we are changing the state spec back and forth
+ // to trigger the writing of the schema and metadata files
+ val inputData = MemoryStream[(String, String)]
+ val result1 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result1, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "1", "")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should
be(false)
+ }
+ },
+ StopStream
+ )
+ testStream(result1, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "2", "str1")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should
be(false)
+ }
+ },
+ StopStream
+ )
+ val result2 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result2, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str2")),
+ CheckNewAnswer(("a", "str1")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should
be(false)
+ }
+ },
+ StopStream
+ )
+ assert(getFiles(metadataPath).length == 3)
+ assert(getFiles(stateSchemaPath).length == 2)
+
+ val result3 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result3, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str3")),
+ CheckNewAnswer(("a", "1", "str2")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should
be(false)
+ }
+ },
+ StopStream
+ )
+ // metadata files should be kept for batches 1, 2, 3
+ // schema files should be kept for batches 0, 2, 3
+ assert(getFiles(metadataPath).length == 3)
+ assert(getFiles(stateSchemaPath).length == 3)
+ // we want to ensure that we can read batch 1 even though the
+ // metadata file for batch 0 was removed
+ val batch1Df = spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+ .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+ .option(StateSourceOptions.BATCH_ID, 1)
+ .load()
+
+ val batch1AnsDf = batch1Df.selectExpr(
+ "key.value AS groupingKey",
+ "single_value.value AS valueId")
+
+ checkAnswer(batch1AnsDf, Seq(Row("a", 2L)))
+
+ val batch3Df = spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+ .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+ .option(StateSourceOptions.BATCH_ID, 3)
+ .load()
+
+ val batch3AnsDf = batch3Df.selectExpr(
+ "key.value AS groupingKey",
+ "single_value.value AS valueId")
+ checkAnswer(batch3AnsDf, Seq(Row("a", 1L)))
+ }
+ }
+ }
+
test("state data source integration - value state supports time travel") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
@@ -1708,6 +1823,102 @@ class TransformWithStateSuite extends
StateStoreMetricsTest
}
}
}
+
+ test("transformWithState - verify that no metadata and schema logs are
purged after" +
+ " removing column family") {
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
+ TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+ SQLConf.MIN_BATCHES_TO_RETAIN.key -> "3") {
+ withTempDir { chkptDir =>
+ val inputData = MemoryStream[(String, String)]
+ val result1 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result1, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "1", "")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "2", "str1")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should
be(false)
+ }
+ },
+ StopStream
+ )
+ testStream(result1, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "1", "")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "2", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "3", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "4", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "5", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "6", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "7", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "8", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "9", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "10", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "11", "str1")),
+ AddData(inputData, ("b", "str1")),
+ CheckNewAnswer(("b", "12", "str1")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should
be(false)
+ }
+ },
+ StopStream
+ )
+ val result2 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+ TimeMode.None(),
+ OutputMode.Update())
+
+ testStream(result2, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("b", "str2")),
+ CheckNewAnswer(("b", "str1")),
+ AddData(inputData, ("b", "str3")),
+ CheckNewAnswer(("b", "str2")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should
be(false)
+ }
+ },
+ StopStream
+ )
+
+ val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath,
"state"), "0")
+ val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+ val metadataPath =
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+
+ // Metadata files are written for batches 0, 2, and 14.
+ // Schema files are written for 0, 14
+ // At the beginning of the last query run, the thresholdBatchId is 11.
+ // However, we would need both schema files to be preserved, if we
want to
+ // be able to read from batch 11 onwards.
+ assert(getFiles(metadataPath).length == 2)
+ assert(getFiles(stateSchemaPath).length == 2)
+ }
+ }
+ }
}
class TransformWithStateValidationSuite extends StateStoreMetricsTest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]