Repository: spark
Updated Branches:
  refs/heads/master 1e35e9693 -> 941b3f9ac


[SPARK-17731][SQL][STREAMING][FOLLOWUP] Refactored StreamingQueryListener APIs

## What changes were proposed in this pull request?

As per rxin's request, here are further API changes:
- Renamed the `Query(Started/Progress/Terminated)` event classes to `Query(Started/Progress/Terminated)Event`
- Renamed the parameters of the `StreamingQueryListener.onQuery*` callbacks from `query*` to `event`

## How was this patch tested?
Existing unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #15530 from tdas/SPARK-17731-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/941b3f9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/941b3f9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/941b3f9a

Branch: refs/heads/master
Commit: 941b3f9aca59e62c078508a934f8c2221ced96ce
Parents: 1e35e96
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Oct 18 17:32:16 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Oct 18 17:32:16 2016 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                           |  9 +++++++++
 .../sql/execution/streaming/StreamExecution.scala    | 15 ++++++++-------
 .../streaming/StreamingQueryListenerBus.scala        |  8 ++++----
 .../spark/sql/streaming/StreamingQueryListener.scala | 14 +++++++-------
 .../org/apache/spark/sql/streaming/StreamTest.scala  |  6 +++---
 .../sql/streaming/StreamingQueryListenerSuite.scala  | 13 +++++++------
 .../spark/sql/streaming/StreamingQuerySuite.scala    |  6 +++---
 7 files changed, 41 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1349af4..facf034 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -68,6 +68,15 @@ object MimaExcludes {
       
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"),
       
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"),
       
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),
+      
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStarted"),
+      
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress"),
+      
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"),
+      
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
+      
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
+      
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
+      
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
+      
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
+      
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
 
       // [SPARK-17338][SQL] add global temp view
       
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),

