This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a04de5  [SPARK-26152] Synchronize Worker Cleanup with Worker Shutdown
2a04de5 is described below

commit 2a04de52dd7ea12ee6660ac1d385ba09617abf12
Author: Ajith <[email protected]>
AuthorDate: Thu Mar 14 09:16:29 2019 -0500

    [SPARK-26152] Synchronize Worker Cleanup with Worker Shutdown
    
    ## What changes were proposed in this pull request?
    
    There is a race between the org.apache.spark.deploy.DeployMessages.WorkDirCleanup
    event and org.apache.spark.deploy.worker.Worker#onStop: while the WorkDirCleanup
    event is being processed, org.apache.spark.deploy.worker.Worker#cleanupThreadExecutor
    may already have been shut down, so any task subsequently submitted to that
    ThreadPoolExecutor results in a java.util.concurrent.RejectedExecutionException.
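    
    The following standalone sketch (not Spark code; the object and pool names are
    hypothetical) illustrates the failure mode this patch guards against: an executor
    that has already been shut down rejects any further submission, and the new catch
    clause downgrades that case to a warning.
    
    ```scala
    import java.util.concurrent.{Executors, RejectedExecutionException}
    
    object RejectedAfterShutdown {
      def main(args: Array[String]): Unit = {
        // Hypothetical stand-in for the Worker's single-thread cleanup pool.
        val cleanupPool = Executors.newSingleThreadExecutor()
    
        // Simulate Worker#onStop winning the race and shutting the pool down first.
        cleanupPool.shutdownNow()
    
        // Any task submitted afterwards is rejected by the executor.
        try {
          cleanupPool.execute(() => println("cleaning work dir"))
        } catch {
          case _: RejectedExecutionException if cleanupPool.isShutdown =>
            // Mirrors the patch: log a warning instead of failing the handler.
            println("Failed to cleanup work dir as executor pool was shutdown")
        }
      }
    }
    ```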
    
    ## How was this patch tested?
    
    Manually
    
    Closes #24056 from ajithme/workercleanup.
    
    Authored-by: Ajith <[email protected]>
    Signed-off-by: Sean Owen <[email protected]>
---
 .../org/apache/spark/deploy/worker/Worker.scala    | 68 +++++++++++++---------
 1 file changed, 39 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4bd7aaa..52892c3 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -450,27 +450,32 @@ private[deploy] class Worker(
       // rpcEndpoint.
       // Copy ids so that it can be used in the cleanup thread.
       val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
-      val cleanupFuture = concurrent.Future {
-        val appDirs = workDir.listFiles()
-        if (appDirs == null) {
-          throw new IOException("ERROR: Failed to list files in " + appDirs)
-        }
-        appDirs.filter { dir =>
-          // the directory is used by an application - check that the application is not running
-          // when cleaning up
-          val appIdFromDir = dir.getName
-          val isAppStillRunning = appIds.contains(appIdFromDir)
-          dir.isDirectory && !isAppStillRunning &&
-          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
-        }.foreach { dir =>
-          logInfo(s"Removing directory: ${dir.getPath}")
-          Utils.deleteRecursively(dir)
-        }
-      }(cleanupThreadExecutor)
+      try {
+        val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
+          val appDirs = workDir.listFiles()
+          if (appDirs == null) {
+            throw new IOException("ERROR: Failed to list files in " + appDirs)
+          }
+          appDirs.filter { dir =>
+            // the directory is used by an application - check that the application is not running
+            // when cleaning up
+            val appIdFromDir = dir.getName
+            val isAppStillRunning = appIds.contains(appIdFromDir)
+            dir.isDirectory && !isAppStillRunning &&
+              !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
+          }.foreach { dir =>
+            logInfo(s"Removing directory: ${dir.getPath}")
+            Utils.deleteRecursively(dir)
+          }
+        }(cleanupThreadExecutor)
 
-      cleanupFuture.failed.foreach(e =>
-        logError("App dir cleanup failed: " + e.getMessage, e)
-      )(cleanupThreadExecutor)
+        cleanupFuture.failed.foreach(e =>
+          logError("App dir cleanup failed: " + e.getMessage, e)
+        )(cleanupThreadExecutor)
+      } catch {
+        case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
+          logWarning("Failed to cleanup work dir as executor pool was shutdown")
+      }
 
     case MasterChanged(masterRef, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + 
masterRef.address.toSparkURL)
@@ -634,15 +639,20 @@ private[deploy] class Worker(
     val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
     if (shouldCleanup) {
       finishedApps -= id
-      appDirectories.remove(id).foreach { dirList =>
-        concurrent.Future {
-          logInfo(s"Cleaning up local directories for application $id")
-          dirList.foreach { dir =>
-            Utils.deleteRecursively(new File(dir))
-          }
-        }(cleanupThreadExecutor).failed.foreach(e =>
-          logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
-        )(cleanupThreadExecutor)
+      try {
+        appDirectories.remove(id).foreach { dirList =>
+          concurrent.Future {
+            logInfo(s"Cleaning up local directories for application $id")
+            dirList.foreach { dir =>
+              Utils.deleteRecursively(new File(dir))
+            }
+          }(cleanupThreadExecutor).failed.foreach(e =>
+            logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+          )(cleanupThreadExecutor)
+        }
+      } catch {
+        case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
+          logWarning("Failed to cleanup application as executor pool was shutdown")
       }
       shuffleService.applicationRemoved(id)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
