This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 3ba36ec11b92 [SPARK-51187][SQL][SS][3.5] Implement the graceful
deprecation of incorrect config introduced in SPARK-49699
3ba36ec11b92 is described below
commit 3ba36ec11b921729a11a98d397072721e75ab553
Author: Jungtaek Lim <[email protected]>
AuthorDate: Sat Feb 22 21:36:21 2025 -0800
[SPARK-51187][SQL][SS][3.5] Implement the graceful deprecation of incorrect
config introduced in SPARK-49699
### What changes were proposed in this pull request?
This PR proposes to implement the graceful deprecation of incorrect config
introduced in SPARK-49699.
SPARK-49699 was included in Spark 3.5.4, hence we can't simply rename to
fix the issue.
Also, since the incorrect config is logged in offset log in streaming
query, the fix isn't as simple as just adding withAlternative. We need to
manually handle the case where offset log contains the incorrect config, and
set the value of incorrect config in the offset log into the new config. Once a
single microbatch has been planned after the restart (hence the above logic is
applied), offset log will contain the "new" config and it will no longer refer
to the incorrect config.
That said, we can remove the incorrect config in a future Spark version once we
are confident that no users will upgrade directly from Spark 3.5.4 to
that version.
### Why are the changes needed?
We released an incorrect config and we want to rename it properly. While
renaming it, we also want to avoid any breakage of existing streaming
queries.
### Does this PR introduce _any_ user-facing change?
No. That is what this PR is aiming for.
### How was this patch tested?
New UT.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49985 from HeartSaVioR/SPARK-51187-3.5.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 7 ++-
.../spark/sql/execution/streaming/OffsetSeq.scala | 11 ++++
.../checkpoint-version-3.5.4/commits/0 | 2 +
.../checkpoint-version-3.5.4/commits/1 | 2 +
.../checkpoint-version-3.5.4/metadata | 1 +
.../checkpoint-version-3.5.4/offsets/0 | 3 +
.../checkpoint-version-3.5.4/offsets/1 | 3 +
.../spark/sql/streaming/StreamingQuerySuite.scala | 69 ++++++++++++++++++++++
8 files changed, 96 insertions(+), 2 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6f2f0088fccd..e6f8e11b0d50 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3421,11 +3421,12 @@ object SQLConf {
.createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
val PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN =
-
buildConf("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
+ buildConf("spark.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
.internal()
.doc("Allow PruneFilters to remove streaming subplans when we encounter
a false filter. " +
"This flag is to restore prior buggy behavior for broken pipelines.")
.version("4.0.0")
+
.withAlternative("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
.booleanConf
.createWithDefault(false)
@@ -4480,7 +4481,9 @@ object SQLConf {
DeprecatedConfig(LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key, "3.2",
"""Use `.format("avro")` in `DataFrameWriter` or `DataFrameReader`
instead."""),
DeprecatedConfig(COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, "3.2",
- s"Use '${COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key}' instead.")
+ s"Use '${COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key}' instead."),
+
DeprecatedConfig(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.alternatives.head,
"3.5.5",
+ s"Use '${PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key}' instead.")
)
Map(configs.map { cfg => cfg.key -> cfg } : _*)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index cea7ec432aad..8762a25453a0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -170,5 +170,16 @@ object OffsetSeqMetadata extends Logging {
}
}
}
+
+ // SPARK-51187: This incorrect config is not added in the
relevantSQLConfs, but the
+ // metadata in the offset log may have this if the batch ran from Spark
3.5.4.
+ // We need to pick the value from the metadata and set it in the new
config.
+ // This also leads the further batches to have a correct config in the
offset log.
+
metadata.conf.get("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
match {
+ case Some(value) =>
+ sessionConf.set(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key, value)
+
+ case _ =>
+ }
}
}
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/commits/0
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/commits/0
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/commits/1
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/commits/1
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/commits/1
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/metadata
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/metadata
new file mode 100644
index 000000000000..1aa693bb9b3c
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/metadata
@@ -0,0 +1 @@
+{"id":"3f409b2c-b22b-49f6-b6e4-86c2bdcddaba"}
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/offsets/0
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/offsets/0
new file mode 100644
index 000000000000..cb55941d52a1
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1739419905155,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.stream
[...]
+0
\ No newline at end of file
diff --git
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/offsets/1
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/offsets/1
new file mode 100644
index 000000000000..8ea13ab004ed
--- /dev/null
+++
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.5.4/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1739419906627,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.stream
[...]
+1
\ No newline at end of file
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 8565056cda6f..b5cf13a9c121 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -1363,6 +1363,75 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
)
}
+ test("SPARK-51187 validate that the incorrect config introduced in
SPARK-49699 still takes " +
+ "effect when restarting from Spark 3.5.4") {
+ // Spark 3.5.4 is the only release we accidentally introduced the
incorrect config.
+ // We just need to confirm that current Spark version will apply the fix
of SPARK-49699 when
+ // the streaming query started from Spark 3.5.4. We should consistently
apply the fix, instead
+ // of "on and off", because that may expose more possibility to break.
+
+ val problematicConfName =
"spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"
+
+ withTempDir { dir =>
+ val input =
getClass.getResource("/structured-streaming/checkpoint-version-3.5.4")
+ assert(input != null, "cannot find test resource")
+ val inputDir = new File(input.toURI)
+
+ // Copy test files to tempDir so that we won't modify the original data.
+ FileUtils.copyDirectory(inputDir, dir)
+
+ // Below is the code we extract checkpoint from Spark 3.5.4. We need to
make sure the offset
+ // advancement continues from the last run.
+ val inputData = MemoryStream[Int]
+ val df = inputData.toDF()
+
+ inputData.addData(1, 2, 3, 4)
+ inputData.addData(5, 6, 7, 8)
+
+ testStream(df)(
+ StartStream(checkpointLocation = dir.getCanonicalPath),
+ AddData(inputData, 9, 10, 11, 12),
+ ProcessAllAvailable(),
+ AssertOnQuery { q =>
+ val confValue = q.lastExecution.sparkSession.conf.get(
+ SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN)
+ assert(confValue === false,
+ "The value for the incorrect config in offset metadata should be
respected as the " +
+ "value of the fixed config")
+
+ val offsetLog = new OffsetSeqLog(spark, new File(dir,
"offsets").getCanonicalPath)
+ def checkConfigFromMetadata(batchId: Long, expectCorrectConfig:
Boolean): Unit = {
+ val offsetLogForBatch = offsetLog.get(batchId).get
+ val confInMetadata = offsetLogForBatch.metadata.get.conf
+ if (expectCorrectConfig) {
+
assert(confInMetadata.get(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key)
===
+ Some("false"),
+ "The new offset log should have the fixed config instead of
the incorrect one."
+ )
+ assert(!confInMetadata.contains(problematicConfName),
+ "The new offset log should not have the incorrect config.")
+ } else {
+ assert(
+ confInMetadata.get(problematicConfName) === Some("false"),
+ "The offset log in test resource should have the incorrect
config to test properly."
+ )
+ assert(
+
!confInMetadata.contains(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key),
+ "The offset log in test resource should not have the fixed
config."
+ )
+ }
+ }
+
+ assert(offsetLog.getLatestBatchId() === Some(2))
+ checkConfigFromMetadata(0, expectCorrectConfig = false)
+ checkConfigFromMetadata(1, expectCorrectConfig = false)
+ checkConfigFromMetadata(2, expectCorrectConfig = true)
+ true
+ }
+ )
+ }
+ }
+
private def checkExceptionMessage(df: DataFrame): Unit = {
withTempDir { outputDir =>
withTempDir { checkpointDir =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]