This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8e88f5ab8501 [SPARK-55999][SS] Enable forceSnapshotUploadOnLag by
default
8e88f5ab8501 is described below
commit 8e88f5ab850191edb8c02e054fcb15c22ccdd9ca
Author: zifeif2 <[email protected]>
AuthorDate: Fri Mar 20 13:50:15 2026 -0700
[SPARK-55999][SS] Enable forceSnapshotUploadOnLag by default
### What changes were proposed in this pull request?
1. Config default change (SQLConf.scala): forceSnapshotUploadOnLag default
false → true
2. 3 SPARK-51358 tests fixed (StateStoreCoordinatorSuite.scala): Explicitly
set forceSnapshotUploadOnLag=false so the lag-detection tests aren't interfered
with by the forced remediation
3. Cleanup (StateStoreCoordinatorSuite.scala): Removed redundant
forceSnapshotUploadOnLag -> "true"
### Why are the changes needed?
When the state store lags in uploading snapshots from the maintenance thread,
turning on this feature allows the state store to upload snapshots in the query
execution thread, which improves query reliability.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The feature is tested in https://github.com/apache/spark/pull/52773
### Was this patch authored or co-authored using generative AI tooling?
yes
Closes #54847 from zifeif2/enable-force-snapshot.
Authored-by: zifeif2 <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
.../execution/streaming/state/StateStoreCoordinatorSuite.scala | 10 ++++++----
.../scala/org/apache/spark/sql/internal/SQLConfSuite.scala | 3 +++
3 files changed, 10 insertions(+), 5 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2c872d3a7567..22f5b3f6c792 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2863,7 +2863,7 @@ object SQLConf {
)
.version("4.2.0")
.booleanConf
- .createWithDefault(false)
+ .createWithDefault(true)
val STATEFUL_SHUFFLE_PARTITIONS_INTERNAL =
buildConf("spark.sql.streaming.internal.stateStore.partitions")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index 03257d3da373..8e8cff582c6f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -440,7 +440,8 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key ->
"true",
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key ->
"2",
- SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
+ SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key ->
"0",
+ SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false"
) {
case (coordRef, spark) =>
import spark.implicits._
@@ -477,7 +478,8 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key ->
"true",
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key ->
"5",
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key ->
"0",
- SQLConf.STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT.key -> "5"
+ SQLConf.STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT.key ->
"5",
+ SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false"
) {
case (coordRef, spark) =>
import spark.implicits._
@@ -521,7 +523,8 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key ->
"2",
- SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
+ SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0",
+ SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false"
) {
case (coordRef, spark) =>
import spark.implicits._
@@ -946,7 +949,6 @@ class StateStoreCoordinatorStreamingSuite extends
StreamTest {
object StateStoreCoordinatorSuite {
// Common configuration for SPARK-54063 tests
private val spark54063CommonConfigs: Seq[(String, String)] = Seq(
- SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "true",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 5b80ed8fc4e3..2bb3b4247d01 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -562,6 +562,9 @@ class SQLConfSuite extends QueryTest with
SharedSparkSession {
test("[SPARK-54063] STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG requires " +
"STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG") {
+ // Default values should work fine - both default to true
+ assert(spark.sessionState.conf.stateStoreForceSnapshotUploadOnLag === true)
+
// This should work fine - both enabled
withSQLConf(
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]