This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 78df2ca [SPARK-31619][CORE] Rename config
"spark.dynamicAllocation.shuffleTimeout" to
"spark.dynamicAllocation.shuffleTracking.timeout"
78df2ca is described below
commit 78df2caec8c94c31e5c9ddc30ed8acb424084181
Author: Xingbo Jiang <[email protected]>
AuthorDate: Fri May 1 11:46:17 2020 +0900
[SPARK-31619][CORE] Rename config "spark.dynamicAllocation.shuffleTimeout"
to "spark.dynamicAllocation.shuffleTracking.timeout"
### What changes were proposed in this pull request?
The "spark.dynamicAllocation.shuffleTimeout" configuration only takes
effect if "spark.dynamicAllocation.shuffleTracking.enabled" is true, so we
should re-namespace that configuration so that it's nested under the
"shuffleTracking" one.
### How was this patch tested?
Covered by current existing test cases.
Closes #28426 from jiangxb1987/confName.
Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit b7cde42b04b21c9bfee6535199cf385855c15853)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +-
.../scala/org/apache/spark/internal/config/package.scala | 6 +++---
.../apache/spark/scheduler/dynalloc/ExecutorMonitor.scala | 4 ++--
.../spark/scheduler/dynalloc/ExecutorMonitorSuite.scala | 12 ++++++------
docs/configuration.md | 2 +-
5 files changed, 13 insertions(+), 13 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 677386c..74103dc 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -196,7 +196,7 @@ private[spark] class ExecutorAllocationManager(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be >
0!")
}
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
- if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) {
+ if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
logWarning("Dynamic allocation without a shuffle service is an
experimental feature.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires the
external " +
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f70ee2e..bfdd0602 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -520,14 +520,14 @@ package object config {
.checkValue(_ >= 0L, "Timeout must be >= 0.")
.createWithDefault(60)
- private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING =
+ private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED =
ConfigBuilder("spark.dynamicAllocation.shuffleTracking.enabled")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
- private[spark] val DYN_ALLOCATION_SHUFFLE_TIMEOUT =
- ConfigBuilder("spark.dynamicAllocation.shuffleTimeout")
+ private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT =
+ ConfigBuilder("spark.dynamicAllocation.shuffleTracking.timeout")
.version("3.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(_ >= 0L, "Timeout must be >= 0.")
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index a24f190..3664d3f 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -45,12 +45,12 @@ private[spark] class ExecutorMonitor(
private val storageTimeoutNs = TimeUnit.SECONDS.toNanos(
conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT))
private val shuffleTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
- conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT))
+ conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT))
private val fetchFromShuffleSvcEnabled = conf.get(SHUFFLE_SERVICE_ENABLED) &&
conf.get(SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
private val shuffleTrackingEnabled = !conf.get(SHUFFLE_SERVICE_ENABLED) &&
- conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING)
+ conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)
private val executors = new ConcurrentHashMap[String, Tracker]()
private val execResourceProfileCount = new ConcurrentHashMap[Int, Int]()
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 615389a..c5f7315 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -42,7 +42,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
private val conf = new SparkConf()
.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "60s")
.set(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key, "120s")
- .set(DYN_ALLOCATION_SHUFFLE_TIMEOUT.key, "240s")
+ .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT.key, "240s")
.set(SHUFFLE_SERVICE_ENABLED, true)
private var monitor: ExecutorMonitor = _
@@ -287,7 +287,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("shuffle block tracking") {
val bus = mockListenerBus()
- conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING,
true).set(SHUFFLE_SERVICE_ENABLED, false)
+ conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED,
true).set(SHUFFLE_SERVICE_ENABLED, false)
monitor = new ExecutorMonitor(conf, client, bus, clock)
// 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle.
@@ -355,7 +355,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("SPARK-28839: Avoids NPE in context cleaner when shuffle service is
on") {
val bus = mockListenerBus()
- conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING,
true).set(SHUFFLE_SERVICE_ENABLED, true)
+ conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED,
true).set(SHUFFLE_SERVICE_ENABLED, true)
monitor = new ExecutorMonitor(conf, client, bus, clock) {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
throw new IllegalStateException("No event should be sent.")
@@ -367,7 +367,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("shuffle tracking with multiple executors and concurrent jobs") {
val bus = mockListenerBus()
- conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING,
true).set(SHUFFLE_SERVICE_ENABLED, false)
+ conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED,
true).set(SHUFFLE_SERVICE_ENABLED, false)
monitor = new ExecutorMonitor(conf, client, bus, clock)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(),
"1", execInfo))
@@ -410,8 +410,8 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("SPARK-28455: avoid overflow in timeout calculation") {
conf
- .set(DYN_ALLOCATION_SHUFFLE_TIMEOUT, Long.MaxValue)
- .set(DYN_ALLOCATION_SHUFFLE_TRACKING, true)
+ .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT, Long.MaxValue)
+ .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
.set(SHUFFLE_SERVICE_ENABLED, false)
monitor = new ExecutorMonitor(conf, client, null, clock)
diff --git a/docs/configuration.md b/docs/configuration.md
index a97d3e4..2701fdb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2551,7 +2551,7 @@ Apart from these, the following properties are also
available, and may be useful
<td>3.0.0</td>
</tr>
<tr>
- <td><code>spark.dynamicAllocation.shuffleTimeout</code></td>
+ <td><code>spark.dynamicAllocation.shuffleTracking.timeout</code></td>
<td><code>infinity</code></td>
<td>
When shuffle tracking is enabled, controls the timeout for executors that
are holding shuffle
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]