This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e2eb540cb8db [SPARK-53103][SS] Throw an error if state directory is 
not empty when query starts
e2eb540cb8db is described below

commit e2eb540cb8dbd1f6a224083688c89f9aba0db2e2
Author: Dylan Wong <dylan.w...@databricks.com>
AuthorDate: Fri Aug 22 10:28:49 2025 -0700

    [SPARK-53103][SS] Throw an error if state directory is not empty when query 
starts
    
    ### What changes were proposed in this pull request?
    
    We are implementing a check to ensure the state directory of the checkpoint 
location is empty before the first batch of a streaming query begins. The new 
check will throw an error if the directory isn't empty.
    
    ### Why are the changes needed?
    
    This will prevent a conflict where background maintenance processes delete 
files that the new query has just created and prevents sharing state 
directories.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. There is a new error condition 
```STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY``` that occurs when the state 
directory is not empty on the first batch of a query.
    
    ### How was this patch tested?
    
    Unit tests are added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51817 from dylanwong250/SPARK-53103.
    
    Lead-authored-by: Dylan Wong <dylan.w...@databricks.com>
    Co-authored-by: dylanwong250 <dylanwong...@gmail.com>
    Signed-off-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
---
 .../src/main/resources/error/error-conditions.json |  7 ++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 +++
 .../streaming/runtime/MicroBatchExecution.scala    | 35 ++++++++-
 .../streaming/state/StateStoreErrors.scala         | 12 ++++
 .../sql/streaming/StreamingQueryManagerSuite.scala |  4 ++
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 83 +++++++++++++++++++++-
 6 files changed, 148 insertions(+), 3 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 07725ea7e0ee..f7db0b6761a5 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5161,6 +5161,13 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY" : {
+    "message" : [
+      "The checkpoint location <checkpointLocation> should be empty on batch 
0",
+      "Please either use a new checkpoint location, or delete the existing 
data in the checkpoint location."
+    ],
+    "sqlState" : "42K03"
+  },
   "STATE_STORE_COLUMN_FAMILY_SCHEMA_INCOMPATIBLE" : {
     "message" : [
       "Incompatible schema transformation with column family=<colFamilyName>, 
oldSchema=<oldSchema>, newSchema=<newSchema>."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 881f204e1a43..f344c6208a1e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2687,6 +2687,16 @@ object SQLConf {
       .intConf
       .createWithDefault(16)
 
+  val STREAMING_VERIFY_CHECKPOINT_DIRECTORY_EMPTY_ON_START =
+    buildConf("spark.sql.streaming.verifyCheckpointDirectoryEmptyOnStart")
+      .internal()
+      .doc("When true, verifies that the checkpoint directory (offsets, state, 
commits) is " +
+        "empty when first starting a streaming query. This prevents prevents 
sharing checkpoint " +
+        "directories between different queries.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val STATE_STORE_COMPRESSION_CODEC =
     buildConf("spark.sql.streaming.stateStore.compression.codec")
       .internal()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index fc80cbae751f..461936b40218 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -21,6 +21,8 @@ import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.LogKeys
 import org.apache.spark.internal.LogKeys.BATCH_ID
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -38,11 +40,12 @@ import org.apache.spark.sql.execution.{SparkPlan, 
SQLExecution}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, 
StreamWriterCommitProgress, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, 
OneTimeTrigger, ProcessingTimeTrigger, Sink, Source}
-import org.apache.spark.sql.execution.streaming.checkpointing.{CommitMetadata, 
OffsetSeq, OffsetSeqMetadata}
+import 
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, 
CommitMetadata, OffsetSeq, OffsetSeqMetadata}
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
 StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
 import 
org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler
+import 
org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS,
 DIR_NAME_OFFSETS, DIR_NAME_STATE}
 import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, 
WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
-import org.apache.spark.sql.execution.streaming.state.StateSchemaBroadcast
+import org.apache.spark.sql.execution.streaming.state.{StateSchemaBroadcast, 
StateStoreErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.util.{Clock, Utils}
@@ -562,12 +565,40 @@ class MicroBatchExecution(
           log"offsets ${MDC(LogKeys.STREAMING_OFFSETS_START, 
execCtx.startOffsets)} and " +
           log"available offsets ${MDC(LogKeys.STREAMING_OFFSETS_END, 
execCtx.endOffsets)}")
       case None => // We are starting this stream for the first time.
+        val shouldVerifyNewCheckpointDirectory =
+          
sparkSession.conf.get(SQLConf.STREAMING_VERIFY_CHECKPOINT_DIRECTORY_EMPTY_ON_START)
+        if (shouldVerifyNewCheckpointDirectory) {
+          verifyNewCheckpointDirectory()
+        }
         logInfo(s"Starting new streaming query.")
         execCtx.batchId = 0
         watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf, 
logicalPlan)
     }
   }
 
+  /**
+   * Verify that the checkpoint directory is in a good state to start a new
+   * streaming query. This checks that the offsets, state, commits directories 
are
+   * either non-existent or empty.
+   *
+   * If this check fails, an exception is thrown.
+   */
+  private def verifyNewCheckpointDirectory(): Unit = {
+    val fileManager = CheckpointFileManager.create(new 
Path(resolvedCheckpointRoot),
+      sparkSession.sessionState.newHadoopConf())
+    val dirNamesThatShouldNotHaveFiles = Array[String](
+      DIR_NAME_OFFSETS, DIR_NAME_STATE, DIR_NAME_COMMITS)
+
+    dirNamesThatShouldNotHaveFiles.foreach { dirName =>
+      val path = new Path(resolvedCheckpointRoot, dirName)
+
+      if (fileManager.exists(path) && !fileManager.list(path).isEmpty) {
+        val loc = path.toString
+        throw StateStoreErrors.streamingStateCheckpointLocationNotEmpty(loc)
+      }
+    }
+  }
+
   /**
    * Returns true if there is any new data available to be processed.
    */
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 455b06f8d9dc..43682de03446 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -187,6 +187,11 @@ object StateStoreErrors {
       numSchemaFiles, schemaFilesThreshold, addedColFamilies, 
removedColFamilies)
   }
 
+  def streamingStateCheckpointLocationNotEmpty(checkpointLocation: String)
+    : StateStoreCheckpointLocationNotEmpty = {
+    new StateStoreCheckpointLocationNotEmpty(checkpointLocation)
+  }
+
   def stateStoreColumnFamilyMismatch(
       columnFamilyName: String,
       oldColumnFamilySchema: String,
@@ -474,6 +479,13 @@ class StateStoreStateSchemaFilesThresholdExceeded(
       "addedColumnFamilies" -> addedColFamilies.mkString("(", ",", ")"),
       "removedColumnFamilies" -> removedColFamilies.mkString("(", ",", ")")))
 
