This is an automated email from the ASF dual-hosted git repository.

anishshri-db pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f785e5797a47 [SPARK-56968][SS] Force offset log VERSION_2 when 
streaming source evolution is enabled
f785e5797a47 is described below

commit f785e5797a471c39f6a50b68a59763d264310e50
Author: ericm-db <[email protected]>
AuthorDate: Fri May 29 14:37:15 2026 -0700

    [SPARK-56968][SS] Force offset log VERSION_2 when streaming source 
evolution is enabled
    
    ### What changes were proposed in this pull request?
    
    When the streaming source evolution flag 
(`spark.sql.streaming.queryEvolution.enableSourceEvolution`) is set to `true`, 
force the offset log format to `VERSION_2` for new streaming queries.
    
    In `MicroBatchExecution.initializeExecution`, the offset log format version 
selection now takes `max(STREAMING_OFFSET_LOG_FORMAT_VERSION, 
minRequiredVersion)`, where `minRequiredVersion` is `VERSION_2` when source 
evolution is enabled and `VERSION_1` otherwise. Existing queries continue to 
use whatever version is already written in their offset log (read from 
`latestStartedBatch`), so this only affects new queries.
    
    The `testWithSourceEvolution` helper in `StreamingSourceEvolutionSuite` was 
updated to no longer set the offset log version explicitly, since it is now 
selected automatically.
    
    ### Why are the changes needed?
    
    Streaming source evolution relies on the `OffsetMap` (sourceId -> offset) 
format, which is only available in offset log `VERSION_2`. Previously, users 
had to remember to set `spark.sql.streaming.offsetLog.formatVersion=2` 
alongside enabling source evolution; otherwise the format would default to 
`VERSION_1` (sequence-based) and the named-source tracking required by source 
evolution would not function properly. Coupling the two configs eliminates a 
footgun.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    The change only affects new streaming queries that explicitly enable the 
internal `spark.sql.streaming.queryEvolution.enableSourceEvolution` flag. For 
such queries, the offset log will now use `VERSION_2` automatically. Users who 
manually set the offset log version remain in control: the final version is 
`max(configuredVersion, minRequiredVersion)`, so a user-configured `VERSION_2` 
keeps working unchanged.
    
    ### How was this patch tested?
    
    - Added `offset log uses VERSION_2 when source evolution is enabled` test 
in `StreamingSourceEvolutionSuite`.
    - Existing `StreamingSourceEvolutionSuite` tests pass after dropping the 
