This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 41b34d6be494 [SPARK-55493][SS] Do not mkdirs in streaming checkpoint 
state directory in StateDataSource
41b34d6be494 is described below

commit 41b34d6be494168ef4dfe40091001c0ba28bdf44
Author: Livia Zhu <[email protected]>
AuthorDate: Fri Feb 20 15:51:26 2026 -0800

    [SPARK-55493][SS] Do not mkdirs in streaming checkpoint state directory in 
StateDataSource
    
    ### What changes were proposed in this pull request?
    
    Previously, we tried to create a new directory for the state directory in the 
checkpoint directory if it didn't exist when running `StateDataSource`. This 
change adds a new readOnly mode so that data sources do not need to call mkdirs.
    
    ### Why are the changes needed?
    
    Allow usage of StateDataSource on checkpoints that are read-only.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: claude opus 4.6
    
    Closes #54277 from liviazhu/liviazhu-db/stds-fix.
    
    Authored-by: Livia Zhu <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../datasources/v2/state/StateDataSource.scala     |   2 +-
 .../v2/state/StreamStreamJoinStateHelper.scala     |   7 +-
 .../v2/state/metadata/StateMetadataSource.scala    |   6 +-
 .../streaming/state/OperatorStateMetadata.scala    |  19 ++-
 .../state/StateSchemaCompatibilityChecker.scala    |  14 +-
 .../v2/state/StateDataSourceReadSuite.scala        | 157 +++++++++++++++++++++
 .../v2/state/StateDataSourceTestBase.scala         |  27 +++-
 7 files changed, 216 insertions(+), 16 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 9ccbb9a649f2..07b756006525 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -379,7 +379,7 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
           partitionId, sourceOptions.storeName)
         val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
         val manager = new StateSchemaCompatibilityChecker(providerId, 
hadoopConf,
-          oldSchemaFilePaths = oldSchemaFilePaths)
+          oldSchemaFilePaths = oldSchemaFilePaths, createSchemaDir = false)
         val stateSchema = manager.readSchemaFile()
 
         if (sourceOptions.internalOnlyReadAllColumnFamilies) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
index 5cb38022159c..78648a01a6f6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
@@ -94,12 +94,13 @@ object StreamStreamJoinStateHelper {
 
         // read the key schema from the keyToNumValues store for the join keys
         val manager = new StateSchemaCompatibilityChecker(
-          providerIdForKeyToNumValues, newHadoopConf, oldSchemaFilePaths)
+          providerIdForKeyToNumValues, newHadoopConf, oldSchemaFilePaths,
+          createSchemaDir = false)
         val kSchema = manager.readSchemaFile().head.keySchema
 
         // read the value schema from the keyWithIndexToValue store for the 
values
         val manager2 = new 
StateSchemaCompatibilityChecker(providerIdForKeyWithIndexToValue,
-          newHadoopConf, oldSchemaFilePaths)
+          newHadoopConf, oldSchemaFilePaths, createSchemaDir = false)
         val vSchema = manager2.readSchemaFile().head.valueSchema
 
         (kSchema, vSchema)
@@ -109,7 +110,7 @@ object StreamStreamJoinStateHelper {
         val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
 
         val manager = new StateSchemaCompatibilityChecker(
-          providerId, newHadoopConf, oldSchemaFilePaths)
+          providerId, newHadoopConf, oldSchemaFilePaths, createSchemaDir = 
false)
         val kSchema = manager.readSchemaFile().find { schema =>
           schema.colFamilyName == storeNames(0)
         }.map(_.keySchema).get
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
index 31e6ac30a598..cef3a7a7e5cd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.state.metadata
 
+import java.io.FileNotFoundException
 import java.util
 
 import scala.jdk.CollectionConverters._