http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9144736..ba8cf80 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -165,7 +165,7 @@ class StreamExecution(
     new Path(new Path(checkpointRoot), name).toUri.toString
 
   /**
-   * Starts the execution. This returns only after the thread has started and 
[[QueryStarted]] event
+   * Starts the execution. This returns only after the thread has started and 
[[QueryStartedEvent]]
    * has been posted to all the listeners.
    */
   def start(): Unit = {
@@ -177,9 +177,10 @@ class StreamExecution(
   /**
    * Repeatedly attempts to run batches as data arrives.
    *
-   * Note that this method ensures that [[QueryStarted]] and 
[[QueryTerminated]] events are posted
-   * such that listeners are guaranteed to get a start event before a 
termination. Furthermore, this
-   * method also ensures that [[QueryStarted]] event is posted before the 
`start()` method returns.
+   * Note that this method ensures that [[QueryStartedEvent]] and 
[[QueryTerminatedEvent]] are
+   * posted such that listeners are guaranteed to get a start event before a 
termination.
+   * Furthermore, this method also ensures that [[QueryStartedEvent]] event is 
posted before the
+   * `start()` method returns.
    */
   private def runBatches(): Unit = {
     try {
@@ -190,7 +191,7 @@ class StreamExecution(
         
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
       updateStatus()
-      postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw 
exception.
+      postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not 
throw exception.
 
       // Unblock starting thread
       startLatch.countDown()
@@ -232,7 +233,7 @@ class StreamExecution(
         // Update metrics and notify others
         streamMetrics.reportTriggerFinished()
         updateStatus()
-        postEvent(new QueryProgress(currentStatus))
+        postEvent(new QueryProgressEvent(currentStatus))
         isTerminated
       })
     } catch {
@@ -260,7 +261,7 @@ class StreamExecution(
       // Notify others
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
       postEvent(
-        new QueryTerminated(currentStatus, 
exception.map(_.cause).map(Utils.exceptionString)))
+        new QueryTerminatedEvent(currentStatus, 
exception.map(_.cause).map(Utils.exceptionString)))
       terminationLatch.countDown()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 1e66395..fc2190d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
    */
   def post(event: StreamingQueryListener.Event) {
     event match {
-      case s: QueryStarted =>
+      case s: QueryStartedEvent =>
         postToAll(s)
       case _ =>
         sparkListenerBus.post(event)
@@ -59,11 +59,11 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
       listener: StreamingQueryListener,
       event: StreamingQueryListener.Event): Unit = {
     event match {
-      case queryStarted: QueryStarted =>
+      case queryStarted: QueryStartedEvent =>
         listener.onQueryStarted(queryStarted)
-      case queryProgress: QueryProgress =>
+      case queryProgress: QueryProgressEvent =>
         listener.onQueryProgress(queryProgress)
-      case queryTerminated: QueryTerminated =>
+      case queryTerminated: QueryTerminatedEvent =>
         listener.onQueryTerminated(queryTerminated)
       case _ =>
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 69790e3..9e311fa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -41,7 +41,7 @@ abstract class StreamingQueryListener {
    *       don't block this method as it will block your query.
    * @since 2.0.0
    */
-  def onQueryStarted(queryStarted: QueryStarted): Unit
+  def onQueryStarted(event: QueryStartedEvent): Unit
 
   /**
    * Called when there is some status update (ingestion rate updated, etc.)
@@ -49,16 +49,16 @@ abstract class StreamingQueryListener {
    * @note This method is asynchronous. The status in [[StreamingQuery]] will 
always be
    *       latest no matter when this method is called. Therefore, the status 
of [[StreamingQuery]]
    *       may be changed before/when you process the event. E.g., you may 
find [[StreamingQuery]]
-   *       is terminated when you are processing [[QueryProgress]].
+   *       is terminated when you are processing [[QueryProgressEvent]].
    * @since 2.0.0
    */
-  def onQueryProgress(queryProgress: QueryProgress): Unit
+  def onQueryProgress(event: QueryProgressEvent): Unit
 
   /**
    * Called when a query is stopped, with or without error.
    * @since 2.0.0
    */
-  def onQueryTerminated(queryTerminated: QueryTerminated): Unit
+  def onQueryTerminated(event: QueryTerminatedEvent): Unit
 }
 
 
@@ -84,7 +84,7 @@ object StreamingQueryListener {
    * @since 2.0.0
    */
   @Experimental
-  class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) 
extends Event
+  class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) 
extends Event
 
   /**
    * :: Experimental ::
@@ -92,7 +92,7 @@ object StreamingQueryListener {
    * @since 2.0.0
    */
   @Experimental
-  class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) 
extends Event
+  class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) 
extends Event
 
   /**
    * :: Experimental ::
@@ -104,7 +104,7 @@ object StreamingQueryListener {
    * @since 2.0.0
    */
   @Experimental
-  class QueryTerminated private[sql](
+  class QueryTerminatedEvent private[sql](
       val queryStatus: StreamingQueryStatus,
       val exception: Option[String]) extends Event
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 8dfeb8d..7428330 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -684,20 +684,20 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
     }
 
 
-    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
+    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
       asyncTestWaiter {
         startStatus = queryStarted.queryStatus
       }
     }
 
-    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
       asyncTestWaiter {
         assert(startStatus != null, "onQueryProgress called before 
onQueryStarted")
         synchronized { progressStatuses += queryProgress.queryStatus }
       }
     }
 
-    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
+    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): 
Unit = {
       asyncTestWaiter {
         assert(startStatus != null, "onQueryTerminated called before 
onQueryStarted")
         terminationStatus = queryTerminated.queryStatus

http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 623f66a..ff84386 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -177,30 +177,31 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   }
 
   test("QueryStarted serialization") {
-    val queryStarted = new 
StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus)
+    val queryStarted = new 
StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus)
     val json = JsonProtocol.sparkEventToJson(queryStarted)
     val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryStarted]
+      .asInstanceOf[StreamingQueryListener.QueryStartedEvent]
     assertStreamingQueryInfoEquals(queryStarted.queryStatus, 
newQueryStarted.queryStatus)
   }
 
   test("QueryProgress serialization") {
-    val queryProcess = new 
StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus)
+    val queryProcess = new StreamingQueryListener.QueryProgressEvent(
+      StreamingQueryStatus.testStatus)
     val json = JsonProtocol.sparkEventToJson(queryProcess)
     val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryProgress]
+      .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
     assertStreamingQueryInfoEquals(queryProcess.queryStatus, 
newQueryProcess.queryStatus)
   }
 
   test("QueryTerminated serialization") {
     val exception = new RuntimeException("exception")
-    val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
+    val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
       StreamingQueryStatus.testStatus,
       Some(exception.getMessage))
     val json =
       JsonProtocol.sparkEventToJson(queryQueryTerminated)
     val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryTerminated]
+      .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
     assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, 
newQueryTerminated.queryStatus)
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 9f8e2db..92020be 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -290,11 +290,11 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
     // A StreamingQueryListener that gets the query status after the first 
completed trigger
     val listener = new StreamingQueryListener {
       @volatile var firstStatus: StreamingQueryStatus = null
-      override def onQueryStarted(queryStarted: QueryStarted): Unit = { }
-      override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
+      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        if (firstStatus == null) firstStatus = queryProgress.queryStatus
       }
-      override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = 
{ }
+      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): 
Unit = { }
     }
 
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to