This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 76e5d75 [SPARK-33763] Add metrics for better tracking of dynamic allocation 76e5d75 is described below commit 76e5d75e369609b96248a34bc6015ec61936e652 Author: “attilapiros” <piros.attila.zs...@gmail.com> AuthorDate: Wed Feb 17 13:44:36 2021 -0800 [SPARK-33763] Add metrics for better tracking of dynamic allocation ### What changes were proposed in this pull request? This PR adds the following metrics to track executor removal reasons during dynamic allocation: - `numberExecutorsGracefullyDecommissioned`: number of executors which reached the finished decommissioning state and shut themselves down cleanly - `numberExecutorsDecommissionUnfinished`: executors which requested to decommission but stopped without reaching the finished decommissioning state - `numberExecutorsKilledByDriver`: executors killed by the driver (requested to stop) - `numberExecutorsExitedUnexpectedly`: executors which exited without a driver request ### Why are the changes needed? To better support monitoring of dynamic allocation with these metrics. ### Does this PR introduce _any_ user-facing change? Yes. The new metrics will be available for monitoring. ### How was this patch tested? With unit and integration tests. Finally, the new metrics were manually checked in jconsole: <img width="1054" alt="jmx" src="https://user-images.githubusercontent.com/2017933/107458686-de8adf00-6b54-11eb-86f7-41faf2fb638f.png"> Closes #31450 from attilapiros/SPARK-33763-final. 
Authored-by: “attilapiros” <piros.attila.zs...@gmail.com> Signed-off-by: Holden Karau <hka...@apple.com> --- .../apache/spark/ExecutorAllocationManager.scala | 80 +++++++++++++--------- .../executor/CoarseGrainedExecutorBackend.scala | 6 +- .../spark/scheduler/ExecutorLossReason.scala | 4 ++ .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 20 +++++- .../spark/ExecutorAllocationManagerSuite.scala | 54 ++++++++++++++- .../scheduler/dynalloc/ExecutorMonitorSuite.scala | 22 ++++-- .../k8s/integrationtest/DecommissionSuite.scala | 11 ++- 7 files changed, 150 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index bdb768e..822a0a5 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.{ControlThrowable, NonFatal} -import com.codahale.metrics.{Gauge, MetricRegistry} +import com.codahale.metrics.{Counter, Gauge, MetricRegistry} import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ @@ -135,14 +135,14 @@ private[spark] class ExecutorAllocationManager( validateSettings() // Number of executors to add for each ResourceProfile in the next round - private val numExecutorsToAddPerResourceProfileId = new mutable.HashMap[Int, Int] + private[spark] val numExecutorsToAddPerResourceProfileId = new mutable.HashMap[Int, Int] numExecutorsToAddPerResourceProfileId(defaultProfileId) = 1 // The desired number of executors at this moment in time. If all our executors were to die, this // is the number of executors we would immediately want from the cluster manager. 
// Note every profile will be allowed to have initial number, // we may want to make this configurable per Profile in the future - private val numExecutorsTargetPerResourceProfileId = new mutable.HashMap[Int, Int] + private[spark] val numExecutorsTargetPerResourceProfileId = new mutable.HashMap[Int, Int] numExecutorsTargetPerResourceProfileId(defaultProfileId) = initialNumExecutors // A timestamp of when an addition should be triggered, or NOT_SET if it is not set @@ -155,14 +155,15 @@ private[spark] class ExecutorAllocationManager( // Listener for Spark events that impact the allocation policy val listener = new ExecutorAllocationListener - val executorMonitor = new ExecutorMonitor(conf, client, listenerBus, clock) - // Executor that handles the scheduling task. private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation") // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem. - val executorAllocationManagerSource = new ExecutorAllocationManagerSource + val executorAllocationManagerSource = new ExecutorAllocationManagerSource(this) + + val executorMonitor = + new ExecutorMonitor(conf, client, listenerBus, clock, executorAllocationManagerSource) // Whether we are still waiting for the initial set of executors to be allocated. // While this is true, we will not cancel outstanding executor requests. This is @@ -288,7 +289,7 @@ private[spark] class ExecutorAllocationManager( * The maximum number of executors, for the ResourceProfile id passed in, that we would need * under the current load to satisfy all running and pending tasks, rounded up. 
*/ - private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = { + private[spark] def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = { val pending = listener.totalPendingTasksPerResourceProfile(rpId) val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId) val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId) @@ -967,34 +968,47 @@ private[spark] class ExecutorAllocationManager( rplocalityToCount.map { case (k, v) => (k, v.toMap)}.toMap } } +} - /** - * Metric source for ExecutorAllocationManager to expose its internal executor allocation - * status to MetricsSystem. - * Note: These metrics heavily rely on the internal implementation of - * ExecutorAllocationManager, metrics or value of metrics will be changed when internal - * implementation is changed, so these metrics are not stable across Spark version. - */ - private[spark] class ExecutorAllocationManagerSource extends Source { - val sourceName = "ExecutorAllocationManager" - val metricRegistry = new MetricRegistry() - - private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = { - metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] { - override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) } - }) - } - - // The metrics are going to return the sum for all the different ResourceProfiles. 
- registerGauge("numberExecutorsToAdd", - numExecutorsToAddPerResourceProfileId.values.sum, 0) - registerGauge("numberExecutorsPendingToRemove", executorMonitor.pendingRemovalCount, 0) - registerGauge("numberAllExecutors", executorMonitor.executorCount, 0) - registerGauge("numberTargetExecutors", - numExecutorsTargetPerResourceProfileId.values.sum, 0) - registerGauge("numberMaxNeededExecutors", numExecutorsTargetPerResourceProfileId.keys - .map(maxNumExecutorsNeededPerResourceProfile(_)).sum, 0) +/** + * Metric source for ExecutorAllocationManager to expose its internal executor allocation + * status to MetricsSystem. + * Note: These metrics heavily rely on the internal implementation of + * ExecutorAllocationManager, metrics or value of metrics will be changed when internal + * implementation is changed, so these metrics are not stable across Spark version. + */ +private[spark] class ExecutorAllocationManagerSource( + executorAllocationManager: ExecutorAllocationManager) extends Source { + val sourceName = "ExecutorAllocationManager" + val metricRegistry = new MetricRegistry() + + private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = { + metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] { + override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) } + }) } + + private def getCounter(name: String): Counter = { + metricRegistry.counter(MetricRegistry.name("executors", name)) + } + + val gracefullyDecommissioned: Counter = getCounter("numberExecutorsGracefullyDecommissioned") + val decommissionUnfinished: Counter = getCounter("numberExecutorsDecommissionUnfinished") + val driverKilled: Counter = getCounter("numberExecutorsKilledByDriver") + val exitedUnexpectedly: Counter = getCounter("numberExecutorsExitedUnexpectedly") + + // The metrics are going to return the sum for all the different ResourceProfiles. 
+ registerGauge("numberExecutorsToAdd", + executorAllocationManager.numExecutorsToAddPerResourceProfileId.values.sum, 0) + registerGauge("numberExecutorsPendingToRemove", + executorAllocationManager.executorMonitor.pendingRemovalCount, 0) + registerGauge("numberAllExecutors", + executorAllocationManager.executorMonitor.executorCount, 0) + registerGauge("numberTargetExecutors", + executorAllocationManager.numExecutorsTargetPerResourceProfileId.values.sum, 0) + registerGauge("numberMaxNeededExecutors", + executorAllocationManager.numExecutorsTargetPerResourceProfileId.keys + .map(executorAllocationManager.maxNumExecutorsNeededPerResourceProfile(_)).sum, 0) } private object ExecutorAllocationManager { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index e1d3009..43c122a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -39,7 +39,7 @@ import org.apache.spark.resource.ResourceProfile import org.apache.spark.resource.ResourceProfile._ import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.rpc._ -import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} +import org.apache.spark.scheduler.{ExecutorLossMessage, ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.serializer.SerializerInstance import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils} @@ -322,13 +322,13 @@ private[spark] class CoarseGrainedExecutorBackend( // since the start of computing it. 
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) { logInfo("No running tasks, all blocks migrated, stopping.") - exitExecutor(0, "Finished decommissioning", notifyDriver = true) + exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true) } else { logInfo("All blocks not yet migrated.") } } else { logInfo("No running tasks, no block migration configured, stopping.") - exitExecutor(0, "Finished decommissioning", notifyDriver = true) + exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true) } } else { logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks") diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 2644d0a..f333c01 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -40,6 +40,10 @@ private[spark] object ExecutorExited { } } +private[spark] object ExecutorLossMessage { + val decommissionFinished = "Finished decommissioning" +} + private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed by driver.") /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 8c84b318..cecd4b0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -38,7 +38,9 @@ private[spark] class ExecutorMonitor( conf: SparkConf, client: ExecutorAllocationClient, listenerBus: LiveListenerBus, - clock: Clock) extends SparkListener with CleanerListener with Logging { + clock: Clock, + metrics: ExecutorAllocationManagerSource = null) + extends SparkListener with CleanerListener with Logging { private val idleTimeoutNs = 
TimeUnit.SECONDS.toNanos( conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)) @@ -352,6 +354,22 @@ private[spark] class ExecutorMonitor( val removed = executors.remove(event.executorId) if (removed != null) { decrementExecResourceProfileCount(removed.resourceProfileId) + if (removed.decommissioning) { + if (event.reason == ExecutorLossMessage.decommissionFinished) { + metrics.gracefullyDecommissioned.inc() + } else { + metrics.decommissionUnfinished.inc() + } + } else if (removed.pendingRemoval) { + metrics.driverKilled.inc() + } else { + metrics.exitedUnexpectedly.inc() + } + logInfo(s"Executor ${event.executorId} is removed. Remove reason statistics: (" + + s"gracefully decommissioned: ${metrics.gracefullyDecommissioned.getCount()}, " + + s"decommision unfinished: ${metrics.decommissionUnfinished.getCount()}, " + + s"driver killed: ${metrics.driverKilled.getCount()}, " + + s"unexpectedly exited: ${metrics.exitedUnexpectedly.getCount()}).") if (!removed.pendingRemoval || !removed.decommissioning) { nextTimeout.set(Long.MinValue) } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 5ae596b..2fb5140 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -936,6 +936,53 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(executorsPendingToRemove(manager).isEmpty) } + test("SPARK-33763: metrics to track dynamic allocation (decommissionEnabled=false)") { + val manager = createManager(createConf(3, 5, 3)) + (1 to 5).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) } + + assert(executorsPendingToRemove(manager).isEmpty) + assert(removeExecutorsDefaultProfile(manager, Seq("1", "2")) === Seq("1", "2")) + assert(executorsPendingToRemove(manager).contains("1")) + 
assert(executorsPendingToRemove(manager).contains("2")) + + onExecutorRemoved(manager, "1", "driver requested exit") + assert(manager.executorAllocationManagerSource.driverKilled.getCount() === 1) + assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0) + + onExecutorRemoved(manager, "2", "another driver requested exit") + assert(manager.executorAllocationManagerSource.driverKilled.getCount() === 2) + assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0) + + onExecutorRemoved(manager, "3", "this will be an unexpected exit") + assert(manager.executorAllocationManagerSource.driverKilled.getCount() === 2) + assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 1) + } + + test("SPARK-33763: metrics to track dynamic allocation (decommissionEnabled = true)") { + val manager = createManager(createConf(3, 5, 3, decommissioningEnabled = true)) + (1 to 5).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) } + + assert(executorsPendingToRemove(manager).isEmpty) + assert(removeExecutorsDefaultProfile(manager, Seq("1", "2")) === Seq("1", "2")) + assert(executorsDecommissioning(manager).contains("1")) + assert(executorsDecommissioning(manager).contains("2")) + + onExecutorRemoved(manager, "1", ExecutorLossMessage.decommissionFinished) + assert(manager.executorAllocationManagerSource.gracefullyDecommissioned.getCount() === 1) + assert(manager.executorAllocationManagerSource.decommissionUnfinished.getCount() === 0) + assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0) + + onExecutorRemoved(manager, "2", "stopped before gracefully finished") + assert(manager.executorAllocationManagerSource.gracefullyDecommissioned.getCount() === 1) + assert(manager.executorAllocationManagerSource.decommissionUnfinished.getCount() === 1) + assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0) + + onExecutorRemoved(manager, "3", 
"this will be an unexpected exit") + assert(manager.executorAllocationManagerSource.gracefullyDecommissioned.getCount() === 1) + assert(manager.executorAllocationManagerSource.decommissionUnfinished.getCount() === 1) + assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 1) + } + test("remove multiple executors") { val manager = createManager(createConf(5, 10, 5)) (1 to 10).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) } @@ -1701,8 +1748,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { post(SparkListenerExecutorAdded(0L, id, execInfo)) } - private def onExecutorRemoved(manager: ExecutorAllocationManager, id: String): Unit = { - post(SparkListenerExecutorRemoved(0L, id, null)) + private def onExecutorRemoved( + manager: ExecutorAllocationManager, + id: String, + reason: String = null): Unit = { + post(SparkListenerExecutorRemoved(0L, id, reason)) } private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala index 6d49479..69afdb5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable +import com.codahale.metrics.Counter import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{doAnswer, mock, when} @@ -57,6 +58,15 @@ class ExecutorMonitorSuite extends SparkFunSuite { // having to use mockito APIs directly in each test. 
private val knownExecs = mutable.HashSet[String]() + private def allocationManagerSource(): ExecutorAllocationManagerSource = { + val metricSource = mock(classOf[ExecutorAllocationManagerSource]) + when(metricSource.driverKilled).thenReturn(new Counter) + when(metricSource.decommissionUnfinished).thenReturn(new Counter) + when(metricSource.gracefullyDecommissioned).thenReturn(new Counter) + when(metricSource.exitedUnexpectedly).thenReturn(new Counter) + metricSource + } + override def beforeEach(): Unit = { super.beforeEach() knownExecs.clear() @@ -65,7 +75,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { when(client.isExecutorActive(any())).thenAnswer { invocation => knownExecs.contains(invocation.getArguments()(0).asInstanceOf[String]) } - monitor = new ExecutorMonitor(conf, client, null, clock) + monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource()) } test("basic executor timeout") { @@ -231,7 +241,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.timedOutExecutors(storageDeadline) === Seq("1")) conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true) - monitor = new ExecutorMonitor(conf, client, null, clock) + monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource()) monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY)) @@ -292,7 +302,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { test("shuffle block tracking") { val bus = mockListenerBus() conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true).set(SHUFFLE_SERVICE_ENABLED, false) - monitor = new ExecutorMonitor(conf, client, bus, clock) + monitor = new ExecutorMonitor(conf, client, bus, clock, allocationManagerSource()) // 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle. 
val stage1 = stageInfo(1, shuffleId = 0) @@ -360,7 +370,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { test("SPARK-28839: Avoids NPE in context cleaner when shuffle service is on") { val bus = mockListenerBus() conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true).set(SHUFFLE_SERVICE_ENABLED, true) - monitor = new ExecutorMonitor(conf, client, bus, clock) { + monitor = new ExecutorMonitor(conf, client, bus, clock, allocationManagerSource()) { override def onOtherEvent(event: SparkListenerEvent): Unit = { throw new IllegalStateException("No event should be sent.") } @@ -372,7 +382,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { test("shuffle tracking with multiple executors and concurrent jobs") { val bus = mockListenerBus() conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true).set(SHUFFLE_SERVICE_ENABLED, false) - monitor = new ExecutorMonitor(conf, client, bus, clock) + monitor = new ExecutorMonitor(conf, client, bus, clock, allocationManagerSource()) monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo)) @@ -417,7 +427,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT, Long.MaxValue) .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true) .set(SHUFFLE_SERVICE_ENABLED, false) - monitor = new ExecutorMonitor(conf, client, null, clock) + monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource()) // Generate events that will make executor 1 be idle, while still holding shuffle data. // The executor should not be eligible for removal since the timeout is basically "infinite". 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala index 9291861..75c27f6 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -16,6 +16,10 @@ */ package org.apache.spark.deploy.k8s.integrationtest +import org.scalatest.concurrent.PatienceConfiguration +import org.scalatest.time.Minutes +import org.scalatest.time.Span + import org.apache.spark.internal.config private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => @@ -104,13 +108,15 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => mainClass = "", expectedDriverLogOnCompletion = Seq( "Finished waiting, stopping Spark", - "Decommission executors"), + "Decommission executors", + "Remove reason statistics: (gracefully decommissioned: 1, decommision unfinished: 0, " + + "driver killed: 0, unexpectedly exited: 0)."), appArgs = Array.empty[String], driverPodChecker = doBasicDriverPyPodCheck, executorPodChecker = doBasicExecutorPyPodCheck, isJVM = false, pyFiles = None, - executorPatience = None, + executorPatience = Some(None, Some(DECOMMISSIONING_FINISHED_TIMEOUT)), decommissioningTest = false) } @@ -151,4 +157,5 @@ private[spark] object DecommissionSuite { val PYSPARK_DECOMISSIONING: String = TEST_LOCAL_PYSPARK + "decommissioning.py" val PYSPARK_DECOMISSIONING_CLEANUP: String = TEST_LOCAL_PYSPARK + "decommissioning_cleanup.py" val PYSPARK_SCALE: String = TEST_LOCAL_PYSPARK + "autoscale.py" + val DECOMMISSIONING_FINISHED_TIMEOUT = PatienceConfiguration.Timeout(Span(4, Minutes)) } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org