Repository: spark Updated Branches: refs/heads/master 1e35e9693 -> 941b3f9ac
[SPARK-17731][SQL][STREAMING][FOLLOWUP] Refactored StreamingQueryListener APIs ## What changes were proposed in this pull request? As per rxin request, here are further API changes - Changed `Stream(Started/Progress/Terminated)` events to `Stream*Event` - Changed the fields in `StreamingQueryListener.on***` from `query*` to `event` ## How was this patch tested? Existing unit tests. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #15530 from tdas/SPARK-17731-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/941b3f9a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/941b3f9a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/941b3f9a Branch: refs/heads/master Commit: 941b3f9aca59e62c078508a934f8c2221ced96ce Parents: 1e35e96 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Tue Oct 18 17:32:16 2016 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Oct 18 17:32:16 2016 -0700 ---------------------------------------------------------------------- project/MimaExcludes.scala | 9 +++++++++ .../sql/execution/streaming/StreamExecution.scala | 15 ++++++++------- .../streaming/StreamingQueryListenerBus.scala | 8 ++++---- .../spark/sql/streaming/StreamingQueryListener.scala | 14 +++++++------- .../org/apache/spark/sql/streaming/StreamTest.scala | 6 +++--- .../sql/streaming/StreamingQueryListenerSuite.scala | 13 +++++++------ .../spark/sql/streaming/StreamingQuerySuite.scala | 6 +++--- 7 files changed, 41 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 1349af4..facf034 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -68,6 +68,15 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStarted"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"), // [SPARK-17338][SQL] add global temp view ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"), http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 9144736..ba8cf80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -165,7 +165,7 @@ class StreamExecution( new Path(new Path(checkpointRoot), name).toUri.toString /** - * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event + * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]] * has been posted to all the listeners. */ def start(): Unit = { @@ -177,9 +177,10 @@ class StreamExecution( /** * Repeatedly attempts to run batches as data arrives. * - * Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted - * such that listeners are guaranteed to get a start event before a termination. Furthermore, this - * method also ensures that [[QueryStarted]] event is posted before the `start()` method returns. + * Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are + * posted such that listeners are guaranteed to get a start event before a termination. + * Furthermore, this method also ensures that [[QueryStartedEvent]] event is posted before the + * `start()` method returns. */ private def runBatches(): Unit = { try { @@ -190,7 +191,7 @@ class StreamExecution( sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } updateStatus() - postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw exception. + postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception. // Unblock starting thread startLatch.countDown() @@ -232,7 +233,7 @@ class StreamExecution( // Update metrics and notify others streamMetrics.reportTriggerFinished() updateStatus() - postEvent(new QueryProgress(currentStatus)) + postEvent(new QueryProgressEvent(currentStatus)) isTerminated }) } catch { @@ -260,7 +261,7 @@ class StreamExecution( // Notify others sparkSession.streams.notifyQueryTermination(StreamExecution.this) postEvent( - new QueryTerminated(currentStatus, exception.map(_.cause).map(Utils.exceptionString))) + new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString))) terminationLatch.countDown() } } http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 1e66395..fc2190d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) */ def post(event: StreamingQueryListener.Event) { event match { - case s: QueryStarted => + case s: QueryStartedEvent => postToAll(s) case _ => sparkListenerBus.post(event) @@ -59,11 +59,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) listener: StreamingQueryListener, event: StreamingQueryListener.Event): Unit = { event match { - case queryStarted: QueryStarted => + case queryStarted: QueryStartedEvent => listener.onQueryStarted(queryStarted) - case queryProgress: QueryProgress => + case queryProgress: QueryProgressEvent => listener.onQueryProgress(queryProgress) - case queryTerminated: QueryTerminated => + case queryTerminated: QueryTerminatedEvent => listener.onQueryTerminated(queryTerminated) case _ => } http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 69790e3..9e311fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -41,7 +41,7 @@ abstract class StreamingQueryListener { * don't block this method as it will block your query. * @since 2.0.0 */ - def onQueryStarted(queryStarted: QueryStarted): Unit + def onQueryStarted(event: QueryStartedEvent): Unit /** * Called when there is some status update (ingestion rate updated, etc.) @@ -49,16 +49,16 @@ abstract class StreamingQueryListener { * @note This method is asynchronous. The status in [[StreamingQuery]] will always be * latest no matter when this method is called. Therefore, the status of [[StreamingQuery]] * may be changed before/when you process the event. E.g., you may find [[StreamingQuery]] - * is terminated when you are processing [[QueryProgress]]. + * is terminated when you are processing [[QueryProgressEvent]]. * @since 2.0.0 */ - def onQueryProgress(queryProgress: QueryProgress): Unit + def onQueryProgress(event: QueryProgressEvent): Unit /** * Called when a query is stopped, with or without error. * @since 2.0.0 */ - def onQueryTerminated(queryTerminated: QueryTerminated): Unit + def onQueryTerminated(event: QueryTerminatedEvent): Unit } @@ -84,7 +84,7 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event /** * :: Experimental :: @@ -92,7 +92,7 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event /** * :: Experimental :: @@ -104,7 +104,7 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryTerminated private[sql]( + class QueryTerminatedEvent private[sql]( val queryStatus: StreamingQueryStatus, val exception: Option[String]) extends Event } http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 8dfeb8d..7428330 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -684,20 +684,20 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } - override def onQueryStarted(queryStarted: QueryStarted): Unit = { + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { asyncTestWaiter { startStatus = queryStarted.queryStatus } } - override def onQueryProgress(queryProgress: QueryProgress): Unit = { + override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryProgress called before onQueryStarted") synchronized { progressStatuses += queryProgress.queryStatus } } } - override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { + override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryTerminated called before onQueryStarted") terminationStatus = queryTerminated.queryStatus http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 623f66a..ff84386 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -177,30 +177,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("QueryStarted serialization") { - val queryStarted = new StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus) + val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus) val json = JsonProtocol.sparkEventToJson(queryStarted) val newQueryStarted = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryStarted] + .asInstanceOf[StreamingQueryListener.QueryStartedEvent] assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus) } test("QueryProgress serialization") { - val queryProcess = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus) + val queryProcess = new StreamingQueryListener.QueryProgressEvent( + StreamingQueryStatus.testStatus) val json = JsonProtocol.sparkEventToJson(queryProcess) val newQueryProcess = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryProgress] + .asInstanceOf[StreamingQueryListener.QueryProgressEvent] assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus) } test("QueryTerminated serialization") { val exception = new RuntimeException("exception") - val queryQueryTerminated = new StreamingQueryListener.QueryTerminated( + val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent( StreamingQueryStatus.testStatus, Some(exception.getMessage)) val json = JsonProtocol.sparkEventToJson(queryQueryTerminated) val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryTerminated] + .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent] assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus) assert(queryQueryTerminated.exception === newQueryTerminated.exception) } http://git-wip-us.apache.org/repos/asf/spark/blob/941b3f9a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9f8e2db..92020be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -290,11 +290,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // A StreamingQueryListener that gets the query status after the first completed trigger val listener = new StreamingQueryListener { @volatile var firstStatus: StreamingQueryStatus = null - override def onQueryStarted(queryStarted: QueryStarted): Unit = { } - override def onQueryProgress(queryProgress: QueryProgress): Unit = { + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } + override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { if (firstStatus == null) firstStatus = queryProgress.queryStatus } - override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { } + override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { } } try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org