This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 53658ab  [SPARK-27419][CORE] Avoid casting heartbeat interval to 
seconds (2.4)
53658ab is described below

commit 53658ab9b446256fc7de6209f38050c0e50837cc
Author: Shixiong Zhu <[email protected]>
AuthorDate: Wed Apr 10 09:41:36 2019 +0800

    [SPARK-27419][CORE] Avoid casting heartbeat interval to seconds (2.4)
    
    ## What changes were proposed in this pull request?
    
    Right now, because we cast the heartbeat interval to seconds, any value less than 
1 second is truncated to 0. This PR just backports the heartbeat interval 
changes in https://github.com/apache/spark/pull/22473 from master.
    
    ## How was this patch tested?
    
    Jenkins
    
    Closes #24329 from zsxwing/SPARK-27419.
    
    Authored-by: Shixiong Zhu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 core/src/main/scala/org/apache/spark/SparkConf.scala         | 12 ++++++------
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 11 +++++++----
 .../scala/org/apache/spark/internal/config/package.scala     |  5 +++++
 .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala   |  3 ++-
 4 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0b24fe2..e3aba7e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -610,14 +610,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Seria
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
-    val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", 
"120s")
-    val executorHeartbeatInterval =
-      getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
+    val executorTimeoutThresholdMs =
+      getTimeAsSeconds("spark.network.timeout", "120s") * 1000
+    val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
-    require(executorTimeoutThreshold > executorHeartbeatInterval, "The value 
of " +
-      s"spark.network.timeout=${executorTimeoutThreshold}s must be no less 
than the value of " +
-      s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
+    require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The 
value of " +
+      s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less 
than the value of " +
+      s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9ac8063..3348067 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -171,6 +171,11 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  /**
+   * Interval to send heartbeats, in milliseconds
+   */
+  private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
+
   // Executor for the heartbeat task.
   private val heartbeater = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
 
@@ -832,11 +837,9 @@ private[spark] class Executor(
     }
 
     val message = Heartbeat(executorId, accumUpdates.toArray, 
env.blockManager.blockManagerId)
-    val heartbeatIntervalInSec =
-      conf.getTimeAsMs("spark.executor.heartbeatInterval", 
"10s").millis.toSeconds.seconds
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-          message, new RpcTimeout(heartbeatIntervalInSec, 
"spark.executor.heartbeatInterval"))
+          message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, 
EXECUTOR_HEARTBEAT_INTERVAL.key))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
@@ -858,7 +861,7 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks 
to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", 
"10s")
+    val intervalMs = HEARTBEAT_INTERVAL_MS
 
     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * 
intervalMs).asInstanceOf[Int]
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5c17b9b..559b0e1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -79,6 +79,11 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
     
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
+  private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.executor.heartbeatInterval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
+
   private[spark] val EXECUTOR_JAVA_OPTIONS =
     
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
 
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 178de30..bac0246 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, 
SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             externalShufflePort,
             sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
               s"${sc.conf.getTimeAsSeconds("spark.network.timeout", 
"120s")}s"),
-            sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+            sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
         slave.shuffleRegistered = true
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to