This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 896b62d3780a [SPARK-51940][SS] Add interface for managing streaming 
checkpoint metadata
896b62d3780a is described below

commit 896b62d3780ab6cd63788fecb499ad3119a89d54
Author: Jackie Zhang <[email protected]>
AuthorDate: Thu May 1 06:06:33 2025 +0900

    [SPARK-51940][SS] Add interface for managing streaming checkpoint metadata
    
    ### What changes were proposed in this pull request?
    Minor refactor to introduce an interface for accessing the metadata (e.g. 
offset / commit logs) in a streaming checkpoint.
    
    ### Why are the changes needed?
    To standardize the access pattern.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    This is a pure refactoring, existing tests should suffice.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50746 from jackierwzhang/spark-51940-checkpoint-metadata-interface.
    
    Authored-by: Jackie Zhang <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../datasources/v2/state/StateDataSource.scala     | 10 ++--
 .../AsyncProgressTrackingMicroBatchExecution.scala | 25 +++++----
 .../AsyncStreamingQueryCheckpointMetadata.scala    | 55 +++++++++++++++++++
 .../sql/execution/streaming/StreamExecution.scala  | 42 +++++---------
 .../streaming/StreamingCheckpointConstants.scala   |  1 +
 .../StreamingQueryCheckpointMetadata.scala         | 64 ++++++++++++++++++++++
 .../streaming/state/OperatorStateMetadata.scala    |  9 ++-
 7 files changed, 157 insertions(+), 49 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 28de01240222..937eb1fc042d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -33,8 +33,8 @@ import 
org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.{J
 import 
org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues.JoinSideValues
 import 
org.apache.spark.sql.execution.datasources.v2.state.metadata.{StateMetadataPartitionReader,
 StateMetadataTableEntry}
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog, 
OffsetSeqMetadata, TimerStateUtils, TransformWithStateOperatorProperties, 
TransformWithStateVariableInfo}
-import 
org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DIR_NAME_COMMITS,
 DIR_NAME_OFFSETS, DIR_NAME_STATE}
+import org.apache.spark.sql.execution.streaming.{OffsetSeqMetadata, 
StreamingQueryCheckpointMetadata, TimerStateUtils, 
TransformWithStateOperatorProperties, TransformWithStateVariableInfo}
+import 
org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_STATE
 import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide,
 RightSide}
 import 
org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, 
KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec, StateSchemaCompatibilityChecker, 
StateSchemaMetadata, StateSchemaProvider, StateStore, 
StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
 import org.apache.spark.sql.sources.DataSourceRegister
@@ -122,8 +122,7 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
   override def supportsExternalMetadata(): Boolean = false
 
   private def buildStateStoreConf(checkpointLocation: String, batchId: Long): 
