This is an automated email from the ASF dual-hosted git repository.
anishshri-db pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f785e5797a47 [SPARK-56968][SS] Force offset log VERSION_2 when
streaming source evolution is enabled
f785e5797a47 is described below
commit f785e5797a471c39f6a50b68a59763d264310e50
Author: ericm-db <[email protected]>
AuthorDate: Fri May 29 14:37:15 2026 -0700
[SPARK-56968][SS] Force offset log VERSION_2 when streaming source
evolution is enabled
### What changes were proposed in this pull request?
When the streaming source evolution flag
(`spark.sql.streaming.queryEvolution.enableSourceEvolution`) is set to `true`,
force the offset log format to `VERSION_2` for new streaming queries.
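The version selection that was previously inlined in
`MicroBatchExecution.initializeExecution` moves into a new
`CheckpointVersionManager`. For new queries, the offset log format version is
`max(currentDefaultVersion, STREAMING_OFFSET_LOG_FORMAT_VERSION,
minRequiredVersion)`, where `minRequiredVersion` is `VERSION_2` when source
evolution is enabled and `VERSION_1` otherwise. Existing queries continue to
use whatever version is already written in their offset log (read from
`latestStartedBatch`). The one exception is a restart that enables source
evolution on a checkpoint whose offset log is below `VERSION_2`: it is now
rejected with the new
`STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT` error.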
The `testWithSourceEvolution` helpers in `StreamingSourceEvolutionSuite` and
`ClientStreamingQuerySuite` were updated to no longer set the offset log
version explicitly, since it is now selected automatically.
### Why are the changes needed?
Streaming source evolution relies on the `OffsetMap` (sourceId -> offset)
format, which is only available in offset log `VERSION_2`. Previously, users
had to remember to set `spark.sql.streaming.offsetLog.formatVersion=2`
alongside enabling source evolution; otherwise the format would default to
`VERSION_1` (sequence-based) and the named-source tracking required by source
evolution would not function properly. Coupling the two configs eliminates a
footgun.
### Does this PR introduce _any_ user-facing change?
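No, for queries that do not enable the internal
`spark.sql.streaming.queryEvolution.enableSourceEvolution` flag.
For new streaming queries that enable it, the offset log will now use
`VERSION_2` automatically. The final version is
`max(configuredVersion, minRequiredVersion)`, so a user-configured `VERSION_2`
keeps working unchanged, while a configured `VERSION_1` is raised to
`VERSION_2`. Restarting an existing checkpoint whose offset log is below
`VERSION_2` with the flag enabled now fails with
`STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT`; such
queries must start from a fresh checkpoint.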
### How was this patch tested?
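- Added `offset log uses VERSION_2 when source evolution is enabled` test
in `StreamingSourceEvolutionSuite`.
- Added `enabling source evolution on an existing V1 checkpoint is rejected`
test in `OffsetSeqLogSuite`.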
- Existing `StreamingSourceEvolutionSuite` tests pass after dropping the
explicit offset log version from `testWithSourceEvolution` (19/19).
- `OffsetSeqLogSuite` continues to pass (19/19).
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
Closes #56015 from ericm-db/spark-source-evolution-offset-log-v2.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 5 +
.../spark/sql/errors/QueryCompilationErrors.scala | 6 +
.../streaming/ClientStreamingQuerySuite.scala | 8 +-
.../checkpointing/CheckpointVersionManager.scala | 151 +++++++++++++++++++++
.../streaming/runtime/MicroBatchExecution.scala | 21 +--
.../execution/streaming/OffsetSeqLogSuite.scala | 47 +++++++
.../test/StreamingSourceEvolutionSuite.scala | 35 ++++-
7 files changed, 255 insertions(+), 18 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 4a2591d548ed..8d0a7d20eb14 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -7087,6 +7087,11 @@
"Streaming query evolution error:"
],
"subClass" : {
+ "CANNOT_ENABLE_ON_EXISTING_CHECKPOINT" : {
+ "message" : [
+ "Cannot enable streaming source evolution on a checkpoint that was
created without it. The existing checkpoint uses offset log format version
<existingVersion>, which does not support the named source tracking required by
streaming source evolution. To use source evolution, start the query with a
fresh checkpoint."
+ ]
+ },
"DUPLICATE_SOURCE_NAMES" : {
"message" : [
"Duplicate streaming source names detected: <names>. Each streaming
source must have a unique name."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index d5a9cc723bc3..8c53300df227 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2542,6 +2542,12 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
messageParameters = Map("names" -> duplicateNames.mkString(", ")))
}
+ def cannotEnableSourceEvolutionOnExistingCheckpointError(existingVersion:
Int): Throwable = {
+ new AnalysisException(
+ errorClass =
"STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT",
+ messageParameters = Map("existingVersion" -> existingVersion.toString))
+ }
+
def columnNotFoundInExistingColumnsError(
columnType: String, columnName: String, validColumnNames: Seq[String]):
Throwable = {
new AnalysisException(
diff --git
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
index 057e2fdc4775..d9d2498393c6 100644
---
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
+++
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
@@ -54,13 +54,13 @@ class ClientStreamingQuerySuite extends QueryTest with
RemoteSparkSession with L
"streaming")
/**
- * Helper method to run tests with source evolution configs enabled.
+ * Helper method to run tests with source evolution enabled. Enabling source
evolution
+ * automatically forces offset log format V2 (OffsetMap) for new queries,
since named sources
+ * require it.
*/
private def testWithSourceEvolution(testName: String)(testFun: => Unit):
Unit = {
test(testName) {
- withSQLConf(
- "spark.sql.streaming.queryEvolution.enableSourceEvolution" -> "true",
- "spark.sql.streaming.offsetLog.formatVersion" -> "2") {
+ withSQLConf("spark.sql.streaming.queryEvolution.enableSourceEvolution"
-> "true") {
testFun
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CheckpointVersionManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CheckpointVersionManager.scala
new file mode 100644
index 000000000000..4f41f7cc46cf
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CheckpointVersionManager.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.checkpointing
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Case class for managing the internal versioning for the streaming
checkpoint. Versions are
+ * tracked per component (currently just the offset log; other components are
managed elsewhere via
+ * dedicated configs).
+ */
+case class StreamingCheckpointVersion(offsetLogVersion: Int) {
+ override def toString: String = {
+ s"StreamingCheckpointVersion(offsetLogVersion: $offsetLogVersion)"
+ }
+}
+
+sealed trait CheckpointLogType
+case object OffsetLogType extends CheckpointLogType
+
+/**
+ * The `CheckpointVersionManager` is responsible for managing the versioning
of the streaming
+ * checkpoint. It determines which version of each system-managed log format
to use when starting
+ * a streaming query, and validates that the requested feature set is
compatible with the existing
+ * checkpoint when restarting.
+ *
+ * Writer versions are typically used only while starting a new streaming
query and are not
+ * intended to be exposed directly to users; once set, they are not intended
to change for the
+ * lifetime of the query.
+ */
+object CheckpointVersionManager extends Logging {
+
+ // Streaming checkpoint writer version 1: base version supporting
DataFrame-based streaming
+ // queries across the standard trigger types.
+ private val CHECKPOINT_VERSION_V1 =
StreamingCheckpointVersion(OffsetSeqLog.VERSION_1)
+
+ // The current version of the streaming checkpoint. To bump this, define a
new
+ // `StreamingCheckpointVersion` instance with the new per-component version
numbers and update
+ // this constant.
+ private val CURRENT_VERSION = CHECKPOINT_VERSION_V1
+
+ def getCurrentVersion(): StreamingCheckpointVersion = CURRENT_VERSION
+
+ /**
+ * Returns the offset log format version to use for a new streaming query.
We take the max of:
+ * - the current default version
+ * - the minimum required version implied by enabled features (e.g.
streaming source evolution
+ * requires [[OffsetSeqLog.VERSION_2]] for OffsetMap-based named source
tracking)
+ * - the configured version (via
[[SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION]])
+ *
+ * @param sparkSessionForStream the cloned `SparkSession` for the streaming
query
+ */
+ private def getOffsetLogVersion(sparkSessionForStream: SparkSession): Int = {
+ val currentDefaultVersion = getCurrentVersion().offsetLogVersion
+ val minRequiredVersion =
getMinRequiredOffsetLogVersion(sparkSessionForStream)
+ val configuredVersion =
sparkSessionForStream.sessionState.conf.streamingOffsetLogFormatVersion
+ val result = List[Int](currentDefaultVersion, minRequiredVersion,
configuredVersion).max
+ logInfo(s"Retrieved offset log writer version=$result")
+ result
+ }
+
+ /**
+ * Minimum offset log format version required by the features enabled on
this session. Streaming
+ * source evolution relies on the OffsetMap (sourceId -> offset) format,
which is only available
+ * in [[OffsetSeqLog.VERSION_2]].
+ */
+ private def getMinRequiredOffsetLogVersion(sparkSessionForStream:
SparkSession): Int = {
+ if
(sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution) {
+ OffsetSeqLog.VERSION_2
+ } else {
+ OffsetSeqLog.VERSION_1
+ }
+ }
+
+ /**
+ * Set the SparkSession configurations for the offset log format version.
+ */
+ private def setSparkSessionConfigsForOffsetLog(
+ sparkSessionForStream: SparkSession,
+ offsetLogFormatVersion: Int): Unit = {
+ sparkSessionForStream.conf.set(
+ SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion)
+ }
+
+ /**
+ * Returns the format version for the given log type. Reads any
feature-driven minimums from the
+ * `sparkSessionForStream` config, which must be initialized before calling.
+ */
+ def getFormatVersionFromSession(
+ sparkSessionForStream: SparkSession,
+ logType: CheckpointLogType): Int = {
+ logType match {
+ case OffsetLogType => getOffsetLogVersion(sparkSessionForStream)
+ }
+ }
+
+ /**
+ * Determines the offset log format version for this query run. For existing
queries, reads from
+ * the last written offset log entry. For new queries, delegates to the
session config (honoring
+ * any feature-driven minimums).
+ *
+ * Also validates that the session config is compatible with the existing
checkpoint. Currently,
+ * enabling streaming source evolution on a checkpoint whose offset log is
below VERSION_2 is
+ * rejected, since the OffsetMap-based named source tracking required by
source evolution is not
+ * available in earlier versions.
+ */
+ def resolveOffsetLogVersion(
+ sparkSessionForStream: SparkSession,
+ latestStartedBatch: Option[(Long, OffsetSeqBase)]): Int = {
+ latestStartedBatch match {
+ case Some((_, offsetSeq)) =>
+ val existingVersion = offsetSeq.version
+ if (existingVersion < OffsetSeqLog.VERSION_2 &&
+
sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution) {
+ throw
QueryCompilationErrors.cannotEnableSourceEvolutionOnExistingCheckpointError(
+ existingVersion)
+ }
+ existingVersion
+ case None =>
+ getFormatVersionFromSession(sparkSessionForStream, OffsetLogType)
+ }
+ }
+
+ def setFormatVersion(
+ sparkSessionForStream: SparkSession,
+ logType: CheckpointLogType,
+ version: Int): Unit = {
+ logType match {
+ case OffsetLogType =>
+ setSparkSessionConfigsForOffsetLog(sparkSessionForStream, version)
+ }
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index 84f0373ca5d4..68914913a00e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.{SparkPlan,
SQLExecution}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
RealTimeStreamScanExec, StreamingDataSourceV2Relation,
StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress,
WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset,
OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger,
Sink, Source, StreamingQueryPlanTraverseHelper}
-import
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager,
CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata,
OffsetSeqMetadataV2}
+import
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager,
CheckpointVersionManager, CommitMetadata, OffsetLogType, OffsetSeqBase,
OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
import
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
import
org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS,
DIR_NAME_OFFSETS, DIR_NAME_STATE}
import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink,
WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
@@ -505,20 +505,21 @@ class MicroBatchExecution(
sourceIdMap
)
- // Read the offset log format version from the last written offset log
entry. If no entries
- // are found, use the set/default value from the config.
- val offsetLogFormatVersion = if (latestStartedBatch.isDefined) {
- latestStartedBatch.get._2.version
- } else {
+ // For existing queries, the offset log format version is read from the
last written offset log
+ // entry; for new queries, it is resolved from the session config
(honoring any feature-driven
+ // minimums). Restarting with an incompatible feature set on an existing
checkpoint is rejected
+ // inside the manager.
+ if (latestStartedBatch.isEmpty) {
// If no offset log entries are found, assert that the query does not
have any committed
// batches to be extra safe.
assert(lastCommittedBatchId == -1L)
-
sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION)
}
+ val offsetLogFormatVersion =
CheckpointVersionManager.resolveOffsetLogVersion(
+ sparkSessionForStream, latestStartedBatch)
- // Set the offset log format version in the sparkSessionForStream conf
- sparkSessionForStream.conf.set(
- SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion)
+ // Persist the resolved offset log format version on the streaming session.
+ CheckpointVersionManager.setFormatVersion(
+ sparkSessionForStream, OffsetLogType, offsetLogFormatVersion)
val execCtx = new MicroBatchExecutionContext(id, runId, name,
triggerClock, sources, sink,
progressReporter, -1, sparkSession,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 36bf5e2f8313..248bd0250733 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -21,10 +21,12 @@ import java.io.File
import org.scalatest.Tag
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap,
OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata}
import org.apache.spark.sql.execution.streaming.runtime.{LongOffset,
MemoryStream, SerializedOffset}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils
@@ -368,6 +370,51 @@ class OffsetSeqLogSuite extends SharedSparkSession {
}
}
+ test("enabling source evolution on an existing V1 checkpoint is rejected") {
+ withTempDir { checkpointDir =>
+ withTempDir { outputDir =>
+ val inputData = MemoryStream[Int]
+
+ // Start query without source evolution, writing V1 offset log entries.
+ val query1 = inputData.toDF()
+ .writeStream
+ .format("parquet")
+ .option("path", outputDir.getAbsolutePath)
+ .option("checkpointLocation", checkpointDir.getAbsolutePath)
+ .start()
+ inputData.addData(1, 2)
+ query1.processAllAvailable()
+ query1.stop()
+
+ val offsetLog = new OffsetSeqLog(spark,
s"${checkpointDir.getAbsolutePath}/offsets")
+ val initialBatch = offsetLog.getLatest()
+ assert(initialBatch.isDefined)
+ assert(initialBatch.get._2.version === 1)
+ assert(initialBatch.get._2.isInstanceOf[OffsetSeq])
+
+ // Restart with the source evolution session flag enabled. The
existing V1 checkpoint does
+ // not support OffsetMap-based named source tracking, so the query
must fail loudly rather
+ // than silently downgrading the user's session config.
+ withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
+ val query2 = inputData.toDF()
+ .writeStream
+ .format("parquet")
+ .option("path", outputDir.getAbsolutePath)
+ .option("checkpointLocation", checkpointDir.getAbsolutePath)
+ .start()
+ val ex = intercept[StreamingQueryException] {
+ inputData.addData(3, 4)
+ query2.processAllAvailable()
+ }
+ checkError(
+ exception = ex.cause.asInstanceOf[AnalysisException],
+ condition =
"STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT",
+ parameters = Map("existingVersion" -> "1"))
+ }
+ }
+ }
+ }
+
test("SPARK-55131: offset log records defaults to merge operator version 2")
{
val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0,
batchTimestampMs = 0,
spark.conf)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
index 0ee219171a45..cdf8cb76d8e3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
@@ -25,6 +25,7 @@ import org.mockito.Mockito._
import org.scalatest.Tag
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap,
OffsetSeqLog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.streaming.Trigger._
@@ -211,6 +212,33 @@ class StreamingSourceEvolutionSuite extends StreamTest {
// Metadata Path Tests
// =======================
+ testWithSourceEvolution("offset log uses VERSION_2 when source evolution is
enabled") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val q = df1.writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q.processAllAvailable()
+ q.stop()
+
+ val offsetLog = new OffsetSeqLog(spark, s"$checkpointLocation/offsets")
+ val latestBatch = offsetLog.getLatest()
+ assert(latestBatch.isDefined, "Offset log should have at least one entry")
+ val (_, offsetSeq) = latestBatch.get
+ assert(offsetSeq.isInstanceOf[OffsetMap],
+ s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")
+ assert(offsetSeq.version === 2, s"Expected version 2 but got
${offsetSeq.version}")
+ }
+
testWithSourceEvolution("named sources - metadata path uses source name") {
LastOptions.clear()
@@ -506,13 +534,12 @@ class StreamingSourceEvolutionSuite extends StreamTest {
/**
* Helper method to run tests with source evolution enabled.
- * Sets offset log format to V2 (OffsetMap) since named sources require it.
+ * Enabling source evolution automatically forces offset log format V2
(OffsetMap) for new
+ * queries, since named sources require it.
*/
def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: =>
Any): Unit = {
test(testName, testTags: _*) {
- withSQLConf(
- SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true",
- SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
+ withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
testBody
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]