This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 53658ab [SPARK-27419][CORE] Avoid casting heartbeat interval to
seconds (2.4)
53658ab is described below
commit 53658ab9b446256fc7de6209f38050c0e50837cc
Author: Shixiong Zhu <[email protected]>
AuthorDate: Wed Apr 10 09:41:36 2019 +0800
[SPARK-27419][CORE] Avoid casting heartbeat interval to seconds (2.4)
## What changes were proposed in this pull request?
Right now, as we cast the heartbeat interval to seconds, any value less than
1 second will be cast (truncated) to 0. This PR just backports the changes of the
heartbeat interval in https://github.com/apache/spark/pull/22473 from master.
## How was this patch tested?
Jenkins
Closes #24329 from zsxwing/SPARK-27419.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
core/src/main/scala/org/apache/spark/SparkConf.scala | 12 ++++++------
core/src/main/scala/org/apache/spark/executor/Executor.scala | 11 +++++++----
.../scala/org/apache/spark/internal/config/package.scala | 5 +++++
.../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 3 ++-
4 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0b24fe2..e3aba7e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -610,14 +610,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable
with Logging with Seria
require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
- val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout",
"120s")
- val executorHeartbeatInterval =
- getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
+ val executorTimeoutThresholdMs =
+ getTimeAsSeconds("spark.network.timeout", "120s") * 1000
+ val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
// If spark.executor.heartbeatInterval bigger than spark.network.timeout,
// it will almost always cause ExecutorLostFailure. See SPARK-22754.
- require(executorTimeoutThreshold > executorHeartbeatInterval, "The value
of " +
- s"spark.network.timeout=${executorTimeoutThreshold}s must be no less
than the value of " +
- s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
+ require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The
value of " +
+ s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less
than the value of " +
+ s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
}
/**
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9ac8063..3348067 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -171,6 +171,11 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
+ /**
+ * Interval to send heartbeats, in milliseconds
+ */
+ private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
+
// Executor for the heartbeat task.
private val heartbeater =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
@@ -832,11 +837,9 @@ private[spark] class Executor(
}
val message = Heartbeat(executorId, accumUpdates.toArray,
env.blockManager.blockManagerId)
- val heartbeatIntervalInSec =
- conf.getTimeAsMs("spark.executor.heartbeatInterval",
"10s").millis.toSeconds.seconds
try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
- message, new RpcTimeout(heartbeatIntervalInSec,
"spark.executor.heartbeatInterval"))
+ message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis,
EXECUTOR_HEARTBEAT_INTERVAL.key))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
@@ -858,7 +861,7 @@ private[spark] class Executor(
* Schedules a task to report heartbeat and partial metrics for active tasks
to driver.
*/
private def startDriverHeartbeater(): Unit = {
- val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval",
"10s")
+ val intervalMs = HEARTBEAT_INTERVAL_MS
// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random *
intervalMs).asInstanceOf[Int]
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5c17b9b..559b0e1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -79,6 +79,11 @@ package object config {
private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
+ private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
+ ConfigBuilder("spark.executor.heartbeatInterval")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("10s")
+
private[spark] val EXECUTOR_JAVA_OPTIONS =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
diff --git
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 178de30..bac0246 100644
---
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
import org.apache.spark.{SecurityManager, SparkConf, SparkContext,
SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout",
"120s")}s"),
- sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+ sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
slave.shuffleRegistered = true
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]