Repository: spark
Updated Branches:
  refs/heads/master 7c3786929 -> d2923f173


[SPARK-18143][SQL] Ignore Structured Streaming event logs to avoid breaking 
history server

## What changes were proposed in this pull request?

Because of the refactoring work in Structured Streaming, the event logs 
generated by Structured Streaming in Spark 2.0.0 and 2.0.1 cannot be parsed.

This PR simply ignores these logs in ReplayListenerBus because nothing uses them.
## How was this patch tested?
- Generated event logs using Spark 2.0.0 and 2.0.1, and saved them as 
`structured-streaming/query-event-logs-version-2.0.0.txt` and 
`structured-streaming/query-event-logs-version-2.0.1.txt`
- The newly added tests make sure ReplayListenerBus skips these unparsable JSON events.

Author: Shixiong Zhu <[email protected]>

Closes #15663 from zsxwing/fix-event-log.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2923f17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2923f17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2923f17

Branch: refs/heads/master
Commit: d2923f173265b66a4ec71c3c86ff71a58d5aeb3d
Parents: 7c37869
Author: Shixiong Zhu <[email protected]>
Authored: Mon Oct 31 00:11:33 2016 -0700
Committer: Tathagata Das <[email protected]>
Committed: Mon Oct 31 00:11:33 2016 -0700

----------------------------------------------------------------------
 .../spark/scheduler/ReplayListenerBus.scala     | 13 ++++++
 .../query-event-logs-version-2.0.0.txt          |  4 ++
 .../query-event-logs-version-2.0.1.txt          |  4 ++
 .../streaming/StreamingQueryListenerSuite.scala | 42 ++++++++++++++++++++
 4 files changed, 63 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2923f17/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 3eff8d9..2424586 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -72,6 +72,10 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
 
           postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
         } catch {
+          case e: ClassNotFoundException if 
KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
+            // Ignore events generated by Structured Streaming in Spark 2.0.0 
and 2.0.1.
+            // It's safe since no place uses them.
+            logWarning(s"Dropped incompatible Structured Streaming log: 
$currentLine")
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that 
might be truncated
             // the last entry may not be the very last line in the event log, 
but we treat it
@@ -102,4 +106,13 @@ private[spark] object ReplayListenerBus {
 
   // utility filter that selects all event logs during replay
   val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
+
+  /**
+   * Classes that were removed. Structured Streaming doesn't use them any 
more. However, parsing
+   * old json may fail and we can just ignore these failures.
+   */
+  val KNOWN_REMOVED_CLASSES = Set(
+    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
+    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
+  )
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d2923f17/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt
 
b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt
new file mode 100644
index 0000000..aa7e9a8
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt
@@ -0,0 +1,4 @@
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}},"exception":null,"stackTrace":[]}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@514502dc","offsetDesc":"[-]"}},"exception":"Query
 hello terminated with exception: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 
0, localhost): java.lang.ArithmeticException: / by zero\n\tat 
$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat
 
$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat
 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)\n\tat 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat
 org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$an
 on$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat
 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat
 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat
 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat 
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat 
org.apache.spark.scheduler.Task.run(Task.scala:85)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.jav
 a:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver 
stacktrace:","stackTrace":[{"methodName":"org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches","fileName":"StreamExecution.scala","lineNumber":208,"className":"org.apache.spark.sql.execution.streaming.StreamExecution","nativeMethod":false},{"methodName":"run","fileName":"StreamExecution.scala","lineNumber":120,"className":"org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1","nativeMethod":false}]}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1477593059313}

http://git-wip-us.apache.org/repos/asf/spark/blob/d2923f17/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt
 
b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt
new file mode 100644
index 0000000..646cf10
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt
@@ -0,0 +1,4 @@
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@10e5ec94","offsetDesc":"[#0]"}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@10e5ec94","offsetDesc":"[#0]"}},"exception":null}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@70c61dc8","offsetDesc":"[-]"}},"exception":"org.apache.spark.SparkException:
 Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most 
recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): 
java.lang.ArithmeticException: / by zero\n\tat 
$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat
 
$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat
 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)\n\tat 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat
 org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.h
 asNext(WholeStageCodegenExec.scala:370)\n\tat 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat
 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat
 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat
 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat 
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat 
org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 \n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat
 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat
 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat
 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat
 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat
 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat
 scala.Option.foreach(Option.scala:257)\n\tat 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\t
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat
 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat
 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat
 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat 
org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)\n\tat 
org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)\n\tat 
org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)\n\tat 
org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)\n\tat 
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat
 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat
 org.apache.spark.rdd.RDD.withScope(RDD.sc
 ala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)\n\tat
 
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)\n\tat
 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)\n\tat
 org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)\n\tat 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)\n\tat
 
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)\n\tat
 
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)\n\tat
 org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)\n\tat 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2197)\n\tat
 org.apache.spark.sql.Dataset.collect(Dataset.scala:2173)\n\tat 
org.apache.spark.sql.execut
 ion.streaming.MemorySink.addBatch(memory.scala:154)\n\tat 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:366)\n\tat
 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:197)\n\tat
 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)\n\tat
 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:187)\n\tat
 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124)\nCaused
 by: java.lang.ArithmeticException: / by zero\n\tat 
$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat
 
$line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat
 org.apache.spar
 
k.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown 
Source)\n\tat 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat
 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat
 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat
 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat
 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat
 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat 
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat 
org.apache.spark.scheduler.Task.run(T
 ask.scala:86)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat
 java.lang.Thread.run(Thread.java:745)\n"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1477701734609}

http://git-wip-us.apache.org/repos/asf/spark/blob/d2923f17/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index ff84386..cebb32a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.streaming
 
+import scala.collection.mutable
+
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester._
 
 import org.apache.spark.SparkException
+import org.apache.spark.scheduler._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
@@ -206,6 +209,45 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
+  test("ReplayListenerBus should ignore broken event jsons generated in 
2.0.0") {
+    // query-event-logs-version-2.0.0.txt has all types of events generated by
+    // Structured Streaming in Spark 2.0.0.
+    // SparkListenerApplicationEnd is the only valid event and it's the last 
event. We use it
+    // to verify that we can skip broken jsons generated by Structured 
Streaming.
+    
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
+  }
+
+  test("ReplayListenerBus should ignore broken event jsons generated in 
2.0.1") {
+    // query-event-logs-version-2.0.1.txt has all types of events generated by
+    // Structured Streaming in Spark 2.0.1.
+    // SparkListenerApplicationEnd is the only valid event and it's the last 
event. We use it
+    // to verify that we can skip broken jsons generated by Structured 
Streaming.
+    
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt")
+  }
+
+  private def testReplayListenerBusWithBorkenEventJsons(fileName: String): 
Unit = {
+    val input = 
getClass.getResourceAsStream(s"/structured-streaming/$fileName")
+    val events = mutable.ArrayBuffer[SparkListenerEvent]()
+    try {
+      val replayer = new ReplayListenerBus() {
+        // Redirect all parsed events to `events`
+        override def doPostEvent(
+            listener: SparkListenerInterface,
+            event: SparkListenerEvent): Unit = {
+          events += event
+        }
+      }
+      // Add a dummy listener so that "doPostEvent" will be called.
+      replayer.addListener(new SparkListener {})
+      replayer.replay(input, fileName)
+      // SparkListenerApplicationEnd is the only valid event
+      assert(events.size === 1)
+      assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
+    } finally {
+      input.close()
+    }
+  }
+
   private def assertStreamingQueryInfoEquals(
       expected: StreamingQueryStatus,
       actual: StreamingQueryStatus): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to