This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cec3c3647e2 [SPARK-54675][SPARK-54655][SS] Add configurable force shutdown timeout for StateStore maintenance thread pool
6cec3c3647e2 is described below

commit 6cec3c3647e2e11ad948621c8163270199309fb3
Author: ericm-db <[email protected]>
AuthorDate: Thu Dec 11 09:01:18 2025 +0900

    [SPARK-54675][SPARK-54655][SS] Add configurable force shutdown timeout for StateStore maintenance thread pool
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a new internal configuration, `spark.sql.streaming.stateStore.maintenanceForceShutdownTimeout`, which controls how long the StateStore maintenance thread pool waits for tasks to respond to cancellation after a force shutdown is initiated.
    
    Previously, this wait was hardcoded to 60 seconds in `MaintenanceThreadPool.stop()`. This PR makes it configurable, giving better control over graceful versus forceful shutdown behavior in different deployment scenarios.
    
    ### Why are the changes needed?
    
    Different streaming workloads and operational environments may require different force shutdown timeout values:
      - Some environments may need longer timeouts so that in-flight maintenance operations can complete properly.
      - Other environments may prefer shorter timeouts to speed up shutdown.
      - Making the timeout configurable gives operators more control over the tradeoff between clean shutdown and shutdown speed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The new configuration is internal, and its default (60 seconds) preserves the previous behavior.
    
    ### How was this patch tested?
    
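      - Existing tests continue to pass. The StateStoreCoordinatorSuite and StateStoreInstanceMetricSuite tests now set the new config explicitly (5 seconds).
      - The default value (60 seconds) matches the previously hardcoded timeout, so existing behavior is unchanged.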
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53432 from ericm-db/state-store-shutdown-config.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala   | 12 ++++++++++++
 .../spark/sql/execution/streaming/state/StateStore.scala     |  8 +++++---
 .../spark/sql/execution/streaming/state/StateStoreConf.scala |  6 ++++++
 .../streaming/state/StateStoreCoordinatorSuite.scala         | 10 ++++++++++
 .../streaming/state/StateStoreInstanceMetricSuite.scala      |  5 +++++
 5 files changed, 38 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c331f1724854..dfcdbd6eca29 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2553,6 +2553,15 @@ object SQLConf {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefault(300L)
 
+  val STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT =
+    buildConf("spark.sql.streaming.stateStore.maintenanceForceShutdownTimeout")
+      .internal()
+      .doc("Timeout in seconds to wait for tasks to respond to cancellation 
after " +
+        "force shutdown is initiated. This applies after the graceful shutdown 
timeout " +
+        "has been exceeded.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(60L)
+
   val STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT =
     buildConf("spark.sql.streaming.stateStore.maintenanceProcessingTimeout")
       .internal()
@@ -6921,6 +6930,9 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def stateStoreMaintenanceShutdownTimeout: Long = 
getConf(STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT)
 
+  def stateStoreMaintenanceForceShutdownTimeout: Long =
+    getConf(STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT)
+
   def stateStoreMaintenanceProcessingTimeout: Long =
     getConf(STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index bd6b4bede84b..53053fc9e091 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -1130,7 +1130,8 @@ object StateStore extends Logging {
    */
   class MaintenanceThreadPool(
       numThreads: Int,
-      shutdownTimeout: Long) {
+      shutdownTimeout: Long,
+      forceShutdownTimeout: Long) {
     private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
       numThreads, "state-store-maintenance-thread")
 
@@ -1151,7 +1152,7 @@ object StateStore extends Logging {
         threadPool.shutdownNow() // Cancel currently executing tasks
 
         // Wait a while for tasks to respond to being cancelled
-        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        if (!threadPool.awaitTermination(forceShutdownTimeout, 
TimeUnit.SECONDS)) {
           logError("MaintenanceThreadPool did not terminate")
         }
       }
@@ -1416,6 +1417,7 @@ object StateStore extends Logging {
   private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
     val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
     val maintenanceShutdownTimeout = 
storeConf.stateStoreMaintenanceShutdownTimeout
+    val maintenanceForceShutdownTimeout = 
storeConf.stateStoreMaintenanceForceShutdownTimeout
     loadedProviders.synchronized {
       if (SparkEnv.get != null && !isMaintenanceRunning && 
!storeConf.unloadOnCommit) {
         maintenanceTask = new MaintenanceTask(
@@ -1423,7 +1425,7 @@ object StateStore extends Logging {
           task = { doMaintenance(storeConf) }
         )
         maintenanceThreadPool = new 
MaintenanceThreadPool(numMaintenanceThreads,
-          maintenanceShutdownTimeout)
+          maintenanceShutdownTimeout, maintenanceForceShutdownTimeout)
         logInfo("State Store maintenance task started")
       }
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 3991f8d93f2c..f3bbc0ea2406 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -40,6 +40,12 @@ class StateStoreConf(
    */
   val stateStoreMaintenanceShutdownTimeout: Long = 
sqlConf.stateStoreMaintenanceShutdownTimeout
 
+  /**
+   * Timeout to wait for tasks to respond to cancellation after force shutdown 
is initiated
+   */
+  val stateStoreMaintenanceForceShutdownTimeout: Long =
+    sqlConf.stateStoreMaintenanceForceShutdownTimeout
+
   val stateStoreMaintenanceProcessingTimeout: Long = 
sqlConf.stateStoreMaintenanceProcessingTimeout
 
   /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index 6948aedd5640..7446390e8d06 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -127,6 +127,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
         SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false",
         SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
         SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+        SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
         SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
         
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> 
"2",
         SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
@@ -433,6 +434,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
         SQLConf.SHUFFLE_PARTITIONS.key -> "5",
         SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
         SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+        SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
         SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
         SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
         RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + 
".changelogCheckpointing.enabled" -> "true",
@@ -468,6 +470,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
         SQLConf.SHUFFLE_PARTITIONS.key -> "3",
         SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
         SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+        SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
         SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
         SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
         RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + 
".changelogCheckpointing.enabled" -> "true",
@@ -511,6 +514,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
       SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
       SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+      SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
       SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
       SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
         classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
@@ -578,6 +582,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
       SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
       SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+      SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
       SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
       SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName,
       RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + 
".changelogCheckpointing.enabled" -> "false",
@@ -621,6 +626,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
       SQLConf.SHUFFLE_PARTITIONS.key -> "2",
       SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
       SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+      SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
       SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
       SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
         classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
@@ -666,6 +672,7 @@ class StateStoreCoordinatorStreamingSuite extends 
StreamTest {
         SQLConf.SHUFFLE_PARTITIONS.key -> "3",
         SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
         SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+        SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
         SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2",
         SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
         RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + 
".changelogCheckpointing.enabled" -> "true",
@@ -787,6 +794,7 @@ class StateStoreCoordinatorStreamingSuite extends 
StreamTest {
       SQLConf.SHUFFLE_PARTITIONS.key -> "3",
       SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
       SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+      SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
       SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
       SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
         classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
@@ -866,6 +874,7 @@ class StateStoreCoordinatorStreamingSuite extends 
StreamTest {
         SQLConf.SHUFFLE_PARTITIONS.key -> "2",
         SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
         SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+        SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
         SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
         SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
         RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + 
".changelogCheckpointing.enabled" -> "true",
@@ -941,6 +950,7 @@ object StateStoreCoordinatorSuite {
     SQLConf.SHUFFLE_PARTITIONS.key -> "5",
     SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
     SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+    SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
     SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
     RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + 
".changelogCheckpointing.enabled" -> "true",
     SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
index 0b51a07837c2..58d951500c8c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
@@ -75,6 +75,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
           SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
           SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
           SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+          SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
           SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
           SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
         ) {
@@ -139,6 +140,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
           SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
           SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
           SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+          SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
           SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
           SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
         ) {
@@ -214,6 +216,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
           SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
           SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
           SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+          SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
           SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
           SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10",
           SQLConf.SHUFFLE_PARTITIONS.key -> "3"
@@ -291,6 +294,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
           SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
           SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
           SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+          SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
           SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
           SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
         ) {
@@ -365,6 +369,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
       SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
       SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
       SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+      SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
       SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
       SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "4",
       SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to