StateStoreConf = {
-    val offsetLog = new OffsetSeqLog(session,
-      new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+    val offsetLog = new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).offsetLog
     offsetLog.get(batchId) match {
       case Some(value) =>
         val metadata = value.metadata.getOrElse(
@@ -548,8 +547,7 @@ object StateSourceOptions extends DataSourceOptions {
   }
 
   private def getLastCommittedBatch(session: SparkSession, checkpointLocation: 
String): Long = {
-    val commitLog = new CommitLog(session,
-      new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
+    val commitLog = new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).commitLog
     commitLog.getLatest() match {
       case Some((lastId, _)) => lastId
       case None => throw 
StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index 0f3ae844808e..bbfedd5454de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -83,16 +83,21 @@ class AsyncProgressTrackingMicroBatchExecution(
       }
     })
 
-  override val offsetLog = new AsyncOffsetSeqLog(
-    sparkSession,
-    checkpointFile("offsets"),
-    asyncWritesExecutorService,
-    asyncProgressTrackingCheckpointingIntervalMs,
-    clock = triggerClock
-  )
-
-  override val commitLog =
-    new AsyncCommitLog(sparkSession, checkpointFile("commits"), 
asyncWritesExecutorService)
+  /**
+   * Manages the metadata from this checkpoint location with async write 
operations.
+   */
+  private val asyncCheckpointMetadata =
+    new AsyncStreamingQueryCheckpointMetadata(
+      sparkSessionForStream,
+      resolvedCheckpointRoot,
+      asyncWritesExecutorService,
+      asyncProgressTrackingCheckpointingIntervalMs,
+      triggerClock
+    )
+
+  override lazy val offsetLog: AsyncOffsetSeqLog = 
asyncCheckpointMetadata.offsetLog
+
+  override lazy val commitLog: AsyncCommitLog = 
asyncCheckpointMetadata.commitLog
 
   // perform quick validation to fail faster
   validateAndGetTrigger()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncStreamingQueryCheckpointMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncStreamingQueryCheckpointMetadata.scala
new file mode 100644
index 000000000000..e3a5ded83eb1
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncStreamingQueryCheckpointMetadata.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.ThreadPoolExecutor
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Clock
+
+/**
+ * A version of [[StreamingQueryCheckpointMetadata]] that supports async state 
checkpointing.
+ *
+ * @param sparkSession Spark session
+ * @param resolvedCheckpointRoot The resolved checkpoint root path
+ * @param asyncWritesExecutorService The executor service for async writes
+ * @param asyncProgressTrackingCheckpointingIntervalMs The interval for async 
progress
+ * @param triggerClock The clock to use for trigger time
+ */
+class AsyncStreamingQueryCheckpointMetadata(
+    sparkSession: SparkSession,
+    resolvedCheckpointRoot: String,
+    asyncWritesExecutorService: ThreadPoolExecutor,
+    asyncProgressTrackingCheckpointingIntervalMs: Long,
+    triggerClock: Clock)
+  extends StreamingQueryCheckpointMetadata(sparkSession, 
resolvedCheckpointRoot) {
+
+  override lazy val offsetLog = new AsyncOffsetSeqLog(
+    sparkSession,
+    checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS),
+    asyncWritesExecutorService,
+    asyncProgressTrackingCheckpointingIntervalMs,
+    clock = triggerClock
+  )
+
+  override lazy val commitLog = new AsyncCommitLog(
+    sparkSession,
+    checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS),
+    asyncWritesExecutorService
+  )
+
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 708b7ee7e6f4..0bedba19baba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -151,16 +151,20 @@ abstract class StreamExecution(
    */
   protected def sources: Seq[SparkDataStream]
 
-  /** Metadata associated with the whole query */
-  protected val streamMetadata: StreamMetadata = {
-    val metadataPath = new Path(checkpointFile("metadata"))
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
-      val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
-      StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
-      newMetadata
-    }
-  }
+  /** Isolated spark session to run the batches with. */
+  protected val sparkSessionForStream: SparkSession = 
sparkSession.cloneSession()
+
+  /**
+   * Manages the metadata from this checkpoint location.
+   */
+  protected val checkpointMetadata =
+    new StreamingQueryCheckpointMetadata(sparkSessionForStream, 
resolvedCheckpointRoot)
+
+  private val streamMetadata: StreamMetadata = 
checkpointMetadata.streamMetadata
+
+  lazy val offsetLog: OffsetSeqLog = checkpointMetadata.offsetLog
+
+  lazy val commitLog: CommitLog = checkpointMetadata.commitLog
 
   /**
    * A map of current watermarks, keyed by the position of the watermark 
operator in the
@@ -209,9 +213,6 @@ abstract class StreamExecution(
   lazy val streamMetrics = new MetricsReporter(
     this, s"spark.streaming.${Option(name).getOrElse(id)}")
 
-  /** Isolated spark session to run the batches with. */
-  protected val sparkSessionForStream = sparkSession.cloneSession()
-
   /**
    * The thread that runs the micro-batches of this stream. Note that this 
thread must be
    * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: 
interrupting a
@@ -227,21 +228,6 @@ abstract class StreamExecution(
       }
     }
 
-  /**
-   * A write-ahead-log that records the offsets that are present in each 
batch. In order to ensure
-   * that a given batch will always consist of the same data, we write to this 
log *before* any
-   * processing is done.  Thus, the Nth record in this log indicated data that 
is currently being
-   * processed and the N-1th entry indicates which offsets have been durably 
committed to the sink.
-   */
-  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
-
-  /**
-   * A log that records the batch ids that have completed. This is used to 
check if a batch was
-   * fully processed, and its output was committed to the sink, hence no need 
to process it again.
-   * This is used (for instance) during restart, to help identify which batch 
to run next.
-   */
-  val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
-
   /** Whether all fields of the query have been initialized */
   private def isInitialized: Boolean = state.get != INITIALIZING
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
index 932d5b0d75a2..27c76ba6bd67 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
@@ -21,4 +21,5 @@ object StreamingCheckpointConstants {
   val DIR_NAME_COMMITS = "commits"
   val DIR_NAME_OFFSETS = "offsets"
   val DIR_NAME_STATE = "state"
+  val DIR_NAME_METADATA = "metadata"
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadata.scala
new file mode 100644
index 000000000000..072ccb21e514
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadata.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An interface for accessing the checkpoint metadata associated with a 
streaming query.
+ * @param sparkSession Spark session
+ * @param resolvedCheckpointRoot The resolved checkpoint root path
+ */
+class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, 
resolvedCheckpointRoot: String) {
+
+  /**
+   * A write-ahead-log that records the offsets that are present in each 
batch. In order to ensure
+   * that a given batch will always consist of the same data, we write to this 
log *before* any
+   * processing is done.  Thus, the Nth record in this log indicated data that 
is currently being
+   * processed and the N-1th entry indicates which offsets have been durably 
committed to the sink.
+   */
+  lazy val offsetLog =
+    new OffsetSeqLog(sparkSession, 
checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS))
+
+  /**
+   * A log that records the batch ids that have completed. This is used to 
check if a batch was
+   * fully processed, and its output was committed to the sink, hence no need 
to process it again.
+   * This is used (for instance) during restart, to help identify which batch 
to run next.
+   */
+  lazy val commitLog =
+    new CommitLog(sparkSession, 
checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS))
+
+  /** Metadata associated with the whole query */
+  final lazy val streamMetadata: StreamMetadata = {
+    val metadataPath = new 
Path(checkpointFile(StreamingCheckpointConstants.DIR_NAME_METADATA))
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
+      val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
+      StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
+      newMetadata
+    }
+  }
+
+  /** Returns the path of a file with `name` in the checkpoint directory. */
+  final protected def checkpointFile(name: String): String =
+    new Path(new Path(resolvedCheckpointRoot), name).toString
+
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index befa3fb81722..1c97e9584790 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -31,9 +31,9 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.SparkSession
 import 
org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceErrors
-import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, 
CommitLog, MetadataVersionUtil, OffsetSeqLog, StateStoreWriter}
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, 
CommitLog, MetadataVersionUtil, StateStoreWriter, 
StreamingQueryCheckpointMetadata}
 import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
-import 
org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DIR_NAME_COMMITS,
 DIR_NAME_OFFSETS}
+import 
org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_OFFSETS
 import 
org.apache.spark.sql.execution.streaming.state.OperatorStateMetadataUtils.{OperatorStateMetadataReader,
 OperatorStateMetadataWriter}
 
 /**
@@ -172,14 +172,13 @@ object OperatorStateMetadataUtils extends Logging {
   }
 
   def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): 
Long = {
-    val offsetLog = new OffsetSeqLog(session,
-      new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+    val offsetLog = new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).offsetLog
     offsetLog.getLatest().map(_._1).getOrElse(throw
       StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))
   }
 
   def getLastCommittedBatch(session: SparkSession, checkpointLocation: 
String): Option[Long] = {
-    val commitLog = new CommitLog(session, new Path(checkpointLocation, 
DIR_NAME_COMMITS).toString)
+    val commitLog = new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).commitLog
     commitLog.getLatest().map(_._1)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to