@@ -222,7 +223,8 @@ class StateMetadataPartitionReader(
           1
         }
         OperatorStateMetadataReader.createReader(
-          operatorIdPath, hadoopConf, operatorStateMetadataVersion, 
batchId).read() match {
+          operatorIdPath, hadoopConf, operatorStateMetadataVersion, batchId,
+          createMetadataDir = false).read() match {
           case Some(metadata) => metadata
           case None => throw 
StateDataSourceErrors.failedToReadOperatorMetadata(checkpointLocation,
             batchId)
@@ -231,7 +233,7 @@ class StateMetadataPartitionReader(
     } catch {
       // if the operator metadata is not present, catch the exception
       // and return an empty array
-      case ex: Exception =>
+      case ex: FileNotFoundException =>
         logWarning(log"Failed to find operator metadata for " +
           log"path=${MDC(LogKeys.CHECKPOINT_LOCATION, checkpointLocation)} " +
           log"with exception=${MDC(LogKeys.EXCEPTION, ex)}")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index 6b2295da03b9..ac0f42c34007 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -199,12 +199,14 @@ object OperatorStateMetadataReader {
       stateCheckpointPath: Path,
       hadoopConf: Configuration,
       version: Int,
-      batchId: Long): OperatorStateMetadataReader = {
+      batchId: Long,
+      createMetadataDir: Boolean = true): OperatorStateMetadataReader = {
     version match {
       case 1 =>
         new OperatorStateMetadataV1Reader(stateCheckpointPath, hadoopConf)
       case 2 =>
-        new OperatorStateMetadataV2Reader(stateCheckpointPath, hadoopConf, 
batchId)
+        new OperatorStateMetadataV2Reader(stateCheckpointPath, hadoopConf, 
batchId,
+          createMetadataDir)
       case _ =>
         throw new IllegalArgumentException(s"Failed to create reader for 
operator metadata " +
           s"with version=$version")
@@ -319,7 +321,8 @@ class OperatorStateMetadataV2Writer(
 class OperatorStateMetadataV2Reader(
     stateCheckpointPath: Path,
     hadoopConf: Configuration,
-    batchId: Long) extends OperatorStateMetadataReader {
+    batchId: Long,
+    createMetadataDir: Boolean = true) extends OperatorStateMetadataReader 
with Logging {
 
   // Check that the requested batchId is available in the checkpoint directory
   val baseCheckpointDir = stateCheckpointPath.getParent.getParent
@@ -331,7 +334,12 @@ class OperatorStateMetadataV2Reader(
   private val metadataDirPath = 
OperatorStateMetadataV2.metadataDirPath(stateCheckpointPath)
   private lazy val fm = CheckpointFileManager.create(metadataDirPath, 
hadoopConf)
 
-  fm.mkdirs(metadataDirPath.getParent)
+  if (createMetadataDir && !fm.exists(metadataDirPath.getParent)) {
+    fm.mkdirs(metadataDirPath.getParent)
+  } else if (!createMetadataDir) {
+    logInfo(log"Skipping metadata directory creation (createMetadataDir=false) 
" +
+      log"at ${MDC(LogKeys.CHECKPOINT_LOCATION, baseCheckpointDir.toString)}")
+  }
 
   override def version: Int = 2
 
@@ -352,7 +360,8 @@ class OperatorStateMetadataV2Reader(
 
   // List the available batches in the operator metadata directory
   private def listOperatorMetadataBatches(): Array[Long] = {
-    if (!fm.exists(metadataDirPath)) {
+    // If parent doesn't exist, return empty array rather than throwing an 
exception
+    if (!fm.exists(metadataDirPath.getParent) || !fm.exists(metadataDirPath)) {
       return Array.empty
     }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index ca18ce9067b3..166ec450bfbd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -81,7 +81,8 @@ class StateSchemaCompatibilityChecker(
     providerId: StateStoreProviderId,
     hadoopConf: Configuration,
     oldSchemaFilePaths: List[Path] = List.empty,
-    newSchemaFilePath: Option[Path] = None) extends Logging {
+    newSchemaFilePath: Option[Path] = None,
+    createSchemaDir: Boolean = true) extends Logging {
 
   // For OperatorStateMetadataV1: Only one schema file present per operator
   // per query
@@ -96,7 +97,12 @@ class StateSchemaCompatibilityChecker(
 
   private val fm = CheckpointFileManager.create(schemaFileLocation, hadoopConf)
 
-  fm.mkdirs(schemaFileLocation.getParent)
+  if (createSchemaDir && !fm.exists(schemaFileLocation.getParent)) {
+    fm.mkdirs(schemaFileLocation.getParent)
+  } else if (!createSchemaDir) {
+    logInfo(log"Skipping schema directory creation (createSchemaDir=false) " +
+      log"at ${MDC(LogKeys.CHECKPOINT_LOCATION, 
schemaFileLocation.getParent.toString)}")
+  }
 
   private val conf = 
SparkSession.getActiveSession.map(_.sessionState.conf).getOrElse(new SQLConf())
 
@@ -112,7 +118,7 @@ class StateSchemaCompatibilityChecker(
   def readSchemaFiles(): Map[String, List[StateStoreColFamilySchema]] = {
     val stateSchemaFilePaths = (oldSchemaFilePaths ++ 
List(schemaFileLocation)).distinct
     stateSchemaFilePaths.flatMap { schemaFile =>
-        if (fm.exists(schemaFile)) {
+        if (fm.exists(schemaFile.getParent) && fm.exists(schemaFile)) {
           val inStream = fm.open(schemaFile)
           StateSchemaCompatibilityChecker.readSchemaFile(inStream)
         } else {
@@ -163,7 +169,7 @@ class StateSchemaCompatibilityChecker(
    *         otherwise
    */
   private def getExistingKeyAndValueSchema(): List[StateStoreColFamilySchema] 
= {
-    if (fm.exists(schemaFileLocation)) {
+    if (fm.exists(schemaFileLocation.getParent) && 
fm.exists(schemaFileLocation)) {
       readSchemaFile()
     } else {
       List.empty
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 526d39478b91..ce29c87bc76e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, TimeMode, 
TransformWithStateSuiteUtils}
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
 
 class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase {
   import testImplicits._
@@ -1501,3 +1502,159 @@ abstract class StateDataSourceReadSuite extends 
StateDataSourceTestBase with Ass
     }
   }
 }
+
+/**
+ * Test suite that verifies the state data source reader does not create empty 
state
+ * directories when reading state for all stateful operators.
+ */
+class StateDataSourceNoEmptyDirCreationSuite extends StateDataSourceTestBase {
+
+  /**
+   * Asserts that the cause chain of the given exception contains
+   * an instance of the expected type.
+   */
+  private def assertCauseChainContains(
+      e: Throwable,
+      expectedType: Class[_ <: Throwable]): Unit = {
+    var current: Throwable = e
+    while (current != null) {
+      if (expectedType.isInstance(current)) return
+      current = current.getCause
+    }
+    fail(
+      s"Expected ${expectedType.getSimpleName} in cause chain, " +
+        s"but got: ${e.getClass.getSimpleName}: ${e.getMessage}")
+  }
+
+  /**
+   * Runs a stateful query to create the checkpoint structure, deletes the 
state directory,
+   * then attempts to read via the state data source and verifies that the 
state directory
+   * is not recreated.
+   *
+   * @param runQuery function that runs one batch of a stateful query given a 
checkpoint path
+   * @param readState function that attempts to read state given a checkpoint 
path
+   * @param expectedCause the exception type expected in the cause chain
+   */
+  private def assertStateDirectoryNotRecreatedOnRead(
+      runQuery: String => Unit,
+      readState: String => Unit,
+      expectedCause: Class[_ <: Throwable] =
+        classOf[StateDataSourceReadStateSchemaFailure]): Unit = {
+    withTempDir { tempDir =>
+      val checkpointPath = tempDir.getAbsolutePath
+
+      // Step 1: Run the stateful query to create the full checkpoint structure
+      runQuery(checkpointPath)
+
+      // Step 2: Delete the state directory
+      val stateDir = new File(tempDir, "state")
+      assert(stateDir.exists(), "State directory should exist after running 
the query")
+      Utils.deleteRecursively(stateDir)
+      assert(!stateDir.exists(), "State directory should be deleted")
+
+      // Step 3: Attempt to read state - expected to fail since state is 
deleted
+      val e = intercept[Exception] {
+        readState(checkpointPath)
+      }
+      assertCauseChainContains(e, expectedCause)
+
+      // Step 4: Verify the state directory was NOT recreated by the reader
+      assert(!stateDir.exists(),
+        "State data source reader should not recreate the deleted state 
directory")
+    }
+  }
+
+  test("streaming aggregation: no empty state dir created on read") {
+    assertStateDirectoryNotRecreatedOnRead(
+      runQuery = checkpointPath => {
+        runLargeDataStreamingAggregationQuery(checkpointPath)
+      },
+      readState = checkpointPath => {
+        spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, checkpointPath)
+          .load()
+          .collect()
+      }
+    )
+  }
+
+  test("drop duplicates: no empty state dir created on read") {
+    assertStateDirectoryNotRecreatedOnRead(
+      runQuery = checkpointPath => {
+        runDropDuplicatesQuery(checkpointPath)
+      },
+      readState = checkpointPath => {
+        spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, checkpointPath)
+          .load()
+          .collect()
+      }
+    )
+  }
+
+  test("flatMapGroupsWithState: no empty state dir created on read") {
+    assertStateDirectoryNotRecreatedOnRead(
+      runQuery = checkpointPath => {
+        runFlatMapGroupsWithStateQuery(checkpointPath)
+      },
+      readState = checkpointPath => {
+        spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, checkpointPath)
+          .load()
+          .collect()
+      }
+    )
+  }
+
+  test("stream-stream join: no empty state dir created on read") {
+    assertStateDirectoryNotRecreatedOnRead(
+      runQuery = checkpointPath => {
+        runStreamStreamJoinQuery(checkpointPath)
+      },
+      readState = checkpointPath => {
+        spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, checkpointPath)
+          .option(StateSourceOptions.JOIN_SIDE, "left")
+          .load()
+          .collect()
+      }
+    )
+  }
+
+  test("transformWithState: no empty state dir created on read") {
+    assertStateDirectoryNotRecreatedOnRead(
+      runQuery = checkpointPath => {
+        runTransformWithStateQuery(checkpointPath)
+      },
+      readState = checkpointPath => {
+        spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, checkpointPath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .load()
+          .collect()
+      },
+      expectedCause = classOf[IllegalArgumentException]
+    )
+  }
+
+  test("session window aggregation: no empty state dir created on read") {
+    assertStateDirectoryNotRecreatedOnRead(
+      runQuery = checkpointPath => {
+        runSessionWindowAggregationQuery(checkpointPath)
+      },
+      readState = checkpointPath => {
+        spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, checkpointPath)
+          .load()
+          .collect()
+      }
+    )
+  }
+
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
index 98ecdde2e571..aed484470e47 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper.{LeftSide,
 RightSide}
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.join.SymmetricHashJoinStateManager
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
-import 
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, 
StateStore}
+import 
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, 
RocksDBStateStoreProvider, StateStore}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
@@ -445,6 +445,31 @@ trait StateDataSourceTestBase extends StreamTest with 
StateStoreMetricsTest {
     )
   }
 
+  /**
+   * Runs one batch of a transformWithState query (using 
RunningCountStatefulProcessor)
+   * to create checkpoint structure with state. Uses RocksDBStateStoreProvider.
+   */
+  protected def runTransformWithStateQuery(checkpointRoot: String): Unit = {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key -> 
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString
+    ) {
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(new 
org.apache.spark.sql.streaming.RunningCountStatefulProcessor(),
+          TimeMode.None(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        StartStream(checkpointLocation = checkpointRoot),
+        AddData(inputData, "a"),
+        CheckNewAnswer(("a", "1")),
+        StopStream
+      )
+    }
+  }
+
   /**
    * Helper function to create a query that combines deduplication and 
aggregation.
    * This creates a more complex query with multiple stateful operators:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to