Repository: spark
Updated Branches:
  refs/heads/master 37bed97de -> 23e047f46


[SPARK-16416][CORE] force eager creation of loggers to avoid shutdown hook conflicts

## What changes were proposed in this pull request?

Force eager creation of loggers in the classes that register shutdown hooks. When a logger is created lazily, its first use may happen inside a shutdown hook while the JVM is already exiting; with Log4j 2, whose own shutdown hook stops the logging system, that late initialization conflicts with the shutdown in progress.
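For illustration, a minimal standalone sketch of the pattern (the `CleanupService` class and the direct use of SLF4J are assumptions for this example; the patch itself goes through Spark's private `Logging` trait):

```scala
import org.slf4j.{Logger, LoggerFactory}

// Hypothetical class, for illustration only.
class CleanupService {

  // A lazily initialized logger: without the eager touch below, its first
  // use could be the logInfo call inside the shutdown hook, i.e. logger
  // creation would happen while the JVM is already shutting down.
  private lazy val log: Logger = LoggerFactory.getLogger(getClass)

  log.debug("init") // force eager creation of the logger, as in this patch

  Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
    override def run(): Unit = {
      // The logger already exists by now, so nothing here races with
      // Log4j 2's own shutdown hook.
      log.info("Shutdown hook called")
    }
  }))
}
```

The `logDebug("init")` calls added in this diff play the same role: they materialize the logger at construction time, before any shutdown hook can run.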

## How was this patch tested?

Manually tested with a project using Log4j 2; verified that the shutdown hook conflict was resolved.

Author: Mikael Ståldal <[email protected]>

Closes #14320 from mikaelstaldal/shutdown-hook-logging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23e047f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23e047f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23e047f4

Branch: refs/heads/master
Commit: 23e047f4609bf39f50ea4c65f704cac15408a821
Parents: 37bed97
Author: Mikael Ståldal <[email protected]>
Authored: Sun Jul 24 11:16:24 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Sun Jul 24 11:16:24 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala        | 2 ++
 core/src/main/scala/org/apache/spark/SparkContext.scala            | 1 +
 .../scala/org/apache/spark/deploy/ExternalShuffleService.scala     | 1 +
 .../org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala     | 1 +
 .../scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala | 2 ++
 .../src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 1 +
 .../src/main/scala/org/apache/spark/util/ShutdownHookManager.scala | 1 +
 .../main/scala/org/apache/spark/streaming/StreamingContext.scala   | 1 +
 8 files changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23e047f4/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6bd9502..486d535 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -46,6 +46,8 @@ private[spark] class MapOutputTrackerMasterEndpoint(
     override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends RpcEndpoint with Logging {
 
+  logDebug("init") // force eager creation of logger
+
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = context.senderAddress.hostPort

http://git-wip-us.apache.org/repos/asf/spark/blob/23e047f4/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 57d1f09..6d7f05d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -556,6 +556,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Make sure the context is stopped if the user forgets about it. This avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
     // is killed, though.
+    logDebug("Adding shutdown hook") // force eager creation of logger
     _shutdownHookRef = ShutdownHookManager.addShutdownHook(
       ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
       logInfo("Invoking stop() from shutdown hook")

http://git-wip-us.apache.org/repos/asf/spark/blob/23e047f4/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 37a19a4..13eadbe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -128,6 +128,7 @@ object ExternalShuffleService extends Logging {
     server = newShuffleService(sparkConf, securityManager)
     server.start()
 
+    logDebug("Adding shutdown hook") // force eager creation of logger
     ShutdownHookManager.addShutdownHook { () =>
       logInfo("Shutting down shuffle service.")
       server.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/23e047f4/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index a057977..73b6ca3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -104,6 +104,7 @@ private[mesos] object MesosClusterDispatcher extends Logging {
     }
     val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
     dispatcher.start()
+    logDebug("Adding shutdown hook") // force eager creation of logger
     ShutdownHookManager.addShutdownHook { () =>
       logInfo("Shutdown hook is shutting down dispatcher")
       dispatcher.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/23e047f4/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 2dd453c..7bed685 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -184,6 +184,8 @@ private[spark] object OutputCommitCoordinator {
       override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
     extends RpcEndpoint with Logging {
 
+    logDebug("init") // force eager creation of logger
+
     override def receive: PartialFunction[Any, Unit] = {
       case StopCoordinator =>
         logInfo("OutputCommitCoordinator stopped!")

http://git-wip-us.apache.org/repos/asf/spark/blob/23e047f4/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 0666be2..3d43e3c 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -141,6 +141,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   }
 
   private def addShutdownHook(): AnyRef = {
+    logDebug("Adding shutdown hook") // force eager creation of logger
     ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
       logInfo("Shutdown hook called")
       DiskBlockManager.this.doStop()

http://git-wip-us.apache.org/repos/asf/spark/blob/23e047f4/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 93ac67e..4001fac 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -54,6 +54,7 @@ private[spark] object ShutdownHookManager extends Logging {
   private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
 
   // Add a shutdown hook to delete the temp dirs when the JVM exits
+  logDebug("Adding shutdown hook") // force eager creation of logger
   addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
     logInfo("Shutdown hook called")
     // we need to materialize the paths to delete because deleteRecursively removes items from

http://git-wip-us.apache.org/repos/asf/spark/blob/23e047f4/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6046426..4808d0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -592,6 +592,7 @@ class StreamingContext private[streaming] (
           }
           StreamingContext.setActiveContext(this)
         }
+        logDebug("Adding shutdown hook") // force eager creation of logger
         shutdownHookRef = ShutdownHookManager.addShutdownHook(
           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
         // Registering Streaming Metrics at the start of the StreamingContext