+class StateStoreCheckpointLocationNotEmpty(
+    checkpointLocation: String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY",
+    messageParameters = Map(
+      "checkpointLocation" -> checkpointLocation))
+
 class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
   extends SparkRuntimeException(
     errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index fea218500677..c0a123a2895c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -320,6 +320,8 @@ class StreamingQueryManagerSuite extends StreamTest {
           val query1 = ds1.writeStream.format("parquet")
             .option("checkpointLocation", chkLocation).start(dataLocation)
           ms1.addData(1, 2, 3)
+          query1.processAllAvailable() // ensure offset log has been written
+
           val query2 = ds2.writeStream.format("parquet")
             .option("checkpointLocation", chkLocation).start(dataLocation)
           try {
@@ -382,6 +384,8 @@ class StreamingQueryManagerSuite extends StreamTest {
           val query1 = ms1.toDS().writeStream.format("parquet")
             .option("checkpointLocation", chkLocation).start(dataLocation)
           ms1.addData(1, 2, 3)
+          query1.processAllAvailable() // ensure offset log has been written
+
           val query2 = ds2.writeStream.format("parquet")
             .option("checkpointLocation", chkLocation).start(dataLocation)
           try {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index df5d49e03ee4..7ea53d41a150 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -45,9 +45,10 @@ import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
ReadLimit}
 import org.apache.spark.sql.execution.exchange.{REQUIRED_BY_STATEFUL_OPERATOR, 
ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqMetadata
+import 
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, 
OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, 
MemoryStream, MetricsReporter, StreamExecution, StreamingExecutionRelation, 
StreamingQueryWrapper}
 import org.apache.spark.sql.execution.streaming.sources.{MemorySink, 
TestForeachWriter}
+import 
org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, 
RocksDBStateStoreProvider, StateStoreCheckpointLocationNotEmpty}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.{BlockingSource, 
MockSourceProvider, StreamManualClock}
@@ -1475,6 +1476,86 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     )
   }
 
+  private val TEST_PROVIDERS = Seq(
+    classOf[HDFSBackedStateStoreProvider].getName,
+    classOf[RocksDBStateStoreProvider].getName
+  )
+
+  TEST_PROVIDERS.foreach { provider =>
+    test("SPARK-53103: non empty state and commits checkpoint directory on 
first batch"
+      + s"(with $provider)") {
+      withSQLConf(
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> provider) {
+
+        withTempDir { checkpointDir =>
+          val q = MemoryStream[Int].toDS().groupBy().count()
+            .writeStream
+            .format("memory")
+            .outputMode("complete")
+            .queryName(s"name${RandomStringUtils.secure.nextAlphabetic(10)}")
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .start()
+          // Verify that the query can start successfully when the checkpoint 
directory is empty.
+          q.stop()
+        }
+
+        withTempDir { checkpointDir =>
+          val hadoopConf = spark.sessionState.newHadoopConf()
+          val fm = CheckpointFileManager.create(new 
Path(checkpointDir.toString), hadoopConf)
+
+          // Create a non-empty state checkpoint directory to simulate the 
case that the user
+          // a directory that already has state data.
+          fm.mkdirs(new Path(new Path(checkpointDir.getCanonicalPath, 
"state"), "0"))
+
+          checkError(
+            exception = intercept[StreamingQueryException] {
+              MemoryStream[Int].toDS().groupBy().count()
+                .writeStream
+                .format("memory")
+                .outputMode("complete")
+                
.queryName(s"name${RandomStringUtils.secure.nextAlphabetic(10)}")
+                .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                .start()
+                .processAllAvailable()
+            }.getCause.asInstanceOf[StateStoreCheckpointLocationNotEmpty],
+            condition = "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY",
+            sqlState = "42K03",
+            parameters = Map(
+              "checkpointLocation" ->
+                ("file:" + (new Path(checkpointDir.getCanonicalPath, 
"state")).toString)
+            ))
+        }
+
+        withTempDir { checkpointDir =>
+          val hadoopConf = spark.sessionState.newHadoopConf()
+          val fm = CheckpointFileManager.create(new 
Path(checkpointDir.toString), hadoopConf)
+
+          // Create a non-empty state checkpoint directory to simulate the 
case that the user
+          // a directory that already has commits data.
+          fm.mkdirs(new Path(new Path(checkpointDir.getCanonicalPath, 
"commits"), "0"))
+
+          checkError(
+            exception = intercept[StreamingQueryException] {
+              MemoryStream[Int].toDS().groupBy().count()
+                .writeStream
+                .format("memory")
+                .outputMode("complete")
+                
.queryName(s"name${RandomStringUtils.secure.nextAlphabetic(10)}")
+                .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                .start()
+                .processAllAvailable()
+            }.getCause.asInstanceOf[StateStoreCheckpointLocationNotEmpty],
+            condition = "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY",
+            sqlState = "42K03",
+            parameters = Map(
+              "checkpointLocation" ->
+                ("file:" + (new Path(checkpointDir.getCanonicalPath, 
"commits")).toString)
+            ))
+        }
+      }
+    }
+  }
+
   private def checkAppendOutputModeException(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to