This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bac7ce10afec [SPARK-55493][SS] Do not mkdirs in streaming checkpoint 
offset/commit log directory in StateDataSource
bac7ce10afec is described below

commit bac7ce10afec9aea3640c452d8a85aa8a9457509
Author: Livia Zhu <[email protected]>
AuthorDate: Fri Mar 13 14:36:25 2026 -0700

    [SPARK-55493][SS] Do not mkdirs in streaming checkpoint offset/commit log 
directory in StateDataSource
    
    ### What changes were proposed in this pull request?
    
    Previously, we try to create a new directory for offsets and commits in the 
checkpoint directory if they don't exist when running `StateDataSource`. This 
is because the utility functions are shared with streaming query functionality 
which needs to create these dirs if they don't exist. This change creates new 
readOnly modes for the utilities so that datasources do not need to mkdirs.
    
    ### Why are the changes needed?
    
    Allow usage of StateDataSource on checkpoints that are read-only.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: claude opus 4.6
    
    Closes #54381 from liviazhu/liviazhu-db/stds-metadatalog-fix.
    
    Authored-by: Livia Zhu <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../datasources/v2/state/StateDataSource.scala     |  11 ++-
 .../streaming/checkpointing/CommitLog.scala        |   7 +-
 .../streaming/checkpointing/HDFSMetadataLog.scala  |  16 +++-
 .../streaming/checkpointing/OffsetSeqLog.scala     |   7 +-
 .../runtime/StreamingQueryCheckpointMetadata.scala |  13 ++-
 .../streaming/state/OperatorStateMetadata.scala    |   6 +-
 .../v2/state/StateDataSourceReadSuite.scala        | 102 ++++++++++++++++++++-
 .../execution/streaming/HDFSMetadataLogSuite.scala |  20 ++++
 8 files changed, 166 insertions(+), 16 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 07b756006525..672e8e378618 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -150,7 +150,7 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
       batchId: Long): Option[Int] = {
     if (storeMetadata.nonEmpty &&
       storeMetadata.head.operatorName == 
StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) {
-      new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).offsetLog
+      new StreamingQueryCheckpointMetadata(session, checkpointLocation, 
readOnly = true).offsetLog
         .get(batchId)
         .flatMap(_.metadataOpt)
         .flatMap(_.conf.get(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key))
@@ -178,7 +178,8 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
   private def buildSqlConfForBatch(
       checkpointLocation: String,
       batchId: Long): SQLConf = {
-    val offsetLog = new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).offsetLog
+    val offsetLog = new StreamingQueryCheckpointMetadata(
+      session, checkpointLocation, readOnly = true).offsetLog
     offsetLog.get(batchId) match {
       case Some(value) =>
         val metadata = value.metadataOpt.getOrElse(
@@ -764,7 +765,8 @@ object StateSourceOptions extends DataSourceOptions with 
Logging{
   }
 
   private def getLastCommittedBatch(session: SparkSession, checkpointLocation: 
String): Long = {
-    val commitLog = new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).commitLog
+    val commitLog = new StreamingQueryCheckpointMetadata(
+      session, checkpointLocation, readOnly = true).commitLog
     commitLog.getLatest() match {
       case Some((lastId, _)) => lastId
       case None => throw 
StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
@@ -776,7 +778,8 @@ object StateSourceOptions extends DataSourceOptions with 
Logging{
     batchId: Long,
     operatorId: Long,
     checkpointLocation: String): Option[Array[Array[String]]] = {
-    val commitLog = new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).commitLog
+    val commitLog = new StreamingQueryCheckpointMetadata(
+      session, checkpointLocation, readOnly = true).commitLog
     val commitMetadata = commitLog.get(batchId) match {
       case Some(commitMetadata) => commitMetadata
       case None => throw 
StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala
index 6892b6b535cf..b73020b6060c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala
@@ -46,8 +46,11 @@ import org.apache.spark.sql.internal.SQLConf
  * line 1: version
  * line 2: metadata (optional json string)
  */
-class CommitLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[CommitMetadata](sparkSession, path) {
+class CommitLog(
+    sparkSession: SparkSession,
+    path: String,
+    readOnly: Boolean = false)
+  extends HDFSMetadataLog[CommitMetadata](sparkSession, path, readOnly) {
 
   import CommitLog._
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala
index 6d35b1a8f8c0..fac502f75f3a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala
@@ -48,7 +48,10 @@ import org.apache.spark.util.Utils
  * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they 
don't guarantee listing
  * files in a directory always shows the latest files.
  */
-class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, 
path: String)
+class HDFSMetadataLog[T <: AnyRef : ClassTag](
+    sparkSession: SparkSession,
+    path: String,
+    readOnly: Boolean = false)
   extends MetadataLog[T] with Logging {
 
   private implicit val formats: Formats = Serialization.formats(NoTypeHints)
@@ -66,7 +69,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: 
SparkSession, path:
   protected val fileManager =
     CheckpointFileManager.create(metadataPath, 
sparkSession.sessionState.newHadoopConf())
 
-  if (!fileManager.exists(metadataPath)) {
+  // When readOnly is false and the metadata path does not exist, create the 
directory
+  if (!readOnly && !fileManager.exists(metadataPath)) {
     fileManager.mkdirs(metadataPath)
   }
 
@@ -327,6 +331,10 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 
   /** List the available batches on file system. */
   protected def listBatches: Array[Long] = {
+    // If parent doesn't exist, return empty array rather than throwing an 
exception
+    if (!fileManager.exists(metadataPath)) {
+      return Array.empty
+    }
     val batchIds = fileManager.list(metadataPath, batchFilesFilter)
       // Batches must be files
       .filter(f => f.isFile)
@@ -351,6 +359,10 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
    * @return array of batches ids
    */
   def listBatchesOnDisk: Array[Long] = {
+    // If parent doesn't exist, return empty array rather than throwing an 
exception
+    if (!fileManager.exists(metadataPath)) {
+      return Array.empty
+    }
     fileManager.list(metadataPath, batchFilesFilter)
       .map(f => pathToBatchId(f.getPath)).sorted
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
index 8af138e330c4..ab67915c0151 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
@@ -51,8 +51,11 @@ import 
org.apache.spark.sql.execution.streaming.runtime.SerializedOffset
  *   1:{3}     // sourceId:offset
  *   ...
  */
-class OffsetSeqLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path) {
+class OffsetSeqLog(
+    sparkSession: SparkSession,
+    path: String,
+    readOnly: Boolean = false)
+  extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path, readOnly) {
 
   override protected def deserialize(in: InputStream): OffsetSeqBase = {
     // called inside a try-finally where the underlying stream is closed in 
the caller
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
index 4e02f323b89a..4a6a2e735d37 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
@@ -30,7 +30,10 @@ import org.apache.spark.sql.internal.SQLConf
  * @param sparkSession Spark session
  * @param resolvedCheckpointRoot The resolved checkpoint root path
  */
-class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, 
resolvedCheckpointRoot: String) {
+class StreamingQueryCheckpointMetadata(
+    sparkSession: SparkSession,
+    resolvedCheckpointRoot: String,
+    readOnly: Boolean = false) {
 
   /**
    * A write-ahead-log that records the offsets that are present in each 
batch. In order to ensure
@@ -39,7 +42,9 @@ class StreamingQueryCheckpointMetadata(sparkSession: 
SparkSession, resolvedCheck
    * processed and the N-1th entry indicates which offsets have been durably 
committed to the sink.
    */
   lazy val offsetLog =
-    new OffsetSeqLog(sparkSession, 
checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS))
+    new OffsetSeqLog(sparkSession,
+      checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS),
+      readOnly)
 
   /**
    * A log that records the batch ids that have completed. This is used to 
check if a batch was
@@ -47,7 +52,9 @@ class StreamingQueryCheckpointMetadata(sparkSession: 
SparkSession, resolvedCheck
    * This is used (for instance) during restart, to help identify which batch 
to run next.
    */
   lazy val commitLog =
-    new CommitLog(sparkSession, 
checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS))
+    new CommitLog(sparkSession,
+      checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS),
+      readOnly)
 
   /** Metadata associated with the whole query */
   final lazy val streamMetadata: StreamMetadata = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index ac0f42c34007..e046be487999 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -183,13 +183,15 @@ object OperatorStateMetadataUtils extends Logging {
   }
 
   def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): 
Long = {
-    val offsetLog = new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).offsetLog
+    val offsetLog = new StreamingQueryCheckpointMetadata(
+      session, checkpointLocation, readOnly = true).offsetLog
     offsetLog.getLatest().map(_._1).getOrElse(throw
       StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))
   }
 
   def getLastCommittedBatch(session: SparkSession, checkpointLocation: 
String): Option[Long] = {
-    val commitLog = new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).commitLog
+    val commitLog = new StreamingQueryCheckpointMetadata(
+      session, checkpointLocation, readOnly = true).commitLog
     commitLog.getLatest().map(_._1)
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index ce29c87bc76e..05916c4816a9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -1526,6 +1526,107 @@ class StateDataSourceNoEmptyDirCreationSuite extends 
StateDataSourceTestBase {
         s"but got: ${e.getClass.getSimpleName}: ${e.getMessage}")
   }
 
+  test("deleted offsets directory is not recreated on read") {
+    withTempDir { tempDir =>
+      val checkpointPath = tempDir.getAbsolutePath
+      runLargeDataStreamingAggregationQuery(checkpointPath)
+
+      val offsetsDir = new File(tempDir, "offsets")
+      assert(offsetsDir.exists(), "Offsets directory should exist after 
running the query")
+      Utils.deleteRecursively(offsetsDir)
+      assert(!offsetsDir.exists(), "Offsets directory should be deleted")
+
+      val e1 = intercept[Exception] {
+        spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, checkpointPath)
+          .load()
+          .collect()
+      }
+      assertCauseChainContains(e1,
+        classOf[StateDataSourceOffsetLogUnavailable])
+
+      assert(!offsetsDir.exists(),
+        "State data source reader should not recreate the deleted offsets 
directory")
+    }
+  }
+
+  test("deleted commits directory is not recreated on read") {
+    withTempDir { tempDir =>
+      val checkpointPath = tempDir.getAbsolutePath
+      runLargeDataStreamingAggregationQuery(checkpointPath)
+
+      val commitsDir = new File(tempDir, "commits")
+      assert(commitsDir.exists(), "Commits directory should exist after 
running the query")
+      Utils.deleteRecursively(commitsDir)
+      assert(!commitsDir.exists(), "Commits directory should be deleted")
+
+      val e2 = intercept[Exception] {
+        spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, checkpointPath)
+          .load()
+          .collect()
+      }
+      assertCauseChainContains(e2,
+        classOf[StataDataSourceCommittedBatchUnavailable])
+
+      assert(!commitsDir.exists(),
+        "State data source reader should not recreate the deleted commits 
directory")
+    }
+  }
+
+  test("deleted commits directory is not recreated on read (state-metadata 
source)") {
+    withTempDir { tempDir =>
+      val checkpointPath = tempDir.getAbsolutePath
+      runLargeDataStreamingAggregationQuery(checkpointPath)
+
+      val commitsDir = new File(tempDir, "commits")
+      assert(commitsDir.exists(), "Commits directory should exist after 
running the query")
+      Utils.deleteRecursively(commitsDir)
+      assert(!commitsDir.exists(), "Commits directory should be deleted")
+
+      spark.read.format("state-metadata").load(checkpointPath).collect()
+
+      assert(!commitsDir.exists(),
+        "State-metadata source reader should not recreate the deleted commits 
directory")
+    }
+  }
+
+  test("deleted offsets directory is not recreated on read (state-metadata 
source)") {
+    withTempDir { tempDir =>
+      val checkpointPath = tempDir.getAbsolutePath
+      runLargeDataStreamingAggregationQuery(checkpointPath)
+
+      val offsetsDir = new File(tempDir, "offsets")
+      assert(offsetsDir.exists(), "Offsets directory should exist after 
running the query")
+      Utils.deleteRecursively(offsetsDir)
+      assert(!offsetsDir.exists(), "Offsets directory should be deleted")
+
+      spark.read.format("state-metadata").load(checkpointPath).collect()
+
+      assert(!offsetsDir.exists(),
+        "State-metadata source reader should not recreate the deleted offsets 
directory")
+    }
+  }
+
+  test("deleted state directory is not recreated on read (state-metadata 
source)") {
+    withTempDir { tempDir =>
+      val checkpointPath = tempDir.getAbsolutePath
+      runLargeDataStreamingAggregationQuery(checkpointPath)
+
+      val stateDir = new File(tempDir, "state")
+      assert(stateDir.exists(), "State directory should exist after running 
the query")
+      Utils.deleteRecursively(stateDir)
+      assert(!stateDir.exists(), "State directory should be deleted")
+
+      spark.read.format("state-metadata").load(checkpointPath).collect()
+
+      assert(!stateDir.exists(),
+        "State-metadata source reader should not recreate the deleted state 
directory")
+    }
+  }
+
   /**
    * Runs a stateful query to create the checkpoint structure, deletes the 
state directory,
    * then attempts to read via the state data source and verifies that the 
state directory
@@ -1656,5 +1757,4 @@ class StateDataSourceNoEmptyDirCreationSuite extends 
StateDataSourceTestBase {
       }
     )
   }
-
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index d6702c1e4ea5..827906258378 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -218,4 +218,24 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
     intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L)))
     intercept[AssertionError](verifyBatchIds(Seq(0), Some(2L), Some(1L)))
   }
+
+  test("HDFSMetadataLog: readOnly=false always creates directory") {
+    withTempDir { temp =>
+      val dir = new File(temp, "nonexistent")
+      assert(!dir.exists())
+      new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
+      assert(dir.exists(),
+        "HDFSMetadataLog should create directory when readOnly=false 
(default)")
+    }
+  }
+
+  test("HDFSMetadataLog: readOnly=true does not create directory") {
+    withTempDir { temp =>
+      val dir = new File(temp, "nonexistent")
+      assert(!dir.exists())
+      new HDFSMetadataLog[String](spark, dir.getAbsolutePath, readOnly = true)
+      assert(!dir.exists(),
+        "HDFSMetadataLog should not create directory when readOnly=true")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to