Repository: spark
Updated Branches:
  refs/heads/master 8de080d9f -> 06a979379


[SPARK-21447][WEB UI] Spark history server fails to render compressed inprogress history file in some cases.

Add failure handling for the EOFException that can be thrown during
decompression of an inprogress Spark history file; treat it the same as the
case where the last line cannot be parsed.

## What changes were proposed in this pull request?

Adds failure handling for an EOFException thrown within the
ReplayListenerBus.replay method, treating it analogously to the JSON parse
failure case.  This path can arise with compressed inprogress history files,
since an incomplete compression block may be read (the writer has not flushed
on a block boundary).  See the stack trace of this occurrence in the JIRA
ticket (https://issues.apache.org/jira/browse/SPARK-21447).

## How was this patch tested?

Added a unit test that validates the failure handling path both when
maybeTruncated is true and when it is false.

Author: Eric Vandenberg <[email protected]>

Closes #18673 from ericvandenbergfb/fix_inprogress_compr_history_file.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06a97937
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06a97937
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06a97937

Branch: refs/heads/master
Commit: 06a9793793ca41dcef2f10ca06af091a57c721c4
Parents: 8de080d
Author: Eric Vandenberg <[email protected]>
Authored: Tue Jul 25 11:45:35 2017 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Tue Jul 25 11:45:35 2017 -0700

----------------------------------------------------------------------
 .../spark/scheduler/ReplayListenerBus.scala     |  3 +-
 .../spark/scheduler/ReplayListenerSuite.scala   | 77 +++++++++++++++++++-
 2 files changed, 77 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06a97937/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 08e05ae..26a6a3e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{InputStream, IOException}
+import java.io.{EOFException, InputStream, IOException}
 
 import scala.io.Source
 
@@ -107,6 +107,7 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
         }
       }
     } catch {
+      case _: EOFException if maybeTruncated =>
       case ioe: IOException =>
         throw ioe
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/06a97937/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 1732aca..88a68af 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -17,15 +17,16 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{File, PrintWriter}
+import java.io._
 import java.net.URI
+import java.util.concurrent.atomic.AtomicInteger
 
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.io.CompressionCodec
+import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}
 import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils}
 
 /**
@@ -72,6 +73,59 @@ class ReplayListenerSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSp
     assert(eventMonster.loggedEvents(1) === 
JsonProtocol.sparkEventToJson(applicationEnd))
   }
 
+  /**
+   * Test replaying compressed spark history file that internally throws an 
EOFException.  To
+   * avoid sensitivity to the compression specifics the test forces an 
EOFException to occur
+   * while reading bytes from the underlying stream (such as observed in 
actual history files
+   * in some cases) and forces specific failure handling.  This validates 
correctness in both
+   * cases when maybeTruncated is true or false.
+   */
+  test("Replay compressed inprogress log file succeeding on partial read") {
+    val buffered = new ByteArrayOutputStream
+    val codec = new LZ4CompressionCodec(new SparkConf())
+    val compstream = codec.compressedOutputStream(buffered)
+    val writer = new PrintWriter(compstream)
+
+    val applicationStart = SparkListenerApplicationStart("AppStarts", None,
+      125L, "Mickey", None)
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+    // scalastyle:off println
+    
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    // scalastyle:on println
+    writer.close()
+
+    val logFilePath = Utils.getFilePath(testDir, "events.lz4.inprogress")
+    val fstream = fileSystem.create(logFilePath)
+    val bytes = buffered.toByteArray
+
+    fstream.write(bytes, 0, buffered.size)
+    fstream.close
+
+    // Read the compressed .inprogress file and verify only first event was 
parsed.
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
+    val replayer = new ReplayListenerBus()
+
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
+
+    // Verify the replay returns the events given the input maybe truncated.
+    val logData = EventLoggingListener.openEventLog(logFilePath, fileSystem)
+    val failingStream = new EarlyEOFInputStream(logData, buffered.size - 10)
+    replayer.replay(failingStream, logFilePath.toString, true)
+
+    assert(eventMonster.loggedEvents.size === 1)
+    assert(failingStream.didFail)
+
+    // Verify the replay throws the EOF exception since the input may not be 
truncated.
+    val logData2 = EventLoggingListener.openEventLog(logFilePath, fileSystem)
+    val failingStream2 = new EarlyEOFInputStream(logData2, buffered.size - 10)
+    intercept[EOFException] {
+      replayer.replay(failingStream2, logFilePath.toString, false)
+    }
+  }
+
   // This assumes the correctness of EventLoggingListener
   test("End-to-end replay") {
     testApplicationReplay()
@@ -156,4 +210,23 @@ class ReplayListenerSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSp
     override def start() { }
 
   }
+
+  /*
+   * This is a dummy input stream that wraps another input stream but ends 
prematurely when
+   * reading at the specified position, throwing an EOFException.
+   */
+  private class EarlyEOFInputStream(in: InputStream, failAtPos: Int) extends 
InputStream {
+    private val countDown = new AtomicInteger(failAtPos)
+
+    def didFail: Boolean = countDown.get == 0
+
+    @throws[IOException]
+    def read: Int = {
+      if (countDown.get == 0) {
+        throw new EOFException("Stream ended prematurely")
+      }
+      countDown.decrementAndGet()
+      in.read
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to