Repository: spark Updated Branches: refs/heads/branch-2.1 85dd07374 -> 4c4bf87ac
[SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent ## What changes were proposed in this pull request? This PR fixes the bug that QueryStartedEvent is not logged. The postToAll() call in the original code actually invokes StreamingQueryListenerBus.postToAll(), which has no listener at all. Instead, we should post the event with both sparkListenerBus.post(s) and this.postToAll(s), to trigger the local listeners as well as the listeners registered in LiveListenerBus. zsxwing ## How was this patch tested? The following snapshot shows that QueryStartedEvent has been logged correctly ![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png) Author: CodingCat <zhunans...@gmail.com> Closes #15675 from CodingCat/SPARK-18144. (cherry picked from commit 85c5424d466f4a5765c825e0e2ab30da97611285) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c4bf87a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c4bf87a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c4bf87a Branch: refs/heads/branch-2.1 Commit: 4c4bf87acf2516a72b59f4e760413f80640dca1e Parents: 85dd0737 Author: CodingCat <zhunans...@gmail.com> Authored: Tue Nov 1 23:39:53 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Tue Nov 1 23:40:00 2016 -0700 ---------------------------------------------------------------------- .../execution/streaming/StreamingQueryListenerBus.scala | 10 +++++++++- .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 7 ++++++- 2 files changed, 15 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4c4bf87a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index fc2190d..22e4c63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) def post(event: StreamingQueryListener.Event) { event match { case s: QueryStartedEvent => + sparkListenerBus.post(s) + // post to local listeners to trigger callbacks postToAll(s) case _ => sparkListenerBus.post(event) @@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case e: StreamingQueryListener.Event => - postToAll(e) + // SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus + // synchronously and the ones attached to LiveListenerBus asynchronously. 
Therefore, + // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus + // thread + if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) { + postToAll(e) + } case _ => } } http://git-wip-us.apache.org/repos/asf/spark/blob/4c4bf87a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 464c443..31b7fe0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // A StreamingQueryListener that gets the query status after the first completed trigger val listener = new StreamingQueryListener { @volatile var firstStatus: StreamingQueryStatus = null - override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } + @volatile var queryStartedEvent = 0 + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { + queryStartedEvent += 1 + } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { if (firstStatus == null) firstStatus = queryProgress.queryStatus } @@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { q.processAllAvailable() eventually(timeout(streamingTimeout)) { assert(listener.firstStatus != null) + // test if QueryStartedEvent callback is called for only once + assert(listener.queryStartedEvent === 1) } listener.firstStatus } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org