explicit offset log version from `testWithSourceEvolution` (19/19).
    - `OffsetSeqLogSuite` continues to pass (19/19).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    Closes #56015 from ericm-db/spark-source-evolution-offset-log-v2.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   5 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |   6 +
 .../streaming/ClientStreamingQuerySuite.scala      |   8 +-
 .../checkpointing/CheckpointVersionManager.scala   | 151 +++++++++++++++++++++
 .../streaming/runtime/MicroBatchExecution.scala    |  21 +--
 .../execution/streaming/OffsetSeqLogSuite.scala    |  47 +++++++
 .../test/StreamingSourceEvolutionSuite.scala       |  35 ++++-
 7 files changed, 255 insertions(+), 18 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 4a2591d548ed..8d0a7d20eb14 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -7087,6 +7087,11 @@
       "Streaming query evolution error:"
     ],
     "subClass" : {
+      "CANNOT_ENABLE_ON_EXISTING_CHECKPOINT" : {
+        "message" : [
+          "Cannot enable streaming source evolution on a checkpoint that was 
created without it. The existing checkpoint uses offset log format version 
<existingVersion>, which does not support the named source tracking required by 
streaming source evolution. To use source evolution, start the query with a 
fresh checkpoint."
+        ]
+      },
       "DUPLICATE_SOURCE_NAMES" : {
         "message" : [
           "Duplicate streaming source names detected: <names>. Each streaming 
source must have a unique name."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index d5a9cc723bc3..8c53300df227 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2542,6 +2542,12 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       messageParameters = Map("names" -> duplicateNames.mkString(", ")))
   }
 
+  def cannotEnableSourceEvolutionOnExistingCheckpointError(existingVersion: 
Int): Throwable = {
+    new AnalysisException(
+      errorClass = 
"STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT",
+      messageParameters = Map("existingVersion" -> existingVersion.toString))
+  }
+
   def columnNotFoundInExistingColumnsError(
       columnType: String, columnName: String, validColumnNames: Seq[String]): 
Throwable = {
     new AnalysisException(
diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
index 057e2fdc4775..d9d2498393c6 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
@@ -54,13 +54,13 @@ class ClientStreamingQuerySuite extends QueryTest with 
RemoteSparkSession with L
       "streaming")
 
   /**
-   * Helper method to run tests with source evolution configs enabled.
+   * Helper method to run tests with source evolution enabled. Enabling source 
evolution
+   * automatically forces offset log format V2 (OffsetMap) for new queries, 
since named sources
+   * require it.
    */
   private def testWithSourceEvolution(testName: String)(testFun: => Unit): 
Unit = {
     test(testName) {
-      withSQLConf(
-        "spark.sql.streaming.queryEvolution.enableSourceEvolution" -> "true",
-        "spark.sql.streaming.offsetLog.formatVersion" -> "2") {
+      withSQLConf("spark.sql.streaming.queryEvolution.enableSourceEvolution" 
-> "true") {
         testFun
       }
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CheckpointVersionManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CheckpointVersionManager.scala
new file mode 100644
index 000000000000..4f41f7cc46cf
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CheckpointVersionManager.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.checkpointing
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Case class for managing the internal versioning for the streaming 
checkpoint. Versions are
+ * tracked per component (currently just the offset log; other components are 
managed elsewhere via
+ * dedicated configs).
+ */
+case class StreamingCheckpointVersion(offsetLogVersion: Int) {
+  override def toString: String = {
+    s"StreamingCheckpointVersion(offsetLogVersion: $offsetLogVersion)"
+  }
+}
+
+sealed trait CheckpointLogType
+case object OffsetLogType extends CheckpointLogType
+
+/**
+ * The `CheckpointVersionManager` is responsible for managing the versioning 
of the streaming
+ * checkpoint. It determines which version of each system-managed log format 
to use when starting
+ * a streaming query, and validates that the requested feature set is 
compatible with the existing
+ * checkpoint when restarting.
+ *
+ * Writer versions are typically used only while starting a new streaming 
query and are not
+ * intended to be exposed directly to users; once set, they are not intended 
to change for the
+ * lifetime of the query.
+ */
+object CheckpointVersionManager extends Logging {
+
+  // Streaming checkpoint writer version 1: base version supporting 
DataFrame-based streaming
+  // queries across the standard trigger types.
+  private val CHECKPOINT_VERSION_V1 = 
StreamingCheckpointVersion(OffsetSeqLog.VERSION_1)
+
+  // The current version of the streaming checkpoint. To bump this, define a 
new
+  // `StreamingCheckpointVersion` instance with the new per-component version 
numbers and update
+  // this constant.
+  private val CURRENT_VERSION = CHECKPOINT_VERSION_V1
+
+  def getCurrentVersion(): StreamingCheckpointVersion = CURRENT_VERSION
+
+  /**
+   * Returns the offset log format version to use for a new streaming query. 
We take the max of:
+   *  - the current default version
+   *  - the minimum required version implied by enabled features (e.g. 
streaming source evolution
+   *    requires [[OffsetSeqLog.VERSION_2]] for OffsetMap-based named source 
tracking)
+   *  - the configured version (via 
[[SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION]])
+   *
+   * @param sparkSessionForStream the cloned `SparkSession` for the streaming 
query
+   */
+  private def getOffsetLogVersion(sparkSessionForStream: SparkSession): Int = {
+    val currentDefaultVersion = getCurrentVersion().offsetLogVersion
+    val minRequiredVersion = 
getMinRequiredOffsetLogVersion(sparkSessionForStream)
+    val configuredVersion = 
sparkSessionForStream.sessionState.conf.streamingOffsetLogFormatVersion
+    val result = List[Int](currentDefaultVersion, minRequiredVersion, 
configuredVersion).max
+    logInfo(s"Retrieved offset log writer version=$result")
+    result
+  }
+
+  /**
+   * Minimum offset log format version required by the features enabled on 
this session. Streaming
+   * source evolution relies on the OffsetMap (sourceId -> offset) format, 
which is only available
+   * in [[OffsetSeqLog.VERSION_2]].
+   */
+  private def getMinRequiredOffsetLogVersion(sparkSessionForStream: 
SparkSession): Int = {
+    if 
(sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution) {
+      OffsetSeqLog.VERSION_2
+    } else {
+      OffsetSeqLog.VERSION_1
+    }
+  }
+
+  /**
+   * Set the SparkSession configurations for the offset log format version.
+   */
+  private def setSparkSessionConfigsForOffsetLog(
+      sparkSessionForStream: SparkSession,
+      offsetLogFormatVersion: Int): Unit = {
+    sparkSessionForStream.conf.set(
+      SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion)
+  }
+
+  /**
+   * Returns the format version for the given log type. Reads any 
feature-driven minimums from the
+   * `sparkSessionForStream` config, which must be initialized before calling.
+   */
+  def getFormatVersionFromSession(
+      sparkSessionForStream: SparkSession,
+      logType: CheckpointLogType): Int = {
+    logType match {
+      case OffsetLogType => getOffsetLogVersion(sparkSessionForStream)
+    }
+  }
+
+  /**
+   * Determines the offset log format version for this query run. For existing 
queries, reads from
+   * the last written offset log entry. For new queries, delegates to the 
session config (honoring
+   * any feature-driven minimums).
+   *
+   * Also validates that the session config is compatible with the existing 
checkpoint. Currently,
+   * enabling streaming source evolution on a checkpoint whose offset log is 
below VERSION_2 is
+   * rejected, since the OffsetMap-based named source tracking required by 
source evolution is not
+   * available in earlier versions.
+   */
+  def resolveOffsetLogVersion(
+      sparkSessionForStream: SparkSession,
+      latestStartedBatch: Option[(Long, OffsetSeqBase)]): Int = {
+    latestStartedBatch match {
+      case Some((_, offsetSeq)) =>
+        val existingVersion = offsetSeq.version
+        if (existingVersion < OffsetSeqLog.VERSION_2 &&
+            
sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution) {
+          throw 
QueryCompilationErrors.cannotEnableSourceEvolutionOnExistingCheckpointError(
+            existingVersion)
+        }
+        existingVersion
+      case None =>
+        getFormatVersionFromSession(sparkSessionForStream, OffsetLogType)
+    }
+  }
+
+  def setFormatVersion(
+      sparkSessionForStream: SparkSession,
+      logType: CheckpointLogType,
+      version: Int): Unit = {
+    logType match {
+      case OffsetLogType =>
+        setSparkSessionConfigsForOffsetLog(sparkSessionForStream, version)
+    }
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index 84f0373ca5d4..68914913a00e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.{SparkPlan, 
SQLExecution}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
RealTimeStreamScanExec, StreamingDataSourceV2Relation, 
StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, 
WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, 
OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, 
Sink, Source, StreamingQueryPlanTraverseHelper}
-import 
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, 
CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, 
OffsetSeqMetadataV2}
+import 
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, 
CheckpointVersionManager, CommitMetadata, OffsetLogType, OffsetSeqBase, 
OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
 StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
 import 
org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS,
 DIR_NAME_OFFSETS, DIR_NAME_STATE}
 import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, 
WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
@@ -505,20 +505,21 @@ class MicroBatchExecution(
       sourceIdMap
     )
 
-    // Read the offset log format version from the last written offset log 
entry. If no entries
-    // are found, use the set/default value from the config.
-    val offsetLogFormatVersion = if (latestStartedBatch.isDefined) {
-      latestStartedBatch.get._2.version
-    } else {
+    // For existing queries, the offset log format version is read from the 
last written offset log
+    // entry; for new queries, it is resolved from the session config 
(honoring any feature-driven
+    // minimums). Restarting with an incompatible feature set on an existing 
checkpoint is rejected
+    // inside the manager.
+    if (latestStartedBatch.isEmpty) {
       // If no offset log entries are found, assert that the query does not 
have any committed
       // batches to be extra safe.
       assert(lastCommittedBatchId == -1L)
-      
sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION)
     }
+    val offsetLogFormatVersion = 
CheckpointVersionManager.resolveOffsetLogVersion(
+      sparkSessionForStream, latestStartedBatch)
 
-    // Set the offset log format version in the sparkSessionForStream conf
-    sparkSessionForStream.conf.set(
-      SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion)
+    // Persist the resolved offset log format version on the streaming session.
+    CheckpointVersionManager.setFormatVersion(
+      sparkSessionForStream, OffsetLogType, offsetLogFormatVersion)
 
     val execCtx = new MicroBatchExecutionContext(id, runId, name, 
triggerClock, sources, sink,
       progressReporter, -1, sparkSession,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 36bf5e2f8313..248bd0250733 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -21,10 +21,12 @@ import java.io.File
 
 import org.scalatest.Tag
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, 
OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, 
MemoryStream, SerializedOffset}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.StreamingQueryException
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.Utils
 
@@ -368,6 +370,51 @@ class OffsetSeqLogSuite extends SharedSparkSession {
     }
   }
 
+  test("enabling source evolution on an existing V1 checkpoint is rejected") {
+    withTempDir { checkpointDir =>
+      withTempDir { outputDir =>
+        val inputData = MemoryStream[Int]
+
+        // Start query without source evolution, writing V1 offset log entries.
+        val query1 = inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("path", outputDir.getAbsolutePath)
+          .option("checkpointLocation", checkpointDir.getAbsolutePath)
+          .start()
+        inputData.addData(1, 2)
+        query1.processAllAvailable()
+        query1.stop()
+
+        val offsetLog = new OffsetSeqLog(spark, 
s"${checkpointDir.getAbsolutePath}/offsets")
+        val initialBatch = offsetLog.getLatest()
+        assert(initialBatch.isDefined)
+        assert(initialBatch.get._2.version === 1)
+        assert(initialBatch.get._2.isInstanceOf[OffsetSeq])
+
+        // Restart with the source evolution session flag enabled. The 
existing V1 checkpoint does
+        // not support OffsetMap-based named source tracking, so the query 
must fail loudly rather
+        // than silently downgrading the user's session config.
+        withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
+          val query2 = inputData.toDF()
+            .writeStream
+            .format("parquet")
+            .option("path", outputDir.getAbsolutePath)
+            .option("checkpointLocation", checkpointDir.getAbsolutePath)
+            .start()
+          val ex = intercept[StreamingQueryException] {
+            inputData.addData(3, 4)
+            query2.processAllAvailable()
+          }
+          checkError(
+            exception = ex.cause.asInstanceOf[AnalysisException],
+            condition = 
"STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT",
+            parameters = Map("existingVersion" -> "1"))
+        }
+      }
+    }
+  }
+
   test("SPARK-55131: offset log records defaults to merge operator version 2") 
{
     val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0, 
batchTimestampMs = 0,
       spark.conf)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
index 0ee219171a45..cdf8cb76d8e3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
@@ -25,6 +25,7 @@ import org.mockito.Mockito._
 import org.scalatest.Tag
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, 
OffsetSeqLog}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.streaming.Trigger._
@@ -211,6 +212,33 @@ class StreamingSourceEvolutionSuite extends StreamTest {
   // Metadata Path Tests
   // =======================
 
+  testWithSourceEvolution("offset log uses VERSION_2 when source evolution is 
enabled") {
+    LastOptions.clear()
+
+    val checkpointLocation = new Path(newMetadataDir)
+
+    val df1 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .name("source1")
+      .load()
+
+    val q = df1.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation.toString)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q.processAllAvailable()
+    q.stop()
+
+    val offsetLog = new OffsetSeqLog(spark, s"$checkpointLocation/offsets")
+    val latestBatch = offsetLog.getLatest()
+    assert(latestBatch.isDefined, "Offset log should have at least one entry")
+    val (_, offsetSeq) = latestBatch.get
+    assert(offsetSeq.isInstanceOf[OffsetMap],
+      s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")
+    assert(offsetSeq.version === 2, s"Expected version 2 but got 
${offsetSeq.version}")
+  }
+
   testWithSourceEvolution("named sources - metadata path uses source name") {
     LastOptions.clear()
 
@@ -506,13 +534,12 @@ class StreamingSourceEvolutionSuite extends StreamTest {
 
   /**
    * Helper method to run tests with source evolution enabled.
-   * Sets offset log format to V2 (OffsetMap) since named sources require it.
+   * Enabling source evolution automatically forces offset log format V2 
(OffsetMap) for new
+   * queries, since named sources require it.
    */
   def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: => 
Any): Unit = {
     test(testName, testTags: _*) {
-      withSQLConf(
-        SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true",
-        SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
+      withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
         testBody
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to