This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5aaec8b [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped 5aaec8b is described below commit 5aaec8bcb5602b903f337a21f4f0ad7e669844ad Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Thu Jan 28 13:06:42 2021 -0800 [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped ### What changes were proposed in this pull request? This PR aims to prevent `HeartbeatReceiver` from asking `Executor` to re-register its block manager when the SparkContext is already stopped. ### Why are the changes needed? Currently, `HeartbeatReceiver` blindly asks for re-registration in response to new heartbeat messages. However, when SparkContext is stopped, we don't need to re-register a new block manager. Re-registration causes unnecessary executor logs and a delay in job termination. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with the newly added test case. Closes #31373 from dongjoon-hyun/SPARK-34273. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit bc41c5a0e598e6b697ed61c33e1bea629dabfc57) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../scala/org/apache/spark/HeartbeatReceiver.scala | 8 +++++--- .../org/apache/spark/HeartbeatReceiverSuite.scala | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index be63072..6c18cf1 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -128,6 +128,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // Messages received from executors case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorUpdates) => + var reregisterBlockManager = !sc.isStopped if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() @@ -135,7 +136,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( executorId, accumUpdates, blockManagerId, executorUpdates) - val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) + reregisterBlockManager &= unknownExecutor + val response = HeartbeatResponse(reregisterBlockManager) context.reply(response) } }) @@ -145,14 +147,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // not log warning here. Otherwise there may be a lot of noise especially if // we explicitly remove executors (SPARK-4134). 
logDebug(s"Received heartbeat from unknown executor $executorId") - context.reply(HeartbeatResponse(reregisterBlockManager = true)) + context.reply(HeartbeatResponse(reregisterBlockManager)) } } else { // Because Executor will sleep several seconds before sending the first "Heartbeat", this // case rarely happens. However, if it really happens, log it and ask the executor to // register itself again. logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet") - context.reply(HeartbeatResponse(reregisterBlockManager = true)) + context.reply(HeartbeatResponse(reregisterBlockManager)) } } diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index ff0f2f9..9e7bc03 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -218,6 +218,24 @@ class HeartbeatReceiverSuite fakeSchedulerBackend.stop() } + test("SPARK-34273: Do not reregister BlockManager when SparkContext is stopped") { + val blockManagerId = BlockManagerId(executorId1, "localhost", 12345) + + heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) + val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( + Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty)) + assert(response.reregisterBlockManager) + + try { + sc.stopped.set(true) + val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( + Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty)) + assert(!response.reregisterBlockManager) + } finally { + sc.stopped.set(false) + } + } + /** Manually send a heartbeat and return the response. */ private def triggerHeartbeat( executorId: String, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org