Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a490787da -> 0c67993cf


[SPARK-9844][CORE] File appender race condition during shutdown

When an Executor process is destroyed, the FileAppender that is asynchronously 
reading the process's stderr stream can hit an IOException during read because 
the stream has been closed.  Before the ExecutorRunner destroys the process, 
the FileAppender thread is flagged to stop.  This PR wraps the FileAppender's 
inputStream.read call in a try/catch block so that, if an IOException is thrown 
after the thread has been flagged to stop, the exception is safely ignored.  
Additionally, the FileAppender thread was changed to use Utils.tryWithSafeFinally 
to better log any exceptions that do occur.  Added unit tests to verify that an 
IOException is thrown and logged if the FileAppender is not flagged to stop, and 
that no IOException is logged when the flag is set.
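
For illustration, a minimal sketch of the guarded-read pattern the fix applies.
GuardedAppender and appendToFile are placeholders invented for this sketch only;
the actual change to FileAppender is in the diff below.

import java.io.{IOException, InputStream}

// Sketch of the guarded read loop: once the appender is flagged to stop, an
// IOException from a read on the (asynchronously closed) stream is ignored
// instead of being logged as an error.
class GuardedAppender(inputStream: InputStream, bufferSize: Int = 8192) {
  @volatile private var markedForStop = false

  def stop(): Unit = { markedForStop = true }

  def appendLoop(): Unit = {
    val buf = new Array[Byte](bufferSize)
    var n = 0
    while (!markedForStop && n != -1) {
      try {
        n = inputStream.read(buf)
      } catch {
        // The stream may be closed asynchronously when the process is destroyed;
        // after the stop flag is set, treat the IOException as normal termination.
        case _: IOException if markedForStop =>
      }
      if (n > 0) {
        appendToFile(buf, n)
      }
    }
  }

  private def appendToFile(buf: Array[Byte], len: Int): Unit = {
    // Placeholder for the real appender's file-writing logic.
  }
}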

Author: Bryan Cutler <cutl...@gmail.com>

Closes #10714 from BryanCutler/file-appender-read-ioexception-SPARK-9844.

(cherry picked from commit 56cdbd654d54bf07a063a03a5c34c4165818eeb2)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c67993c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c67993c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c67993c

Branch: refs/heads/branch-1.6
Commit: 0c67993cf25c681611c55fd056808beee048129b
Parents: a490787
Author: Bryan Cutler <cutl...@gmail.com>
Authored: Thu Jan 14 10:59:02 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jan 14 11:05:09 2016 +0000

----------------------------------------------------------------------
 .../spark/util/logging/FileAppender.scala       | 36 +++++----
 .../apache/spark/util/FileAppenderSuite.scala   | 80 +++++++++++++++++++-
 2 files changed, 100 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c67993c/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 14b6ba4..9ddd02f 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.logging
 
-import java.io.{File, FileOutputStream, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.{IntParam, Utils}
@@ -63,24 +63,32 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
   protected def appendStreamToFile() {
     try {
       logDebug("Started appending thread")
-      openFile()
-      val buf = new Array[Byte](bufferSize)
-      var n = 0
-      while (!markedForStop && n != -1) {
-        n = inputStream.read(buf)
-        if (n != -1) {
-          appendToFile(buf, n)
+      Utils.tryWithSafeFinally {
+        openFile()
+        val buf = new Array[Byte](bufferSize)
+        var n = 0
+        while (!markedForStop && n != -1) {
+          try {
+            n = inputStream.read(buf)
+          } catch {
+            // An InputStream can throw IOException during read if the stream is closed
+            // asynchronously, so once appender has been flagged to stop these will be ignored
+            case _: IOException if markedForStop =>  // do nothing and proceed to stop appending
+          }
+          if (n > 0) {
+            appendToFile(buf, n)
+          }
+        }
+      } {
+        closeFile()
+        synchronized {
+          stopped = true
+          notifyAll()
         }
       }
     } catch {
       case e: Exception =>
         logError(s"Error writing stream to file $file", e)
-    } finally {
-      closeFile()
-      synchronized {
-        stopped = true
-        notifyAll()
-      }
     }
   }
 

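A note on Utils.tryWithSafeFinally, used in the hunk above: it is a Spark-internal
helper that takes a work block and a cleanup block in two argument lists, with the
intent of keeping the original exception visible even if the cleanup block also
throws. The following is a rough approximation of that behavior for illustration,
not Spark's actual implementation.

object SafeFinallySketch {
  // Approximation: run `block`, always run `finallyBlock`, and if both throw,
  // attach the cleanup failure as a suppressed exception so the original error
  // is not masked by the failure in cleanup.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        case t: Throwable =>
          if (original != null) original.addSuppressed(t) else throw t
      }
    }
  }
}
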
http://git-wip-us.apache.org/repos/asf/spark/blob/0c67993c/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 2b76ae1..5a14fc7 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -18,14 +18,18 @@
 package org.apache.spark.util
 
 import java.io._
+import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable.HashSet
 import scala.reflect._
 
-import org.scalatest.BeforeAndAfter
-
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
+import org.apache.log4j.{Appender, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{atLeast, mock, verify}
+import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy, FileAppender}
@@ -189,6 +193,67 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
   }
 
+  test("file appender async close stream abruptly") {
+    // Test FileAppender reaction to closing InputStream using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only logging errors
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream)
+
+    // Close the stream before appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    appender.awaitTermination()
+
+    // If InputStream was closed without first stopping the appender, an exception will be logged
+    verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
+    val loggingEvent = loggingEventCaptor.getValue
+    assert(loggingEvent.getThrowableInformation !== null)
+    assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+  }
+
+  test("file appender async close stream gracefully") {
+    // Test FileAppender reaction to closing InputStream using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only logging errors
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
+
+    // Close the stream before appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    // Stop the appender before an IOException is called during read
+    testInputStream.latchReadStarted.await()
+    appender.stop()
+    testInputStream.latchReadProceed.countDown()
+
+    appender.awaitTermination()
+
+    // Make sure no IOException errors have been logged as a result of appender closing gracefully
+    verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
+    import scala.collection.JavaConverters._
+    loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
+      assert(loggingEvent.getThrowableInformation === null
+        || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+    }
+  }
+
   /**
    * Run the rolling file appender with data and see whether all the data was written correctly
    * across rolled over files.
@@ -229,4 +294,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       file.getName.startsWith(testFile.getName)
     }.foreach { _.delete() }
   }
+
+  /** Used to synchronize when read is called on a stream */
+  private trait LatchedInputStream extends PipedInputStream {
+    val latchReadStarted = new CountDownLatch(1)
+    val latchReadProceed = new CountDownLatch(1)
+    abstract override def read(): Int = {
+      latchReadStarted.countDown()
+      latchReadProceed.await()
+      super.read()
+    }
+  }
 }

