This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9e42805f77c [SPARK-44480][SS] Use thread pool to perform maintenance activity for hdfs/rocksdb state store providers 9e42805f77c is described below commit 9e42805f77c1ab5beb3c193f47f016e735a68f06 Author: Eric Marnadi <eric.marn...@databricks.com> AuthorDate: Wed Aug 2 04:52:43 2023 +0900 [SPARK-44480][SS] Use thread pool to perform maintenance activity for hdfs/rocksdb state store providers ### What changes were proposed in this pull request? Maintenance tasks on StateStore were being done by a single background thread, which is prone to straggling. In this change, the single background thread instead schedules maintenance tasks to a thread pool. By default, the number of threads in the new thread pool is the number of cores * 0.25 (with a minimum of 1), so that this thread pool doesn't take too many resources away from the query and affect performance. Users can set the number of threads explicitly via `spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads`. ### Why are the changes needed? Using a thread pool instead of a single thread for snapshotting and cleanup reduces the effect of stragglers in the background task. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual testing: wrote a stateful query, tracked how many maintenance tasks were done, and compared this to the baseline. `StateStoreSuite` is enough to test functional correctness and cleanup here. Closes #42066 from ericm-db/maintenance-thread-pool-optional. 
Lead-authored-by: Eric Marnadi <eric.marn...@databricks.com> Co-authored-by: ericm-db <132308037+ericm...@users.noreply.github.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../org/apache/spark/sql/internal/SQLConf.scala | 13 +++ .../sql/execution/streaming/state/StateStore.scala | 108 ++++++++++++++++++--- .../execution/streaming/state/StateStoreConf.scala | 5 + 3 files changed, 114 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6eb2d9c38d9..dfa2a0f251f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1856,6 +1856,17 @@ object SQLConf { .createWithDefault( "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider") + val NUM_STATE_STORE_MAINTENANCE_THREADS = + buildConf("spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads") + .internal() + .doc("Number of threads in the thread pool that perform clean up and snapshotting tasks " + + "for stateful streaming queries. 
The default value is the number of cores * 0.25 " + + "so that this thread pool doesn't take too many resources " + + "away from the query and affect performance.") + .intConf + .checkValue(_ > 0, "Must be greater than 0") + .createWithDefault(Math.max(Runtime.getRuntime.availableProcessors() / 4, 1)) + val STATE_SCHEMA_CHECK_ENABLED = buildConf("spark.sql.streaming.stateStore.stateSchemaCheck") .doc("When true, Spark will validate the state schema against schema on existing state and " + @@ -4523,6 +4534,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def isStateSchemaCheckEnabled: Boolean = getConf(STATE_SCHEMA_CHECK_ENABLED) + def numStateStoreMaintenanceThreads: Int = getConf(NUM_STATE_STORE_MAINTENANCE_THREADS) + def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT) def stateStoreFormatValidationEnabled: Boolean = getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index cabad54be64..a1d4f7f40a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state import java.util.UUID import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicReference import javax.annotation.concurrent.GuardedBy import scala.collection.mutable @@ -440,6 +441,18 @@ object StateStore extends Logging { @GuardedBy("loadedProviders") private val schemaValidated = new mutable.HashMap[StateStoreProviderId, Option[Throwable]]() + private val maintenanceThreadPoolLock = new Object + + // Shared exception between threads in thread pool that the scheduling thread + // checks to see if an exception has been thrown in 
the maintenance task + private val threadPoolException = new AtomicReference[Throwable](null) + + // This set is to keep track of the partitions that are queued + // for maintenance or currently have maintenance running on them + // to prevent the same partition from being processed concurrently. + @GuardedBy("maintenanceThreadPoolLock") + private val maintenancePartitions = new mutable.HashSet[StateStoreProviderId] + /** * Runs the `task` periodically and automatically cancels it if there is an exception. `onError` * will be called when an exception happens. @@ -472,9 +485,29 @@ object StateStore extends Logging { def isRunning: Boolean = !future.isDone } + /** + * Thread Pool that runs maintenance on partitions that are scheduled by + * MaintenanceTask periodically + */ + class MaintenanceThreadPool(numThreads: Int) { + private val threadPool = ThreadUtils.newDaemonFixedThreadPool( + numThreads, "state-store-maintenance-thread") + + def execute(runnable: Runnable): Unit = { + threadPool.execute(runnable) + } + + def stop(): Unit = { + threadPool.shutdown() + } + } + @GuardedBy("loadedProviders") private var maintenanceTask: MaintenanceTask = null + @GuardedBy("loadedProviders") + private var maintenanceThreadPool: MaintenanceThreadPool = null + @GuardedBy("loadedProviders") private var _coordRef: StateStoreCoordinatorRef = null @@ -591,6 +624,14 @@ object StateStore extends Logging { /** Stop maintenance thread and reset the maintenance task */ def stopMaintenanceTask(): Unit = loadedProviders.synchronized { + if (maintenanceThreadPool != null) { + threadPoolException.set(null) + maintenanceThreadPoolLock.synchronized { + maintenancePartitions.clear() + } + maintenanceThreadPool.stop() + maintenanceThreadPool = null + } if (maintenanceTask != null) { maintenanceTask.stop() maintenanceTask = null @@ -607,7 +648,8 @@ object StateStore extends Logging { } /** Start the periodic maintenance task if not already started and if Spark active */ - private def 
startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = + private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = { + val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads loadedProviders.synchronized { if (SparkEnv.get != null && !isMaintenanceRunning) { maintenanceTask = new MaintenanceTask( @@ -623,9 +665,22 @@ object StateStore extends Logging { } } ) + maintenanceThreadPool = new MaintenanceThreadPool(numMaintenanceThreads) logInfo("State Store maintenance task started") } } + } + + private def processThisPartition(id: StateStoreProviderId): Boolean = { + maintenanceThreadPoolLock.synchronized { + if (!maintenancePartitions.contains(id)) { + maintenancePartitions.add(id) + true + } else { + false + } + } + } /** * Execute background maintenance task in all the loaded store providers if they are still @@ -636,17 +691,46 @@ object StateStore extends Logging { if (SparkEnv.get == null) { throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores") } - loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) => - try { - provider.doMaintenance() - if (!verifyIfStoreInstanceActive(id)) { - unload(id) - logInfo(s"Unloaded $provider") - } - } catch { - case NonFatal(e) => - logWarning(s"Error managing $provider, stopping management thread") - throw e + loadedProviders.synchronized { + loadedProviders.toSeq + }.foreach { case (id, provider) => + // check exception + if (threadPoolException.get() != null) { + val exception = threadPoolException.get() + logWarning("Error in maintenanceThreadPool", exception) + throw exception + } + if (processThisPartition(id)) { + maintenanceThreadPool.execute(() => { + val startTime = System.currentTimeMillis() + try { + provider.doMaintenance() + if (!verifyIfStoreInstanceActive(id)) { + unload(id) + logInfo(s"Unloaded $provider") + } + } catch { + case NonFatal(e) => + logWarning(s"Error managing $provider, stopping management thread", e) 
+ threadPoolException.set(e) + throw e + } finally { + val duration = System.currentTimeMillis() - startTime + val logMsg = s"Finished maintenance task for provider=$id" + + s" in elapsed_time=$duration\n" + if (duration > 5000) { + logInfo(logMsg) + } else { + logDebug(logMsg) + } + maintenanceThreadPoolLock.synchronized { + maintenancePartitions.remove(id) + } + } + }) + } else { + logInfo(s"Not processing partition ${id} for maintenance because it is currently " + + s"being processed") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index 21a18745348..c7004524097 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -27,6 +27,11 @@ class StateStoreConf( def this() = this(new SQLConf) + /** + * Size of MaintenanceThreadPool to perform maintenance tasks for StateStore + */ + val numStateStoreMaintenanceThreads: Int = sqlConf.numStateStoreMaintenanceThreads + /** * Minimum number of delta files in a chain after which HDFSBackedStateStore will * consider generating a snapshot. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org