This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6cec3c3647e2 [SPARK-54675][SPARK-54655][SS] Add configurable force
shutdown timeout for StateStore maintenance thread pool
6cec3c3647e2 is described below
commit 6cec3c3647e2e11ad948621c8163270199309fb3
Author: ericm-db <[email protected]>
AuthorDate: Thu Dec 11 09:01:18 2025 +0900
[SPARK-54675][SPARK-54655][SS] Add configurable force shutdown timeout for
StateStore maintenance thread pool
### What changes were proposed in this pull request?
This PR introduces a new internal configuration parameter,
spark.sql.streaming.stateStore.maintenanceForceShutdownTimeout (in seconds,
default 60), which controls how long the StateStore maintenance thread pool
waits for tasks to respond to cancellation once force shutdown is initiated.
Previously, the force shutdown timeout was hardcoded to 60 seconds in the
MaintenanceThreadPool.stop() method. This PR adds a forceShutdownTimeout
constructor parameter to MaintenanceThreadPool and passes the configured value
to it through StateStoreConf, allowing better control over graceful vs forceful
shutdown behavior in different deployment scenarios.
### Why are the changes needed?
Different streaming workloads and operational environments may require
different force shutdown timeout values:
- Some environments may need longer timeouts to allow in-flight
maintenance operations to complete properly
- Other environments may prefer shorter timeouts to speed up shutdown
sequences
- Making this configurable provides operators with more control over the
tradeoff between clean shutdown and shutdown speed
### Does this PR introduce _any_ user-facing change?
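No. The new configuration is marked internal, and its default of 60 seconds
matches the previously hardcoded value.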
### How was this patch tested?
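- Existing tests continue to pass; the affected tests in
StateStoreCoordinatorSuite.scala and StateStoreInstanceMetricSuite.scala now
set the new configuration to 5 seconds alongside the existing 3-second
maintenance shutdown timeout
- The default value (60 seconds) maintains backward compatibility with the
previous hardcoded behavior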
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53432 from ericm-db/state-store-shutdown-config.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++++++++
.../spark/sql/execution/streaming/state/StateStore.scala | 8 +++++---
.../spark/sql/execution/streaming/state/StateStoreConf.scala | 6 ++++++
.../streaming/state/StateStoreCoordinatorSuite.scala | 10 ++++++++++
.../streaming/state/StateStoreInstanceMetricSuite.scala | 5 +++++
5 files changed, 38 insertions(+), 3 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c331f1724854..dfcdbd6eca29 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2553,6 +2553,15 @@ object SQLConf {
.timeConf(TimeUnit.SECONDS)
.createWithDefault(300L)
+ val STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT =
+ buildConf("spark.sql.streaming.stateStore.maintenanceForceShutdownTimeout")
+ .internal()
+ .doc("Timeout in seconds to wait for tasks to respond to cancellation
after " +
+ "force shutdown is initiated. This applies after the graceful shutdown
timeout " +
+ "has been exceeded.")
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefault(60L)
+
val STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT =
buildConf("spark.sql.streaming.stateStore.maintenanceProcessingTimeout")
.internal()
@@ -6921,6 +6930,9 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def stateStoreMaintenanceShutdownTimeout: Long =
getConf(STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT)
+ def stateStoreMaintenanceForceShutdownTimeout: Long =
+ getConf(STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT)
+
def stateStoreMaintenanceProcessingTimeout: Long =
getConf(STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index bd6b4bede84b..53053fc9e091 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -1130,7 +1130,8 @@ object StateStore extends Logging {
*/
class MaintenanceThreadPool(
numThreads: Int,
- shutdownTimeout: Long) {
+ shutdownTimeout: Long,
+ forceShutdownTimeout: Long) {
private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
numThreads, "state-store-maintenance-thread")
@@ -1151,7 +1152,7 @@ object StateStore extends Logging {
threadPool.shutdownNow() // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
- if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+ if (!threadPool.awaitTermination(forceShutdownTimeout,
TimeUnit.SECONDS)) {
logError("MaintenanceThreadPool did not terminate")
}
}
@@ -1416,6 +1417,7 @@ object StateStore extends Logging {
private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
val maintenanceShutdownTimeout =
storeConf.stateStoreMaintenanceShutdownTimeout
+ val maintenanceForceShutdownTimeout =
storeConf.stateStoreMaintenanceForceShutdownTimeout
loadedProviders.synchronized {
if (SparkEnv.get != null && !isMaintenanceRunning &&
!storeConf.unloadOnCommit) {
maintenanceTask = new MaintenanceTask(
@@ -1423,7 +1425,7 @@ object StateStore extends Logging {
task = { doMaintenance(storeConf) }
)
maintenanceThreadPool = new
MaintenanceThreadPool(numMaintenanceThreads,
- maintenanceShutdownTimeout)
+ maintenanceShutdownTimeout, maintenanceForceShutdownTimeout)
logInfo("State Store maintenance task started")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 3991f8d93f2c..f3bbc0ea2406 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -40,6 +40,12 @@ class StateStoreConf(
*/
val stateStoreMaintenanceShutdownTimeout: Long =
sqlConf.stateStoreMaintenanceShutdownTimeout
+ /**
+ * Timeout to wait for tasks to respond to cancellation after force shutdown
is initiated
+ */
+ val stateStoreMaintenanceForceShutdownTimeout: Long =
+ sqlConf.stateStoreMaintenanceForceShutdownTimeout
+
val stateStoreMaintenanceProcessingTimeout: Long =
sqlConf.stateStoreMaintenanceProcessingTimeout
/**
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index 6948aedd5640..7446390e8d06 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -127,6 +127,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key ->
"2",
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
@@ -433,6 +434,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
@@ -468,6 +470,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "3",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
@@ -511,6 +514,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
@@ -578,6 +582,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "false",
@@ -621,6 +626,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
@@ -666,6 +672,7 @@ class StateStoreCoordinatorStreamingSuite extends
StreamTest {
SQLConf.SHUFFLE_PARTITIONS.key -> "3",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2",
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
@@ -787,6 +794,7 @@ class StateStoreCoordinatorStreamingSuite extends
StreamTest {
SQLConf.SHUFFLE_PARTITIONS.key -> "3",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
@@ -866,6 +874,7 @@ class StateStoreCoordinatorStreamingSuite extends
StreamTest {
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
@@ -941,6 +950,7 @@ object StateStoreCoordinatorSuite {
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
index 0b51a07837c2..58d951500c8c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
@@ -75,6 +75,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with
AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
) {
@@ -139,6 +140,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with
AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
) {
@@ -214,6 +216,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with
AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10",
SQLConf.SHUFFLE_PARTITIONS.key -> "3"
@@ -291,6 +294,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with
AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
) {
@@ -365,6 +369,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with
AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "4",
SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]