Repository: spark
Updated Branches:
  refs/heads/master 79e45c932 -> cf1d32e3e


[SPARK-1860] More conservative app directory cleanup.

First contribution to the project, so I apologize for any significant errors.

This PR addresses [SPARK-1860]. The application directories are now cleaned up 
in a more conservative manner.

Previously, app-* directories were cleaned up whenever the directory's
timestamp was older than a given cutoff. However, a directory's timestamp does
not reflect the modification times of the files inside it, so app-* directories
were wiped out even if the files they contained had been created recently and
were possibly still in use by executor tasks.
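
As a rough illustration of the problem (a hypothetical standalone snippet, not
part of this patch): on typical filesystems, creating a file inside a nested
executor directory bumps only that subdirectory's timestamp, so the top-level
app-* directory can look stale even while fresh output is being written
underneath it.

    import java.io.File
    import java.nio.file.Files

    // Hypothetical demo, not code from this patch: a file created inside a nested
    // executor directory does not advance the top-level app directory's mtime.
    object DirTimestampDemo {
      def main(args: Array[String]): Unit = {
        val appDir = Files.createTempDirectory("app-demo").toFile // stands in for an app-* dir
        val executorDir = new File(appDir, "0")
        executorDir.mkdirs()
        val mtimeBefore = appDir.lastModified
        Thread.sleep(2000)
        Files.write(new File(executorDir, "stdout").toPath, "fresh executor output".getBytes)
        // Usually prints false: the app directory's own timestamp did not advance,
        // so a cleanup keyed on it alone would treat the directory as old.
        println(s"app dir mtime advanced: ${appDir.lastModified > mtimeBefore}")
      }
    }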

The solution is to change the cleanup logic to inspect all files within each
app-* directory and remove the directory only if every file in it is stale.
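
For context, a minimal sketch of such a recursive staleness check over plain
java.io.File (illustrative only; the actual patch uses FileUtils.listFilesAndDirs
from Apache Commons IO, as shown in the diff below) could look like:

    import java.io.File

    // Minimal sketch, not the patch's exact code: a directory is "stale" when
    // neither it nor anything beneath it was modified within cutoffSeconds.
    object AppDirCleanupSketch {
      def containsAnyNewFiles(dir: File, cutoffSeconds: Long): Boolean = {
        val cutoffTimeMillis = System.currentTimeMillis - cutoffSeconds * 1000
        def isNew(f: File): Boolean = f.lastModified > cutoffTimeMillis
        def walk(f: File): Boolean =
          isNew(f) || (f.isDirectory &&
            Option(f.listFiles()).getOrElse(Array.empty[File]).exists(walk))
        walk(dir)
      }
    }

An app-* directory would then be removed only when such a check returns false
and no running executor still references it.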

Author: mcheah <mch...@palantir.com>

Closes #2609 from mccheah/worker-better-app-dir-cleanup and squashes the 
following commits:

87b5d03 [mcheah] [SPARK-1860] Using more string interpolation. Better error 
logging.
802473e [mcheah] [SPARK-1860] Cleaning up the logs generated when cleaning 
directories.
e0a1f2e [mcheah] [SPARK-1860] Fixing broken unit test.
77a9de0 [mcheah] [SPARK-1860] More conservative app directory cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf1d32e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf1d32e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf1d32e3

Branch: refs/heads/master
Commit: cf1d32e3e1071829b152d4b597bf0a0d7a5629a2
Parents: 79e45c9
Author: mcheah <mch...@palantir.com>
Authored: Fri Oct 3 14:22:11 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Fri Oct 3 14:22:52 2014 -0700

----------------------------------------------------------------------
 .../spark/deploy/worker/ExecutorRunner.scala    |  8 +----
 .../org/apache/spark/deploy/worker/Worker.scala | 37 ++++++++++++++++----
 .../scala/org/apache/spark/util/Utils.scala     | 21 ++++++-----
 .../org/apache/spark/util/UtilsSuite.scala      | 23 ++++++++----
 4 files changed, 62 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d32e3/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 00a4367..71650cd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -42,7 +42,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val sparkHome: File,
-    val workDir: File,
+    val executorDir: File,
     val workerUrl: String,
     val conf: SparkConf,
     var state: ExecutorState.Value)
@@ -130,12 +130,6 @@ private[spark] class ExecutorRunner(
    */
   def fetchAndRunExecutor() {
     try {
-      // Create the executor's working directory
-      val executorDir = new File(workDir, appId + "/" + execId)
-      if (!executorDir.mkdirs()) {
-        throw new IOException("Failed to create directory " + executorDir)
-      }
-
       // Launch the process
       val command = getCommandSeq
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d32e3/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0c454e4..3b13f43 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.deploy.worker
 
 import java.io.File
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import akka.actor._
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.commons.io.FileUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -191,6 +194,7 @@ private[spark] class Worker(
       changeMaster(masterUrl, masterWebUiUrl)
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
       if (CLEANUP_ENABLED) {
+        logInfo(s"Worker cleanup enabled; old application directories will be 
deleted in: $workDir")
         context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
           CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
       }
@@ -201,10 +205,23 @@ private[spark] class Worker(
     case WorkDirCleanup =>
      // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
       val cleanupFuture = concurrent.future {
-        logInfo("Cleaning up oldest application directories in " + workDir + " 
...")
-        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
-          .foreach(Utils.deleteRecursively)
+        val appDirs = workDir.listFiles()
+        if (appDirs == null) {
+          throw new IOException("ERROR: Failed to list files in " + appDirs)
+        }
+        appDirs.filter { dir =>
+          // the directory is used by an application - check that the application is not running
+          // when cleaning up
+          val appIdFromDir = dir.getName
+          val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+          dir.isDirectory && !isAppStillRunning &&
+          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+        }.foreach { dir => 
+          logInfo(s"Removing directory: ${dir.getPath}")
+          Utils.deleteRecursively(dir)
+        }
       }
+
       cleanupFuture onFailure {
         case e: Throwable =>
           logError("App dir cleanup failed: " + e.getMessage, e)
@@ -233,8 +250,15 @@ private[spark] class Worker(
       } else {
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, 
execId, appDesc.name))
+
+          // Create the executor's working directory
+          val executorDir = new File(workDir, appId + "/" + execId)
+          if (!executorDir.mkdirs()) {
+            throw new IOException("Failed to create directory " + executorDir)
+          }
+
          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
+            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -242,12 +266,13 @@ private[spark] class Worker(
          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
         } catch {
           case e: Exception => {
-            logError("Failed to launch executor %s/%d for %s".format(appId, 
execId, appDesc.name))
+            logError(s"Failed to launch executor $appId/$execId for 
${appDesc.name}.", e)
             if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
               executors -= appId + "/" + execId
             }
-            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
+              Some(e.toString), None)
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d32e3/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9399dda..a671241 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -35,6 +35,8 @@ import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.TrueFileFilter
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.log4j.PropertyConfigurator
@@ -705,17 +707,20 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Finds all the files in a directory whose last modified time is older than cutoff seconds.
-   * @param dir  must be the path to a directory, or IllegalArgumentException is thrown
-   * @param cutoff measured in seconds. Files older than this are returned.
+   * Determines if a directory contains any files newer than cutoff seconds.
+   * 
+   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+   * @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
    */
-  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+  def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
     val currentTimeMillis = System.currentTimeMillis
-    if (dir.isDirectory) {
-      val files = listFilesSafely(dir)
-      files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+    if (!dir.isDirectory) {
+      throw new IllegalArgumentException (dir + " is not a directory!")
     } else {
-      throw new IllegalArgumentException(dir + " is not a directory!")
+      val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
+      val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000))
+      val newFiles = files.filter { _.lastModified > cutoffTimeInMillis }
+      newFiles.nonEmpty
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d32e3/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 70d423b..e63d9d0 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite {
     assert(Utils.getIteratorSize(iterator) === 5L)
   }
 
-  test("findOldFiles") {
+  test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
     val parent: File = Utils.createTempDir()
    val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
     val child2: File = Utils.createTempDir(parent.getCanonicalPath)
-    // set the last modified time of child1 to 10 secs old
-    child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+    val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+    // set the last modified time of child1 to 30 secs old
+    child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
 
-    val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
-    assert(result.size.equals(1))
-    assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+    // although child1 is old, child2 is still new so return true
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+
+    child2.setLastModified(System.currentTimeMillis - (1000 * 30))
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+
+    parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+    // although parent and its immediate children are new, child3 is still old
+    // we expect a full recursive search for new files.
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+
+    child3.setLastModified(System.currentTimeMillis - (1000 * 30))
+    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
   }
 
   test("resolveURI") {

