Repository: spark
Updated Branches:
  refs/heads/branch-1.3 5e6394222 -> e5690a502


[SPARK-5783] Better eventlog-parsing error messages

Author: Ryan Williams <ryan.blake.willi...@gmail.com>

Closes #4573 from ryan-williams/history and squashes the following commits:

a8647ec [Ryan Williams] fix test calls to .replay()
98aa3fe [Ryan Williams] include filename in history-parsing error message
8deecf0 [Ryan Williams] add line number to history-parsing error message
b668b52 [Ryan Williams] add log info line to history-eventlog parsing


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5690a50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5690a50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5690a50

Branch: refs/heads/branch-1.3
Commit: e5690a502f04ab948cc8f8f7fd04be55498ea0cc
Parents: 5e63942
Author: Ryan Williams <ryan.blake.willi...@gmail.com>
Authored: Fri Feb 13 09:47:26 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Feb 13 09:48:01 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 3 ++-
 .../main/scala/org/apache/spark/deploy/master/Master.scala  | 2 +-
 .../org/apache/spark/scheduler/ReplayListenerBus.scala      | 9 ++++++---
 .../org/apache/spark/scheduler/ReplayListenerSuite.scala    | 4 ++--
 4 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e5690a50/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 868c63d..885fa0f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -247,6 +247,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
     val logPath = eventLog.getPath()
+    logInfo(s"Replaying log path: $logPath")
     val (logInput, sparkVersion) =
       if (isLegacyLogDirectory(eventLog)) {
         openLegacyEventLog(logPath)
@@ -256,7 +257,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     try {
       val appListener = new ApplicationEventListener
       bus.addListener(appListener)
-      bus.replay(logInput, sparkVersion)
+      bus.replay(logInput, sparkVersion, logPath.toString)
       new FsApplicationHistoryInfo(
         logPath.getName(),
         appListener.appId.getOrElse(logPath.getName()),

http://git-wip-us.apache.org/repos/asf/spark/blob/e5690a50/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 53e4539..8cc6ec1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -761,7 +761,7 @@ private[spark] class Master(
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
         appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
       try {
-        replayBus.replay(logInput, sparkVersion)
+        replayBus.replay(logInput, sparkVersion, eventLogFile)
       } finally {
         logInput.close()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/e5690a50/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 584f4e7..d9c3a10 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -40,21 +40,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
    *
    * @param logData Stream containing event log data.
    * @param version Spark version that generated the events.
+   * @param sourceName Filename (or other source identifier) from whence @logData is being read
    */
-  def replay(logData: InputStream, version: String) {
+  def replay(logData: InputStream, version: String, sourceName: String) {
     var currentLine: String = null
+    var lineNumber: Int = 1
     try {
       val lines = Source.fromInputStream(logData).getLines()
       lines.foreach { line =>
         currentLine = line
         postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
+        lineNumber += 1
       }
     } catch {
       case ioe: IOException =>
         throw ioe
       case e: Exception =>
-        logError("Exception in parsing Spark event log.", e)
-        logError("Malformed line: %s\n".format(currentLine))
+        logError(s"Exception parsing Spark event log: $sourceName", e)
+        logError(s"Malformed line #$lineNumber: $currentLine\n")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e5690a50/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 7e360cc..702c4cb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     try {
       val replayer = new ReplayListenerBus()
       replayer.addListener(eventMonster)
-      replayer.replay(logData, SPARK_VERSION)
+      replayer.replay(logData, SPARK_VERSION, logFilePath.toString)
     } finally {
       logData.close()
     }
@@ -120,7 +120,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     try {
       val replayer = new ReplayListenerBus()
       replayer.addListener(eventMonster)
-      replayer.replay(logData, version)
+      replayer.replay(logData, version, eventLog.getPath().toString)
     } finally {
       logData.close()
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to