This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f1f3af79e975 [SPARK-55058][SS] Throw error on inconsistent checkpoint
metadata
f1f3af79e975 is described below
commit f1f3af79e97548e8982ee1e84b5356442d8899c3
Author: Jerry Zheng <[email protected]>
AuthorDate: Mon Jan 26 10:30:18 2026 -0800
[SPARK-55058][SS] Throw error on inconsistent checkpoint metadata
### What changes were proposed in this pull request?
This PR adds validation when accessing the checkpoint to detect an inconsistent
state (non-empty offset and/or commit logs with a missing metadata file) and
throw an error before the query can start with a new query ID.
### Why are the changes needed?
When a streaming checkpoint directory has non-empty offset and commit logs
but is missing the metadata file (containing the streaming query ID), the query
will generate a new UUID on restart. This breaks the deduplication mechanism of
exactly-once sinks, which rely on the streaming query ID to skip
already-processed batches, leading to data duplication.
### Does this PR introduce _any_ user-facing change?
Yes. There is a new error condition STREAMING_CHECKPOINT_MISSING_METADATA_FILE
that is raised when a streaming checkpoint directory has non-empty offset
and/or commit logs but is missing the metadata file.
### How was this patch tested?
Unit tests and integration tests are added.
### Was this patch authored or co-authored using generative AI tooling?
Generative AI tooling was used to assist in writing the test suite.
Generated-by: Claude Sonnet 4.5
Closes #53844 from jerrytq/jerry-zheng_data/SPARK-55058.
Authored-by: Jerry Zheng <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 +
.../org/apache/spark/sql/internal/SQLConf.scala | 11 +
.../sql/execution/streaming/StreamingErrors.scala | 7 +
.../runtime/StreamingQueryCheckpointMetadata.scala | 14 ++
.../StreamingQueryCheckpointMetadataSuite.scala | 246 +++++++++++++++++++++
5 files changed, 284 insertions(+)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index dd7874571820..8d383f2ab64e 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5942,6 +5942,12 @@
],
"sqlState" : "42601"
},
+ "STREAMING_CHECKPOINT_MISSING_METADATA_FILE" : {
+ "message" : [
+ "Checkpoint location <checkpointLocation> is in an inconsistent state:
the metadata file is missing but offset and/or commit logs contain data. Please
restore the metadata file or create a new checkpoint directory."
+ ],
+ "sqlState" : "42K03"
+ },
"STREAMING_OUTPUT_MODE" : {
"message" : [
"Invalid streaming output mode: <outputMode>."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d467cc87b442..c8eddffe5c65 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2955,6 +2955,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STREAMING_CHECKPOINT_VERIFY_METADATA_EXISTS =
+ buildConf("spark.sql.streaming.checkpoint.verifyMetadataExists.enabled")
+ .internal()
+ .doc("When true, validates that the checkpoint metadata file exists when
offset " +
+ "or commit logs contain data. This prevents generating a new query ID
when " +
+ "checkpoint data already exists, which would cause data duplication in
" +
+ "exactly-once sinks.")
+ .version("4.2.0")
+ .booleanConf
+ .createWithDefault(true)
+
val STATE_STORE_COMPRESSION_CODEC =
buildConf("spark.sql.streaming.stateStore.compression.codec")
.internal()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala
index 38361686d083..5efb5e50cd8f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala
@@ -52,4 +52,11 @@ object StreamingErrors {
)
)
}
+
+ def missingMetadataFile(checkpointLocation: String): Throwable = {
+ new SparkRuntimeException(
+ errorClass = "STREAMING_CHECKPOINT_MISSING_METADATA_FILE",
+ messageParameters = Map("checkpointLocation" -> checkpointLocation)
+ )
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
index cc7e92d9d7d1..4e02f323b89a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala
@@ -21,7 +21,9 @@ import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingErrors
import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog,
OffsetSeqLog}
+import org.apache.spark.sql.internal.SQLConf
/**
* An interface for accessing the checkpoint metadata associated with a
streaming query.
@@ -52,6 +54,18 @@ class StreamingQueryCheckpointMetadata(sparkSession:
SparkSession, resolvedCheck
val metadataPath = new
Path(checkpointFile(StreamingCheckpointConstants.DIR_NAME_METADATA))
val hadoopConf = sparkSession.sessionState.newHadoopConf()
StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
+ // Before creating a new metadata file with a new query ID, validate
that the checkpoint
+ // is not in an inconsistent state where the metadata file is missing
but offset/commit
+ // logs have data. This prevents data duplication when using
exactly-once sinks
+ // which rely on the query ID for deduplication.
+ if
(sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_VERIFY_METADATA_EXISTS)) {
+ val hasOffsetData = offsetLog.getLatestBatchId().isDefined
+ val hasCommitData = commitLog.getLatestBatchId().isDefined
+ if (hasOffsetData || hasCommitData) {
+ throw StreamingErrors
+ .missingMetadataFile(resolvedCheckpointRoot)
+ }
+ }
val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
newMetadata
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadataSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadataSuite.scala
new file mode 100644
index 000000000000..00c967876c72
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadataSuite.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.File
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkRuntimeException
+import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog,
CommitMetadata, OffsetSeq, OffsetSeqLog}
+import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream,
StreamingQueryCheckpointMetadata, StreamMetadata}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+
+class StreamingQueryCheckpointMetadataSuite extends StreamTest {
+ import testImplicits._
+
+ /**
+ * Creates checkpoint metadata with optional offset and commit log data.
+ * Returns the initialized metadata and checkpoint root path.
+ */
+ private def createCheckpointWithLogs(
+ dir: File,
+ addOffsets: Boolean = false,
+ addCommits: Boolean = false): (StreamingQueryCheckpointMetadata, String)
= {
+ val checkpointRoot = dir.getAbsolutePath
+ val checkpointMetadata = new StreamingQueryCheckpointMetadata(spark,
checkpointRoot)
+ checkpointMetadata.streamMetadata // Initialize metadata
+
+ if (addOffsets) {
+ checkpointMetadata.offsetLog.add(0, OffsetSeq.fill(None))
+ checkpointMetadata.offsetLog.add(1, OffsetSeq.fill(None))
+ }
+
+ if (addCommits) {
+ checkpointMetadata.commitLog.add(0, CommitMetadata())
+ checkpointMetadata.commitLog.add(1, CommitMetadata())
+ }
+
+ (checkpointMetadata, checkpointRoot)
+ }
+
+ /**
+ * Deletes the metadata file from a checkpoint directory.
+ */
+ private def deleteMetadataFile(checkpointRoot: String): Unit = {
+ val metadataPath = new Path(new Path(checkpointRoot), "metadata")
+ val fs = metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+ fs.delete(metadataPath, false)
+ }
+
+ /**
+ * Validates that accessing streamMetadata throws the expected missing
metadata error.
+ */
+ private def assertMissingMetadataError(checkpointRoot: String): Unit = {
+ val checkpointMetadata = new StreamingQueryCheckpointMetadata(spark,
checkpointRoot)
+ val exception = intercept[SparkRuntimeException] {
+ checkpointMetadata.streamMetadata
+ }
+ checkError(
+ exception = exception,
+ condition = "STREAMING_CHECKPOINT_MISSING_METADATA_FILE",
+ parameters = Map("checkpointLocation" -> checkpointRoot)
+ )
+ }
+
+ /**
+ * Creates new checkpoint metadata and validates it has a valid UUID.
+ */
+ private def assertNewMetadataCreated(checkpointRoot: String): StreamMetadata
= {
+ val checkpointMetadata = new StreamingQueryCheckpointMetadata(spark,
checkpointRoot)
+ val metadata = checkpointMetadata.streamMetadata
+ assert(metadata != null)
+ assert(metadata.id != null)
+ assert(UUID.fromString(metadata.id) != null) // Should be a valid UUID
+ metadata
+ }
+
+ /**
+ * Runs e2e test for streaming query with corrupted checkpoint.
+ * @param validationEnabled if true, expects restart to fail and if false,
expects success
+ */
+ private def testE2ECorruptedCheckpoint(validationEnabled: Boolean): Unit = {
+ withTempDir { checkpointDir =>
+ withTempDir { outputDir =>
+ val inputData = MemoryStream[Int]
+ var query = inputData.toDF()
+ .writeStream
+ .format("parquet")
+ .option("checkpointLocation", checkpointDir.getAbsolutePath)
+ .outputMode(OutputMode.Append())
+ .start(outputDir.getAbsolutePath)
+
+ try {
+ // Add data and process batches
+ inputData.addData(1, 2, 3)
+ query.processAllAvailable()
+
+ // Stop the query
+ query.stop()
+ query = null
+
+ // Simulate corrupted checkpoint by deleting only the metadata file
+ deleteMetadataFile(checkpointDir.getAbsolutePath)
+
+ if (validationEnabled) {
+ // Should fail with validation error
+ val metadataPath = new Path(new
Path(checkpointDir.getAbsolutePath), "metadata")
+ val fs =
metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+ val exception = intercept[SparkRuntimeException] {
+ inputData.toDF()
+ .writeStream
+ .format("parquet")
+ .option("checkpointLocation", checkpointDir.getAbsolutePath)
+ .outputMode(OutputMode.Append())
+ .start(outputDir.getAbsolutePath)
+ }
+ val qualifiedPath = fs.makeQualified(new
Path(checkpointDir.getAbsolutePath))
+ checkError(
+ exception = exception,
+ condition = "STREAMING_CHECKPOINT_MISSING_METADATA_FILE",
+ parameters = Map("checkpointLocation" -> qualifiedPath.toString)
+ )
+ } else {
+ // Should succeed - validation is disabled
+ query = inputData.toDF()
+ .writeStream
+ .format("parquet")
+ .option("checkpointLocation", checkpointDir.getAbsolutePath)
+ .outputMode(OutputMode.Append())
+ .start(outputDir.getAbsolutePath)
+
+ assert(query.isActive, "Query should be active after restart")
+ inputData.addData(7, 8, 9)
+ query.processAllAvailable()
+ }
+ } finally {
+ if (query != null && query.isActive) {
+ query.stop()
+ }
+ }
+ }
+ }
+ }
+
+ test("valid case: new checkpoint with no metadata and no logs") {
+ withTempDir { dir =>
+ assertNewMetadataCreated(dir.getAbsolutePath)
+ }
+ }
+
+ test("valid case: existing checkpoint with metadata and logs") {
+ withTempDir { dir =>
+ val (checkpointMetadata1, checkpointRoot) =
+ createCheckpointWithLogs(dir, addOffsets = true, addCommits = true)
+ val originalId = checkpointMetadata1.streamMetadata.id
+
+ // Re-read checkpoint - should succeed and return the same ID
+ val checkpointMetadata2 = new StreamingQueryCheckpointMetadata(spark,
checkpointRoot)
+ val metadata2 = checkpointMetadata2.streamMetadata
+ assert(metadata2.id === originalId)
+ }
+ }
+
+ test("invalid case: missing metadata with non-empty offset log") {
+ withTempDir { dir =>
+ val (_, checkpointRoot) = createCheckpointWithLogs(dir, addOffsets =
true)
+ deleteMetadataFile(checkpointRoot)
+ assertMissingMetadataError(checkpointRoot)
+ }
+ }
+
+ test("invalid case: missing metadata with non-empty commit log") {
+ withTempDir { dir =>
+ val (_, checkpointRoot) = createCheckpointWithLogs(dir, addCommits =
true)
+ deleteMetadataFile(checkpointRoot)
+ assertMissingMetadataError(checkpointRoot)
+ }
+ }
+
+ test("invalid case: missing metadata with both offset and commit logs
non-empty") {
+ withTempDir { dir =>
+ val (_, checkpointRoot) = createCheckpointWithLogs(dir, addOffsets =
true, addCommits = true)
+ deleteMetadataFile(checkpointRoot)
+ assertMissingMetadataError(checkpointRoot)
+ }
+ }
+
+ test("valid case: missing metadata with empty logs should succeed") {
+ withTempDir { dir =>
+ val checkpointRoot = dir.getAbsolutePath
+
+ // Create checkpoint directories but don't add any data
+ val offsetLog = new OffsetSeqLog(spark, new File(dir,
"offsets").toString)
+ val commitLog = new CommitLog(spark, new File(dir, "commits").toString)
+
+ // Verify logs are empty
+ assert(offsetLog.getLatestBatchId().isEmpty)
+ assert(commitLog.getLatestBatchId().isEmpty)
+
+ // Try to create checkpoint metadata - should succeed
+ assertNewMetadataCreated(checkpointRoot)
+ }
+ }
+
+ test("sparkConf: validation is skipped when flag is disabled") {
+ withSQLConf(SQLConf.STREAMING_CHECKPOINT_VERIFY_METADATA_EXISTS.key ->
"false") {
+ // Verify flag is disabled
+
assert(!spark.conf.get(SQLConf.STREAMING_CHECKPOINT_VERIFY_METADATA_EXISTS))
+
+ withTempDir { dir =>
+ val (_, checkpointRoot) = createCheckpointWithLogs(dir, addOffsets =
true)
+ deleteMetadataFile(checkpointRoot)
+
+ // Should succeed and create new metadata with new UUID (validation is
skipped)
+ assertNewMetadataCreated(checkpointRoot)
+ }
+ }
+ }
+
+ test("e2e: streaming query fails to restart when checkpoint metadata is
corrupted") {
+ testE2ECorruptedCheckpoint(validationEnabled = true)
+ }
+
+ test("e2e: streaming query restarts successfully when flag is disabled") {
+ withSQLConf(SQLConf.STREAMING_CHECKPOINT_VERIFY_METADATA_EXISTS.key ->
"false") {
+ testE2ECorruptedCheckpoint(validationEnabled = false)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]