This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 896b62d3780a [SPARK-51940][SS] Add interface for managing streaming checkpoint metadata
896b62d3780a is described below
commit 896b62d3780ab6cd63788fecb499ad3119a89d54
Author: Jackie Zhang <[email protected]>
AuthorDate: Thu May 1 06:06:33 2025 +0900
[SPARK-51940][SS] Add interface for managing streaming checkpoint metadata
### What changes were proposed in this pull request?
Minor refactor to introduce an interface for accessing the metadata (e.g.
offset / commit logs) in a streaming checkpoint.
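In short, the new StreamingQueryCheckpointMetadata class owns the offsetLog, commitLog, and streamMetadata handles for a given checkpoint root, and a subclass, AsyncStreamingQueryCheckpointMetadata, swaps in the async log implementations. A minimal usage sketch (illustrative only; session and the checkpoint path below are placeholder values, assuming an active SparkSession):

    import org.apache.spark.sql.execution.streaming.StreamingQueryCheckpointMetadata

    // One object resolves all checkpoint metadata logs for a query.
    val metadata = new StreamingQueryCheckpointMetadata(session, "/tmp/my-query-checkpoint")
    // The commit log records completed batch ids; getLatest() yields the newest entry.
    val lastCommitted: Option[Long] = metadata.commitLog.getLatest().map(_._1)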
### Why are the changes needed?
To standardize the access pattern.
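For illustration, the call-site change in StateDataSource (shown in the diff below) captures the pattern; previously, each caller assembled the log directory path by hand:

    // Before: every caller built the path and constructed the log itself.
    val commitLog = new CommitLog(session,
      new Path(checkpointLocation, DIR_NAME_COMMITS).toString)

    // After: one entry point that knows the checkpoint layout.
    val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog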
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This is a pure refactoring; existing tests should suffice.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #50746 from jackierwzhang/spark-51940-checkpoint-metadata-interface.
Authored-by: Jackie Zhang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../datasources/v2/state/StateDataSource.scala | 10 ++--
.../AsyncProgressTrackingMicroBatchExecution.scala | 25 +++++----
.../AsyncStreamingQueryCheckpointMetadata.scala | 55 +++++++++++++++++++
.../sql/execution/streaming/StreamExecution.scala | 42 +++++---------
.../streaming/StreamingCheckpointConstants.scala | 1 +
.../StreamingQueryCheckpointMetadata.scala | 64 ++++++++++++++++++++++
.../streaming/state/OperatorStateMetadata.scala | 9 ++-
7 files changed, 157 insertions(+), 49 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 28de01240222..937eb1fc042d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -33,8 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.{J
import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues.JoinSideValues
import org.apache.spark.sql.execution.datasources.v2.state.metadata.{StateMetadataPartitionReader, StateMetadataTableEntry}
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog, OffsetSeqMetadata, TimerStateUtils, TransformWithStateOperatorProperties, TransformWithStateVariableInfo}
-import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
+import org.apache.spark.sql.execution.streaming.{OffsetSeqMetadata, StreamingQueryCheckpointMetadata, TimerStateUtils, TransformWithStateOperatorProperties, TransformWithStateVariableInfo}
+import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_STATE
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
import org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateSchemaCompatibilityChecker, StateSchemaMetadata, StateSchemaProvider, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
import org.apache.spark.sql.sources.DataSourceRegister
@@ -122,8 +122,7 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
override def supportsExternalMetadata(): Boolean = false
private def buildStateStoreConf(checkpointLocation: String, batchId: Long): StateStoreConf = {
- val offsetLog = new OffsetSeqLog(session,
- new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+ val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog
offsetLog.get(batchId) match {
case Some(value) =>
val metadata = value.metadata.getOrElse(
@@ -548,8 +547,7 @@ object StateSourceOptions extends DataSourceOptions {
}
private def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Long = {
- val commitLog = new CommitLog(session,
- new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
+ val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog
commitLog.getLatest() match {
case Some((lastId, _)) => lastId
case None => throw StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index 0f3ae844808e..bbfedd5454de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -83,16 +83,21 @@ class AsyncProgressTrackingMicroBatchExecution(
}
})
- override val offsetLog = new AsyncOffsetSeqLog(
- sparkSession,
- checkpointFile("offsets"),
- asyncWritesExecutorService,
- asyncProgressTrackingCheckpointingIntervalMs,
- clock = triggerClock
- )
-
- override val commitLog =
- new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService)
+ /**
+ * Manages the metadata from this checkpoint location with async write operations.
+ */
+ private val asyncCheckpointMetadata =
+ new AsyncStreamingQueryCheckpointMetadata(
+ sparkSessionForStream,
+ resolvedCheckpointRoot,
+ asyncWritesExecutorService,
+ asyncProgressTrackingCheckpointingIntervalMs,
+ triggerClock
+ )
+
+ override lazy val offsetLog: AsyncOffsetSeqLog = asyncCheckpointMetadata.offsetLog
+
+ override lazy val commitLog: AsyncCommitLog = asyncCheckpointMetadata.commitLog
// perform quick validation to fail faster
validateAndGetTrigger()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncStreamingQueryCheckpointMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncStreamingQueryCheckpointMetadata.scala
new file mode 100644
index 000000000000..e3a5ded83eb1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncStreamingQueryCheckpointMetadata.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.ThreadPoolExecutor
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Clock
+
+/**
+ * A version of [[StreamingQueryCheckpointMetadata]] that supports async state checkpointing.
+ *
+ * @param sparkSession Spark session
+ * @param resolvedCheckpointRoot The resolved checkpoint root path
+ * @param asyncWritesExecutorService The executor service for async writes
+ * @param asyncProgressTrackingCheckpointingIntervalMs The interval for async progress
+ * @param triggerClock The clock to use for trigger time
+ */
+class AsyncStreamingQueryCheckpointMetadata(
+ sparkSession: SparkSession,
+ resolvedCheckpointRoot: String,
+ asyncWritesExecutorService: ThreadPoolExecutor,
+ asyncProgressTrackingCheckpointingIntervalMs: Long,
+ triggerClock: Clock)
+ extends StreamingQueryCheckpointMetadata(sparkSession, resolvedCheckpointRoot) {
+
+ override lazy val offsetLog = new AsyncOffsetSeqLog(
+ sparkSession,
+ checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS),
+ asyncWritesExecutorService,
+ asyncProgressTrackingCheckpointingIntervalMs,
+ clock = triggerClock
+ )
+
+ override lazy val commitLog = new AsyncCommitLog(
+ sparkSession,
+ checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS),
+ asyncWritesExecutorService
+ )
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 708b7ee7e6f4..0bedba19baba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -151,16 +151,20 @@ abstract class StreamExecution(
*/
protected def sources: Seq[SparkDataStream]
- /** Metadata associated with the whole query */
- protected val streamMetadata: StreamMetadata = {
- val metadataPath = new Path(checkpointFile("metadata"))
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
- StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
- val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
- StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
- newMetadata
- }
- }
+ /** Isolated spark session to run the batches with. */
+ protected val sparkSessionForStream: SparkSession = sparkSession.cloneSession()
+
+ /**
+ * Manages the metadata from this checkpoint location.
+ */
+ protected val checkpointMetadata =
+ new StreamingQueryCheckpointMetadata(sparkSessionForStream, resolvedCheckpointRoot)
+
+ private val streamMetadata: StreamMetadata = checkpointMetadata.streamMetadata
+
+ lazy val offsetLog: OffsetSeqLog = checkpointMetadata.offsetLog
+
+ lazy val commitLog: CommitLog = checkpointMetadata.commitLog
/**
* A map of current watermarks, keyed by the position of the watermark operator in the
@@ -209,9 +213,6 @@ abstract class StreamExecution(
lazy val streamMetrics = new MetricsReporter(
this, s"spark.streaming.${Option(name).getOrElse(id)}")
- /** Isolated spark session to run the batches with. */
- protected val sparkSessionForStream = sparkSession.cloneSession()
-
/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
@@ -227,21 +228,6 @@ abstract class StreamExecution(
}
}
- /**
- * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
- * that a given batch will always consist of the same data, we write to this log *before* any
- * processing is done. Thus, the Nth record in this log indicated data that is currently being
- * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
- */
- val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
-
- /**
- * A log that records the batch ids that have completed. This is used to check if a batch was
- * fully processed, and its output was committed to the sink, hence no need to process it again.
- * This is used (for instance) during restart, to help identify which batch to run next.
- */
- val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
-
/** Whether all fields of the query have been initialized */
private def isInitialized: Boolean = state.get != INITIALIZING
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
index 932d5b0d75a2..27c76ba6bd67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
@@ -21,4 +21,5 @@ object StreamingCheckpointConstants {
val DIR_NAME_COMMITS = "commits"
val DIR_NAME_OFFSETS = "offsets"
val DIR_NAME_STATE = "state"
+ val DIR_NAME_METADATA = "metadata"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadata.scala
new file mode 100644
index 000000000000..072ccb21e514
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadata.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An interface for accessing the checkpoint metadata associated with a streaming query.
+ * @param sparkSession Spark session
+ * @param resolvedCheckpointRoot The resolved checkpoint root path
+ */
+class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheckpointRoot: String) {
+
+ /**
+ * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
+ * that a given batch will always consist of the same data, we write to this log *before* any
+ * processing is done. Thus, the Nth record in this log indicates data that is currently being
+ * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
+ */
+ lazy val offsetLog =
+ new OffsetSeqLog(sparkSession, checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS))
+
+ /**
+ * A log that records the batch ids that have completed. This is used to check if a batch was
+ * fully processed, and its output was committed to the sink, hence no need to process it again.
+ * This is used (for instance) during restart, to help identify which batch to run next.
+ */
+ lazy val commitLog =
+ new CommitLog(sparkSession, checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS))
+
+ /** Metadata associated with the whole query */
+ final lazy val streamMetadata: StreamMetadata = {
+ val metadataPath = new Path(checkpointFile(StreamingCheckpointConstants.DIR_NAME_METADATA))
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
+ val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
+ StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
+ newMetadata
+ }
+ }
+
+ /** Returns the path of a file with `name` in the checkpoint directory. */
+ final protected def checkpointFile(name: String): String =
+ new Path(new Path(resolvedCheckpointRoot), name).toString
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index befa3fb81722..1c97e9584790 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -31,9 +31,9 @@ import org.json4s.jackson.Serialization
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceErrors
-import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, CommitLog, MetadataVersionUtil, OffsetSeqLog, StateStoreWriter}
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, CommitLog, MetadataVersionUtil, StateStoreWriter, StreamingQueryCheckpointMetadata}
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
-import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS}
+import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_OFFSETS
import org.apache.spark.sql.execution.streaming.state.OperatorStateMetadataUtils.{OperatorStateMetadataReader, OperatorStateMetadataWriter}
/**
@@ -172,14 +172,13 @@ object OperatorStateMetadataUtils extends Logging {
}
def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
- val offsetLog = new OffsetSeqLog(session,
- new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+ val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog
offsetLog.getLatest().map(_._1).getOrElse(throw
StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))
}
def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Option[Long] = {
- val commitLog = new CommitLog(session, new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
+ val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog
commitLog.getLatest().map(_._1)
}
}