This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new ac168abb7c3b [SPARK-51573][SS] Fix Streaming State Checkpoint v2 checkpointInfo race condition ac168abb7c3b is described below commit ac168abb7c3ba7d95098f277ea9ab961691e43e6 Author: Livia Zhu <livia....@databricks.com> AuthorDate: Thu Mar 27 12:24:39 2025 +0900 [SPARK-51573][SS] Fix Streaming State Checkpoint v2 checkpointInfo race condition What changes were proposed in this pull request? Return StateStoreCheckpointInfo as part of RocksDB.commit() and store it locally in the RocksDBStateStore so that RocksDBStateStore.getCheckpointInfo() always returns the checkpoint info belonging to its commit. Why are the changes needed? Fixes the bug explained in SPARK-51573. This race condition will result in tasks getting incorrect checkpointInfo which is a correctness bug. Does this PR introduce any user-facing change? No. How was this patch tested? Added new unit test. Was this patch authored or co-authored using generative AI tooling? No. Closes #50344 from liviazhu-db/liviazhu-db/checkpointinfo-race. Authored-by: Livia Zhu <livia....@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit 4765c15a442ee92022b28398ba52136b8e05082d) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-conditions.json | 6 +++++ .../sql/execution/streaming/state/RocksDB.scala | 14 +++++------ .../state/RocksDBStateStoreProvider.scala | 13 +++++++--- .../streaming/state/StateStoreErrors.scala | 10 ++++++++ .../RocksDBStateStoreCheckpointFormatV2Suite.scala | 29 ++++++++++++++++++++++ .../execution/streaming/state/RocksDBSuite.scala | 8 +++--- 6 files changed, 65 insertions(+), 15 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index cfb218dfb3be..47c1151d3c86 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4781,6 +4781,12 @@ ], "sqlState" : "42802" }, + "STATE_STORE_OPERATION_OUT_OF_ORDER" : { + "message" : [ + "Streaming stateful operator attempted 
to access state store out of order. This is a bug, please retry. error_msg=<errorMsg>" + ], + "sqlState" : "XXKST" + }, "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY" : { "message" : [ "The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.SupportsFineGrainedReplay.", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 5ff6ae6551ff..72ff4c480667 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -60,11 +60,10 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple * * @note This class is not thread-safe, so use it only from one thread. * @see [[RocksDBFileManager]] to see how the files are laid out in local disk and DFS. 
- * @param dfsRootDir Remote directory where checkpoints are going to be written * @param conf Configuration for RocksDB + * @param stateStoreId StateStoreId for the state store * @param localRootDir Root directory in local disk that is used to working and checkpointing dirs * @param hadoopConf Hadoop configuration for talking to the remote file system - * @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs */ class RocksDB( dfsRootDir: String, @@ -73,7 +72,8 @@ class RocksDB( hadoopConf: Configuration = new Configuration, loggingId: String = "", useColumnFamilies: Boolean = false, - enableStateStoreCheckpointIds: Boolean = false) extends Logging { + enableStateStoreCheckpointIds: Boolean = false, + partitionId: Int = 0) extends Logging { import RocksDB._ @@ -987,7 +987,7 @@ class RocksDB( * - Create a RocksDB checkpoint in a new local dir * - Sync the checkpoint dir files to DFS */ - def commit(): Long = { + def commit(): (Long, StateStoreCheckpointInfo) = { val newVersion = loadedVersion + 1 try { logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}") @@ -1056,7 +1056,7 @@ class RocksDB( recordedMetrics = Some(metrics) logInfo(log"Committed ${MDC(LogKeys.VERSION_NUM, newVersion)}, " + log"stats = ${MDC(LogKeys.METRICS_JSON, recordedMetrics.get.json)}") - loadedVersion + (loadedVersion, getLatestCheckpointInfo) } catch { case t: Throwable => loadedVersion = -1 // invalidate loaded version @@ -1224,11 +1224,11 @@ class RocksDB( def getWriteBufferManagerAndCache(): (WriteBufferManager, Cache) = (writeBufferManager, lruCache) /** - * Called by RocksDBStateStoreProvider to retrieve the checkpoint information to be + * Called by commit() to retrieve the checkpoint information to be * passed back to the stateful operator. It will return the information for the latest * state store checkpointing. 
*/ - def getLatestCheckpointInfo(partitionId: Int): StateStoreCheckpointInfo = { + private def getLatestCheckpointInfo: StateStoreCheckpointInfo = { StateStoreCheckpointInfo( partitionId, loadedVersion, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index cd9fdb9469d6..735e8d567b87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -230,10 +230,12 @@ private[sql] class RocksDBStateStoreProvider } } + var checkpointInfo: Option[StateStoreCheckpointInfo] = None override def commit(): Long = synchronized { try { verify(state == UPDATING, "Cannot commit after already committed or aborted") - val newVersion = rocksDB.commit() + val (newVersion, newCheckpointInfo) = rocksDB.commit() + checkpointInfo = Some(newCheckpointInfo) state = COMMITTED logInfo(log"Committed ${MDC(VERSION_NUM, newVersion)} " + log"for ${MDC(STATE_STORE_ID, id)}") @@ -328,8 +330,11 @@ private[sql] class RocksDBStateStoreProvider } override def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo = { - val checkpointInfo = rocksDB.getLatestCheckpointInfo(id.partitionId) - checkpointInfo + checkpointInfo match { + case Some(info) => info + case None => throw StateStoreErrors.stateStoreOperationOutOfOrder( + "Cannot get checkpointInfo without committing the store") + } } override def hasCommitted: Boolean = state == COMMITTED @@ -517,7 +522,7 @@ private[sql] class RocksDBStateStoreProvider val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr) new RocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, storeIdStr, - useColumnFamilies, 
storeConf.enableStateStoreCheckpointIds) + useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId) } private val keyValueEncoderMap = new java.util.concurrent.ConcurrentHashMap[String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 188306e82f68..1c34da65ebb2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -212,6 +212,10 @@ object StateStoreErrors { StateStoreInvalidVariableTypeChange = { new StateStoreInvalidVariableTypeChange(stateName, oldType, newType) } + + def stateStoreOperationOutOfOrder(errorMsg: String): StateStoreOperationOutOfOrder = { + new StateStoreOperationOutOfOrder(errorMsg) + } } class StateStoreDuplicateStateVariableDefined(stateVarName: String) @@ -424,3 +428,9 @@ class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String) extends SparkUnsupportedOperationException( errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY", messageParameters = Map("inputClass" -> inputClass)) + +class StateStoreOperationOutOfOrder(errorMsg: String) + extends SparkRuntimeException( + errorClass = "STATE_STORE_OPERATION_OUT_OF_ORDER", + messageParameters = Map("errorMsg" -> errorMsg) + ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala index d35bbd49de0d..59f3997f8533 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala @@ -27,6 +27,7 @@ import org.apache.spark.{SparkContext, TaskContext} import org.apache.spark.sql.{DataFrame, ForeachWriter} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream} +import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper import org.apache.spark.sql.functions.count import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming._ @@ -1109,4 +1110,32 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest ) } } + + test("checkpointFormatVersion2 racing commits don't return incorrect checkpointInfo") { + val sqlConf = new SQLConf() + sqlConf.setConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION, 2) + + withTempDir { checkpointDir => + val provider = new CkptIdCollectingStateStoreProviderWrapper() + provider.init( + StateStoreId(checkpointDir.toString, 0, 0), + StateStoreTestsHelper.keySchema, + StateStoreTestsHelper.valueSchema, + PrefixKeyScanStateEncoderSpec(StateStoreTestsHelper.keySchema, 1), + useColumnFamilies = false, + new StateStoreConf(sqlConf), + new Configuration + ) + + val store1 = provider.getStore(0) + val store1NewVersion = store1.commit() + val store2 = provider.getStore(1) + val store2NewVersion = store2.commit() + val store1CheckpointInfo = store1.getStateStoreCheckpointInfo() + val store2CheckpointInfo = store2.getStateStoreCheckpointInfo() + + assert(store1CheckpointInfo.batchVersion == store1NewVersion) + assert(store2CheckpointInfo.batchVersion == store2NewVersion) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 475f8b116830..9a79f3fa0ae8 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -2656,7 +2656,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.put("a", "5") db.put("b", "5") - curVersion = db.commit() + curVersion = db.commit()._1 assert(db.metricsOpt.get.numUncommittedKeys === 2) assert(db.metricsOpt.get.numCommittedKeys === 2) @@ -2672,7 +2672,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.put("b", "7") db.put("c", "7") - curVersion = db.commit() + curVersion = db.commit()._1 assert(db.metricsOpt.get.numUncommittedKeys === -1) assert(db.metricsOpt.get.numCommittedKeys === -1) @@ -2688,7 +2688,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.put("c", "8") db.put("d", "8") - curVersion = db.commit() + curVersion = db.commit()._1 assert(db.metricsOpt.get.numUncommittedKeys === 4) assert(db.metricsOpt.get.numCommittedKeys === 4) @@ -3523,7 +3523,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } - override def commit(): Long = { + override def commit(): (Long, StateStoreCheckpointInfo) = { val ret = super.commit() // update versionToUniqueId from lineageManager lineageManager.getLineageForCurrVersion().foreach { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org