This is an automated email from the ASF dual-hosted git repository. ashrigondekar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e2eb540cb8db [SPARK-53103][SS] Throw an error if state directory is not empty when query starts e2eb540cb8db is described below commit e2eb540cb8dbd1f6a224083688c89f9aba0db2e2 Author: Dylan Wong <dylan.w...@databricks.com> AuthorDate: Fri Aug 22 10:28:49 2025 -0700 [SPARK-53103][SS] Throw an error if state directory is not empty when query starts ### What changes were proposed in this pull request? This PR adds a check, run before the first batch of a new streaming query, that the offsets, state, and commits directories of the checkpoint location are either non-existent or empty. If any of them contains files, the query fails with an error. ### Why are the changes needed? The check prevents a conflict in which background maintenance processes delete files that the new query has just created, and it prevents different queries from sharing the same state directories. ### Does this PR introduce _any_ user-facing change? Yes. There is a new error condition ```STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY``` that is raised when one of these checkpoint directories is not empty on the first batch of a query. The check can be disabled with the internal config `spark.sql.streaming.verifyCheckpointDirectoryEmptyOnStart` (default: true). ### How was this patch tested? Unit tests were added. ### Was this patch authored or co-authored using generative AI tooling? No Closes #51817 from dylanwong250/SPARK-53103. 
Lead-authored-by: Dylan Wong <dylan.w...@databricks.com> Co-authored-by: dylanwong250 <dylanwong...@gmail.com> Signed-off-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> --- .../src/main/resources/error/error-conditions.json | 7 ++ .../org/apache/spark/sql/internal/SQLConf.scala | 10 +++ .../streaming/runtime/MicroBatchExecution.scala | 35 ++++++++- .../streaming/state/StateStoreErrors.scala | 12 ++++ .../sql/streaming/StreamingQueryManagerSuite.scala | 4 ++ .../spark/sql/streaming/StreamingQuerySuite.scala | 83 +++++++++++++++++++++- 6 files changed, 148 insertions(+), 3 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 07725ea7e0ee..f7db0b6761a5 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5161,6 +5161,13 @@ ], "sqlState" : "42802" }, + "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY" : { + "message" : [ + "The checkpoint location <checkpointLocation> should be empty on batch 0", + "Please either use a new checkpoint location, or delete the existing data in the checkpoint location." + ], + "sqlState" : "42K03" + }, "STATE_STORE_COLUMN_FAMILY_SCHEMA_INCOMPATIBLE" : { "message" : [ "Incompatible schema transformation with column family=<colFamilyName>, oldSchema=<oldSchema>, newSchema=<newSchema>." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 881f204e1a43..f344c6208a1e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2687,6 +2687,16 @@ object SQLConf { .intConf .createWithDefault(16) + val STREAMING_VERIFY_CHECKPOINT_DIRECTORY_EMPTY_ON_START = + buildConf("spark.sql.streaming.verifyCheckpointDirectoryEmptyOnStart") + .internal() + .doc("When true, verifies that the checkpoint directory (offsets, state, commits) is " + + "empty when first starting a streaming query. This prevents prevents sharing checkpoint " + + "directories between different queries.") + .version("4.1.0") + .booleanConf + .createWithDefault(true) + val STATE_STORE_COMPRESSION_CODEC = buildConf("spark.sql.streaming.stateStore.compression.codec") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index fc80cbae751f..461936b40218 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -21,6 +21,8 @@ import scala.collection.mutable.{Map => MutableMap} import scala.collection.mutable import scala.util.control.NonFatal +import org.apache.hadoop.fs.Path + import org.apache.spark.internal.LogKeys import org.apache.spark.internal.LogKeys.BATCH_ID import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -38,11 +40,12 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, Sink, Source} -import org.apache.spark.sql.execution.streaming.checkpointing.{CommitMetadata, OffsetSeq, OffsetSeqMetadata} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeq, OffsetSeqMetadata} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler +import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} -import org.apache.spark.sql.execution.streaming.state.StateSchemaBroadcast +import org.apache.spark.sql.execution.streaming.state.{StateSchemaBroadcast, StateStoreErrors} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.Trigger import org.apache.spark.util.{Clock, Utils} @@ -562,12 +565,40 @@ class MicroBatchExecution( log"offsets ${MDC(LogKeys.STREAMING_OFFSETS_START, execCtx.startOffsets)} and " + log"available offsets ${MDC(LogKeys.STREAMING_OFFSETS_END, execCtx.endOffsets)}") case None => // We are starting this stream for the first time. 
+ val shouldVerifyNewCheckpointDirectory = + sparkSession.conf.get(SQLConf.STREAMING_VERIFY_CHECKPOINT_DIRECTORY_EMPTY_ON_START) + if (shouldVerifyNewCheckpointDirectory) { + verifyNewCheckpointDirectory() + } logInfo(s"Starting new streaming query.") execCtx.batchId = 0 watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf, logicalPlan) } } + /** + * Verify that the checkpoint directory is in a good state to start a new + * streaming query. This checks that the offsets, state, commits directories are + * either non-existent or empty. + * + * If this check fails, an exception is thrown. + */ + private def verifyNewCheckpointDirectory(): Unit = { + val fileManager = CheckpointFileManager.create(new Path(resolvedCheckpointRoot), + sparkSession.sessionState.newHadoopConf()) + val dirNamesThatShouldNotHaveFiles = Array[String]( + DIR_NAME_OFFSETS, DIR_NAME_STATE, DIR_NAME_COMMITS) + + dirNamesThatShouldNotHaveFiles.foreach { dirName => + val path = new Path(resolvedCheckpointRoot, dirName) + + if (fileManager.exists(path) && !fileManager.list(path).isEmpty) { + val loc = path.toString + throw StateStoreErrors.streamingStateCheckpointLocationNotEmpty(loc) + } + } + } + /** * Returns true if there is any new data available to be processed. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 455b06f8d9dc..43682de03446 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -187,6 +187,11 @@ object StateStoreErrors { numSchemaFiles, schemaFilesThreshold, addedColFamilies, removedColFamilies) } + def streamingStateCheckpointLocationNotEmpty(checkpointLocation: String) + : StateStoreCheckpointLocationNotEmpty = { + new StateStoreCheckpointLocationNotEmpty(checkpointLocation) + } + def stateStoreColumnFamilyMismatch( columnFamilyName: String, oldColumnFamilySchema: String, @@ -474,6 +479,13 @@ class StateStoreStateSchemaFilesThresholdExceeded( "addedColumnFamilies" -> addedColFamilies.mkString("(", ",", ")"), "removedColumnFamilies" -> removedColFamilies.mkString("(", ",", ")"))) +class StateStoreCheckpointLocationNotEmpty( + checkpointLocation: String) + extends SparkUnsupportedOperationException( + errorClass = "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY", + messageParameters = Map( + "checkpointLocation" -> checkpointLocation)) + class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String) extends SparkRuntimeException( errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index fea218500677..c0a123a2895c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -320,6 +320,8 @@ class StreamingQueryManagerSuite extends StreamTest { val 
query1 = ds1.writeStream.format("parquet") .option("checkpointLocation", chkLocation).start(dataLocation) ms1.addData(1, 2, 3) + query1.processAllAvailable() // ensure offset log has been written + val query2 = ds2.writeStream.format("parquet") .option("checkpointLocation", chkLocation).start(dataLocation) try { @@ -382,6 +384,8 @@ class StreamingQueryManagerSuite extends StreamTest { val query1 = ms1.toDS().writeStream.format("parquet") .option("checkpointLocation", chkLocation).start(dataLocation) ms1.addData(1, 2, 3) + query1.processAllAvailable() // ensure offset log has been written + val query2 = ds2.writeStream.format("parquet") .option("checkpointLocation", chkLocation).start(dataLocation) try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index df5d49e03ee4..7ea53d41a150 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -45,9 +45,10 @@ import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit} import org.apache.spark.sql.execution.exchange.{REQUIRED_BY_STATEFUL_OPERATOR, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqMetadata +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqMetadata} import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, MemoryStream, MetricsReporter, StreamExecution, StreamingExecutionRelation, StreamingQueryWrapper} import org.apache.spark.sql.execution.streaming.sources.{MemorySink, TestForeachWriter} +import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, 
StateStoreCheckpointLocationNotEmpty} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock} @@ -1475,6 +1476,86 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi ) } + private val TEST_PROVIDERS = Seq( + classOf[HDFSBackedStateStoreProvider].getName, + classOf[RocksDBStateStoreProvider].getName + ) + + TEST_PROVIDERS.foreach { provider => + test("SPARK-53103: non empty state and commits checkpoint directory on first batch" + + s"(with $provider)") { + withSQLConf( + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> provider) { + + withTempDir { checkpointDir => + val q = MemoryStream[Int].toDS().groupBy().count() + .writeStream + .format("memory") + .outputMode("complete") + .queryName(s"name${RandomStringUtils.secure.nextAlphabetic(10)}") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start() + // Verify that the query can start successfully when the checkpoint directory is empty. + q.stop() + } + + withTempDir { checkpointDir => + val hadoopConf = spark.sessionState.newHadoopConf() + val fm = CheckpointFileManager.create(new Path(checkpointDir.toString), hadoopConf) + + // Create a non-empty state checkpoint directory to simulate the case that the user + // a directory that already has state data. 
+ fm.mkdirs(new Path(new Path(checkpointDir.getCanonicalPath, "state"), "0")) + + checkError( + exception = intercept[StreamingQueryException] { + MemoryStream[Int].toDS().groupBy().count() + .writeStream + .format("memory") + .outputMode("complete") + .queryName(s"name${RandomStringUtils.secure.nextAlphabetic(10)}") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start() + .processAllAvailable() + }.getCause.asInstanceOf[StateStoreCheckpointLocationNotEmpty], + condition = "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY", + sqlState = "42K03", + parameters = Map( + "checkpointLocation" -> + ("file:" + (new Path(checkpointDir.getCanonicalPath, "state")).toString) + )) + } + + withTempDir { checkpointDir => + val hadoopConf = spark.sessionState.newHadoopConf() + val fm = CheckpointFileManager.create(new Path(checkpointDir.toString), hadoopConf) + + // Create a non-empty state checkpoint directory to simulate the case that the user + // a directory that already has commits data. 
+ fm.mkdirs(new Path(new Path(checkpointDir.getCanonicalPath, "commits"), "0")) + + checkError( + exception = intercept[StreamingQueryException] { + MemoryStream[Int].toDS().groupBy().count() + .writeStream + .format("memory") + .outputMode("complete") + .queryName(s"name${RandomStringUtils.secure.nextAlphabetic(10)}") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start() + .processAllAvailable() + }.getCause.asInstanceOf[StateStoreCheckpointLocationNotEmpty], + condition = "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY", + sqlState = "42K03", + parameters = Map( + "checkpointLocation" -> + ("file:" + (new Path(checkpointDir.getCanonicalPath, "commits")).toString) + )) + } + } + } + } + private def checkAppendOutputModeException(df: DataFrame): Unit = { withTempDir { outputDir => withTempDir { checkpointDir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org