This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bac7ce10afec [SPARK-55493][SS] Do not mkdirs in streaming checkpoint
offset/commit log directory in StateDataSource
bac7ce10afec is described below
commit bac7ce10afec9aea3640c452d8a85aa8a9457509
Author: Livia Zhu <[email protected]>
AuthorDate: Fri Mar 13 14:36:25 2026 -0700
[SPARK-55493][SS] Do not mkdirs in streaming checkpoint offset/commit log
directory in StateDataSource
### What changes were proposed in this pull request?
Previously, we try to create a new directory for offsets and commits in the
checkpoint directory if they don't exist when running `StateDataSource`. This
is because the utility functions are shared with streaming query functionality
which needs to create these dirs if they don't exist. This change creates new
readOnly modes for the utilities so that datasources do not need to mkdirs.
### Why are the changes needed?
Allow usage of StateDataSource on checkpoints that are read-only.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit tests
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: claude opus 4.6
Closes #54381 from liviazhu/liviazhu-db/stds-metadatalog-fix.
Authored-by: Livia Zhu <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../datasources/v2/state/StateDataSource.scala | 11 ++-
.../streaming/checkpointing/CommitLog.scala | 7 +-
.../streaming/checkpointing/HDFSMetadataLog.scala | 16 +++-
.../streaming/checkpointing/OffsetSeqLog.scala | 7 +-
.../runtime/StreamingQueryCheckpointMetadata.scala | 13 ++-
.../streaming/state/OperatorStateMetadata.scala | 6 +-
.../v2/state/StateDataSourceReadSuite.scala | 102 ++++++++++++++++++++-
.../execution/streaming/HDFSMetadataLogSuite.scala | 20 ++++
8 files changed, 166 insertions(+), 16 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 07b756006525..672e8e378618 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -150,7 +150,7 @@ class StateDataSource extends TableProvider with
DataSourceRegister with Logging
batchId: Long): Option[Int] = {
if (storeMetadata.nonEmpty &&
storeMetadata.head.operatorName ==
StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) {
- new StreamingQueryCheckpointMetadata(session,
checkpointLocation).offsetLog
+ new StreamingQueryCheckpointMetadata(session, checkpointLocation,
readOnly = true).offsetLog
.get(batchId)
.flatMap(_.metadataOpt)
.flatMap(_.conf.get(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key))
@@ -178,7 +178,8 @@ class StateDataSource extends TableProvider with
DataSourceRegister with Logging
private def buildSqlConfForBatch(
checkpointLocation: String,
batchId: Long): SQLConf = {
- val offsetLog = new StreamingQueryCheckpointMetadata(session,
checkpointLocation).offsetLog
+ val offsetLog = new StreamingQueryCheckpointMetadata(
+ session, checkpointLocation, readOnly = true).offsetLog
offsetLog.get(batchId) match {
case Some(value) =>
val metadata = value.metadataOpt.getOrElse(
@@ -764,7 +765,8 @@ object StateSourceOptions extends DataSourceOptions with
Logging{
}
private def getLastCommittedBatch(session: SparkSession, checkpointLocation:
String): Long = {
- val commitLog = new StreamingQueryCheckpointMetadata(session,
checkpointLocation).commitLog
+ val commitLog = new StreamingQueryCheckpointMetadata(
+ session, checkpointLocation, readOnly = true).commitLog
commitLog.getLatest() match {
case Some((lastId, _)) => lastId
case None => throw
StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
@@ -776,7 +778,8 @@ object StateSourceOptions extends DataSourceOptions with
Logging{
batchId: Long,
operatorId: Long,
checkpointLocation: String): Option[Array[Array[String]]] = {
- val commitLog = new StreamingQueryCheckpointMetadata(session,
checkpointLocation).commitLog
+ val commitLog = new StreamingQueryCheckpointMetadata(
+ session, checkpointLocation, readOnly = true).commitLog
val commitMetadata = commitLog.get(batchId) match {
case Some(commitMetadata) => commitMetadata
case None => throw
StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala
index 6892b6b535cf..b73020b6060c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala
@@ -46,8 +46,11 @@ import org.apache.spark.sql.internal.SQLConf
* line 1: version
* line 2: metadata (optional json string)
*/
-class CommitLog(sparkSession: SparkSession, path: String)
- extends HDFSMetadataLog[CommitMetadata](sparkSession, path) {
+class CommitLog(
+ sparkSession: SparkSession,
+ path: String,
+ readOnly: Boolean = false)
+ extends HDFSMetadataLog[CommitMetadata](sparkSession, path, readOnly) {
import CommitLog._
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala
index 6d35b1a8f8c0..fac502f75f3a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala
@@ -48,7 +48,10 @@ import org.apache.spark.util.Utils
* Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they
don't guarantee listing
* files in a directory always shows the latest files.
*/
-class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession,
path: String)
+class HDFSMetadataLog[T <: AnyRef : ClassTag](
+ sparkSession: SparkSession,
+ path: String,
+ readOnly: Boolean = false)
extends MetadataLog[T] with Logging {
private implicit val formats: Formats = Serialization.formats(NoTypeHints)
@@ -66,7 +69,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession:
SparkSession, path:
protected val fileManager =
CheckpointFileManager.create(metadataPath,
sparkSession.sessionState.newHadoopConf())
- if (!fileManager.exists(metadataPath)) {
+ // When readOnly is false and the metadata path does not exist, create the
directory
+ if (!readOnly && !fileManager.exists(metadataPath)) {
fileManager.mkdirs(metadataPath)
}
@@ -327,6 +331,10 @@ class HDFSMetadataLog[T <: AnyRef :
ClassTag](sparkSession: SparkSession, path:
/** List the available batches on file system. */
protected def listBatches: Array[Long] = {
+    // If the metadata path doesn't exist, return an empty array rather than
throwing an exception
+ if (!fileManager.exists(metadataPath)) {
+ return Array.empty
+ }
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
// Batches must be files
.filter(f => f.isFile)
@@ -351,6 +359,10 @@ class HDFSMetadataLog[T <: AnyRef :
ClassTag](sparkSession: SparkSession, path:
* @return array of batches ids
*/
def listBatchesOnDisk: Array[Long] = {
+    // If the metadata path doesn't exist, return an empty array rather than
throwing an exception
+ if (!fileManager.exists(metadataPath)) {
+ return Array.empty
+ }
fileManager.list(metadataPath, batchFilesFilter)
.map(f => pathToBatchId(f.getPath)).sorted
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
index 8af138e330c4..ab67915c0151 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
@@ -51,8 +51,11 @@ import
org.apache.spark.sql.execution.streaming.runtime.SerializedOffset
* 1:{3} // sourceId:offset
* ...
*/
-class OffsetSeqLog(sparkSession: SparkSession, path: String)
- extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path) {
+class OffsetSeqLog(
+ sparkSession: SparkSession,
+ path: String,
+ readOnly: Boolean = false)
+ extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path, readOnly) {
override protected def deserialize(in: InputStream): OffsetSeqBase = {
// called inside a try-finally where the underlying stream is closed in
the caller
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
index 4e02f323b89a..4a6a2e735d37 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
@@ -30,7 +30,10 @@ import org.apache.spark.sql.internal.SQLConf
* @param sparkSession Spark session
* @param resolvedCheckpointRoot The resolved checkpoint root path
*/
-class StreamingQueryCheckpointMetadata(sparkSession: SparkSession,
resolvedCheckpointRoot: String) {
+class StreamingQueryCheckpointMetadata(
+ sparkSession: SparkSession,
+ resolvedCheckpointRoot: String,
+ readOnly: Boolean = false) {
/**
* A write-ahead-log that records the offsets that are present in each
batch. In order to ensure
@@ -39,7 +42,9 @@ class StreamingQueryCheckpointMetadata(sparkSession:
SparkSession, resolvedCheck
* processed and the N-1th entry indicates which offsets have been durably
committed to the sink.
*/
lazy val offsetLog =
- new OffsetSeqLog(sparkSession,
checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS))
+ new OffsetSeqLog(sparkSession,
+ checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS),
+ readOnly)
/**
* A log that records the batch ids that have completed. This is used to
check if a batch was
@@ -47,7 +52,9 @@ class StreamingQueryCheckpointMetadata(sparkSession:
SparkSession, resolvedCheck
* This is used (for instance) during restart, to help identify which batch
to run next.
*/
lazy val commitLog =
- new CommitLog(sparkSession,
checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS))
+ new CommitLog(sparkSession,
+ checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS),
+ readOnly)
/** Metadata associated with the whole query */
final lazy val streamMetadata: StreamMetadata = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index ac0f42c34007..e046be487999 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -183,13 +183,15 @@ object OperatorStateMetadataUtils extends Logging {
}
def getLastOffsetBatch(session: SparkSession, checkpointLocation: String):
Long = {
- val offsetLog = new StreamingQueryCheckpointMetadata(session,
checkpointLocation).offsetLog
+ val offsetLog = new StreamingQueryCheckpointMetadata(
+ session, checkpointLocation, readOnly = true).offsetLog
offsetLog.getLatest().map(_._1).getOrElse(throw
StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))
}
def getLastCommittedBatch(session: SparkSession, checkpointLocation:
String): Option[Long] = {
- val commitLog = new StreamingQueryCheckpointMetadata(session,
checkpointLocation).commitLog
+ val commitLog = new StreamingQueryCheckpointMetadata(
+ session, checkpointLocation, readOnly = true).commitLog
commitLog.getLatest().map(_._1)
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index ce29c87bc76e..05916c4816a9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -1526,6 +1526,107 @@ class StateDataSourceNoEmptyDirCreationSuite extends
StateDataSourceTestBase {
s"but got: ${e.getClass.getSimpleName}: ${e.getMessage}")
}
+ test("deleted offsets directory is not recreated on read") {
+ withTempDir { tempDir =>
+ val checkpointPath = tempDir.getAbsolutePath
+ runLargeDataStreamingAggregationQuery(checkpointPath)
+
+ val offsetsDir = new File(tempDir, "offsets")
+ assert(offsetsDir.exists(), "Offsets directory should exist after
running the query")
+ Utils.deleteRecursively(offsetsDir)
+ assert(!offsetsDir.exists(), "Offsets directory should be deleted")
+
+ val e1 = intercept[Exception] {
+ spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, checkpointPath)
+ .load()
+ .collect()
+ }
+ assertCauseChainContains(e1,
+ classOf[StateDataSourceOffsetLogUnavailable])
+
+ assert(!offsetsDir.exists(),
+ "State data source reader should not recreate the deleted offsets
directory")
+ }
+ }
+
+ test("deleted commits directory is not recreated on read") {
+ withTempDir { tempDir =>
+ val checkpointPath = tempDir.getAbsolutePath
+ runLargeDataStreamingAggregationQuery(checkpointPath)
+
+ val commitsDir = new File(tempDir, "commits")
+ assert(commitsDir.exists(), "Commits directory should exist after
running the query")
+ Utils.deleteRecursively(commitsDir)
+ assert(!commitsDir.exists(), "Commits directory should be deleted")
+
+ val e2 = intercept[Exception] {
+ spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, checkpointPath)
+ .load()
+ .collect()
+ }
+ assertCauseChainContains(e2,
+ classOf[StataDataSourceCommittedBatchUnavailable])
+
+ assert(!commitsDir.exists(),
+ "State data source reader should not recreate the deleted commits
directory")
+ }
+ }
+
+ test("deleted commits directory is not recreated on read (state-metadata
source)") {
+ withTempDir { tempDir =>
+ val checkpointPath = tempDir.getAbsolutePath
+ runLargeDataStreamingAggregationQuery(checkpointPath)
+
+ val commitsDir = new File(tempDir, "commits")
+ assert(commitsDir.exists(), "Commits directory should exist after
running the query")
+ Utils.deleteRecursively(commitsDir)
+ assert(!commitsDir.exists(), "Commits directory should be deleted")
+
+ spark.read.format("state-metadata").load(checkpointPath).collect()
+
+ assert(!commitsDir.exists(),
+ "State-metadata source reader should not recreate the deleted commits
directory")
+ }
+ }
+
+ test("deleted offsets directory is not recreated on read (state-metadata
source)") {
+ withTempDir { tempDir =>
+ val checkpointPath = tempDir.getAbsolutePath
+ runLargeDataStreamingAggregationQuery(checkpointPath)
+
+ val offsetsDir = new File(tempDir, "offsets")
+ assert(offsetsDir.exists(), "Offsets directory should exist after
running the query")
+ Utils.deleteRecursively(offsetsDir)
+ assert(!offsetsDir.exists(), "Offsets directory should be deleted")
+
+ spark.read.format("state-metadata").load(checkpointPath).collect()
+
+ assert(!offsetsDir.exists(),
+ "State-metadata source reader should not recreate the deleted offsets
directory")
+ }
+ }
+
+ test("deleted state directory is not recreated on read (state-metadata
source)") {
+ withTempDir { tempDir =>
+ val checkpointPath = tempDir.getAbsolutePath
+ runLargeDataStreamingAggregationQuery(checkpointPath)
+
+ val stateDir = new File(tempDir, "state")
+ assert(stateDir.exists(), "State directory should exist after running
the query")
+ Utils.deleteRecursively(stateDir)
+ assert(!stateDir.exists(), "State directory should be deleted")
+
+ spark.read.format("state-metadata").load(checkpointPath).collect()
+
+ assert(!stateDir.exists(),
+ "State-metadata source reader should not recreate the deleted state
directory")
+ }
+ }
+
/**
* Runs a stateful query to create the checkpoint structure, deletes the
state directory,
* then attempts to read via the state data source and verifies that the
state directory
@@ -1656,5 +1757,4 @@ class StateDataSourceNoEmptyDirCreationSuite extends
StateDataSourceTestBase {
}
)
}
-
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index d6702c1e4ea5..827906258378 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -218,4 +218,24 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L)))
intercept[AssertionError](verifyBatchIds(Seq(0), Some(2L), Some(1L)))
}
+
+ test("HDFSMetadataLog: readOnly=false always creates directory") {
+ withTempDir { temp =>
+ val dir = new File(temp, "nonexistent")
+ assert(!dir.exists())
+ new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
+ assert(dir.exists(),
+ "HDFSMetadataLog should create directory when readOnly=false
(default)")
+ }
+ }
+
+ test("HDFSMetadataLog: readOnly=true does not create directory") {
+ withTempDir { temp =>
+ val dir = new File(temp, "nonexistent")
+ assert(!dir.exists())
+ new HDFSMetadataLog[String](spark, dir.getAbsolutePath, readOnly = true)
+ assert(!dir.exists(),
+ "HDFSMetadataLog should not create directory when readOnly=true")
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]