[SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQuery

Renamed for simplicity, so that it's obvious that it's related to streaming.
Existing unit tests. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #13673 from tdas/SPARK-15953. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a507199 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a507199 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a507199 Branch: refs/heads/master Commit: 9a5071996b968148f6b9aba12e0d3fe888d9acd8 Parents: d30b7e6 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Wed Jun 15 10:46:02 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Jun 15 10:46:07 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/context.py | 8 +- python/pyspark/sql/dataframe.py | 2 +- python/pyspark/sql/readwriter.py | 40 +-- python/pyspark/sql/session.py | 10 +- python/pyspark/sql/streaming.py | 79 +++-- python/pyspark/sql/tests.py | 52 ++-- python/pyspark/sql/utils.py | 8 +- .../scala/org/apache/spark/sql/Dataset.scala | 4 +- .../org/apache/spark/sql/ForeachWriter.scala | 4 +- .../scala/org/apache/spark/sql/SQLContext.scala | 8 +- .../org/apache/spark/sql/SparkSession.scala | 6 +- .../spark/sql/execution/SQLExecution.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../streaming/ContinuousQueryListenerBus.scala | 72 ----- .../execution/streaming/StreamExecution.scala | 26 +- .../streaming/StreamingQueryListenerBus.scala | 72 +++++ .../scala/org/apache/spark/sql/functions.scala | 6 +- .../org/apache/spark/sql/internal/SQLConf.scala | 4 +- .../spark/sql/internal/SessionState.scala | 8 +- .../spark/sql/streaming/ContinuousQuery.scala | 119 -------- .../streaming/ContinuousQueryException.scala | 54 ---- .../sql/streaming/ContinuousQueryInfo.scala | 37 --- .../sql/streaming/ContinuousQueryListener.scala | 113 ------- .../sql/streaming/ContinuousQueryManager.scala | 279 ----------------- .../spark/sql/streaming/DataStreamWriter.scala | 16 +- .../spark/sql/streaming/StreamingQuery.scala | 119 ++++++++ .../sql/streaming/StreamingQueryException.scala | 54 ++++ .../sql/streaming/StreamingQueryInfo.scala | 37 +++ .../sql/streaming/StreamingQueryListener.scala | 113 +++++++ .../sql/streaming/StreamingQueryManager.scala | 279 +++++++++++++++++ .../apache/spark/sql/streaming/Trigger.scala | 4 +- .../ContinuousQueryListenerSuite.scala | 304 ------------------- .../streaming/ContinuousQueryManagerSuite.scala | 299 ------------------ .../sql/streaming/ContinuousQuerySuite.scala | 180 ----------- .../sql/streaming/FileStreamSinkSuite.scala | 6 +- .../spark/sql/streaming/FileStressSuite.scala | 6 +- .../apache/spark/sql/streaming/StreamTest.scala | 4 +- .../streaming/StreamingAggregationSuite.scala | 2 - .../streaming/StreamingQueryListenerSuite.scala | 304 +++++++++++++++++++ .../streaming/StreamingQueryManagerSuite.scala | 299 ++++++++++++++++++ .../sql/streaming/StreamingQuerySuite.scala | 180 +++++++++++ .../test/DataStreamReaderWriterSuite.scala | 10 +- 42 files changed, 1615 insertions(+), 1618 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index a271afe..8a1a874 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -444,13 +444,13 @@ class SQLContext(object): @property @since(2.0) def 
streams(self): - """Returns a :class:`ContinuousQueryManager` that allows managing all the - :class:`ContinuousQuery` ContinuousQueries active on `this` context. + """Returns a :class:`StreamingQueryManager` that allows managing all the + :class:`StreamingQuery` StreamingQueries active on `this` context. .. note:: Experimental. """ - from pyspark.sql.streaming import ContinuousQueryManager - return ContinuousQueryManager(self._ssql_ctx.streams()) + from pyspark.sql.streaming import StreamingQueryManager + return StreamingQueryManager(self._ssql_ctx.streams()) class HiveContext(SQLContext): http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 0126faf..acf9d08 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -257,7 +257,7 @@ class DataFrame(object): def isStreaming(self): """Returns true if this :class:`Dataset` contains one or more sources that continuously return data as it arrives. A :class:`Dataset` that reads data from a streaming source - must be executed as a :class:`ContinuousQuery` using the :func:`startStream` method in + must be executed as a :class:`StreamingQuery` using the :func:`startStream` method in :class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming source present. http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index ad954d0..c982de6 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq from pyspark.sql.types import * from pyspark.sql import utils -__all__ = ["DataFrameReader", "DataFrameWriter"] +__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"] def to_str(value): @@ -458,9 +458,9 @@ class DataFrameWriter(object): self._spark = df.sql_ctx self._jwrite = df._jdf.write() - def _cq(self, jcq): - from pyspark.sql.streaming import ContinuousQuery - return ContinuousQuery(jcq) + def _sq(self, jsq): + from pyspark.sql.streaming import StreamingQuery + return StreamingQuery(jsq) @since(1.4) def mode(self, saveMode): @@ -1094,9 +1094,9 @@ class DataStreamWriter(object): self._spark = df.sql_ctx self._jwrite = df._jdf.writeStream() - def _cq(self, jcq): - from pyspark.sql.streaming import ContinuousQuery - return ContinuousQuery(jcq) + def _sq(self, jsq): + from pyspark.sql.streaming import StreamingQuery + return StreamingQuery(jsq) @since(2.0) def outputMode(self, outputMode): @@ -1169,8 +1169,8 @@ class DataStreamWriter(object): @since(2.0) def queryName(self, queryName): - """Specifies the name of the :class:`ContinuousQuery` that can be started with - :func:`startStream`. This name must be unique among all the currently active queries + """Specifies the name of the :class:`StreamingQuery` that can be started with + :func:`start`. This name must be unique among all the currently active queries in the associated SparkSession. .. note:: Experimental. @@ -1232,21 +1232,21 @@ class DataStreamWriter(object): :param options: All other string options. 
You may want to provide a `checkpointLocation` for most streams, however it is not required for a `memory` stream. - >>> cq = sdf.writeStream.format('memory').queryName('this_query').start() - >>> cq.isActive + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + >>> sq.isActive True - >>> cq.name + >>> sq.name u'this_query' - >>> cq.stop() - >>> cq.isActive + >>> sq.stop() + >>> sq.isActive False - >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start( + >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start( ... queryName='that_query', format='memory') - >>> cq.name + >>> sq.name u'that_query' - >>> cq.isActive + >>> sq.isActive True - >>> cq.stop() + >>> sq.stop() """ self.options(**options) if partitionBy is not None: @@ -1256,9 +1256,9 @@ class DataStreamWriter(object): if queryName is not None: self.queryName(queryName) if path is None: - return self._cq(self._jwrite.start()) + return self._sq(self._jwrite.start()) else: - return self._cq(self._jwrite.start(path)) + return self._sq(self._jwrite.start(path)) def _test(): http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/session.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 11c815d..6edbd59 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -565,15 +565,15 @@ class SparkSession(object): @property @since(2.0) def streams(self): - """Returns a :class:`ContinuousQueryManager` that allows managing all the - :class:`ContinuousQuery` ContinuousQueries active on `this` context. + """Returns a :class:`StreamingQueryManager` that allows managing all the + :class:`StreamingQuery` StreamingQueries active on `this` context. .. note:: Experimental. - :return: :class:`ContinuousQueryManager` + :return: :class:`StreamingQueryManager` """ - from pyspark.sql.streaming import ContinuousQueryManager - return ContinuousQueryManager(self._jsparkSession.streams()) + from pyspark.sql.streaming import StreamingQueryManager + return StreamingQueryManager(self._jsparkSession.streams()) @since(2.0) def stop(self): http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 1d65094..ae45c99 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -26,10 +26,10 @@ from abc import ABCMeta, abstractmethod from pyspark import since from pyspark.rdd import ignore_unicode_prefix -__all__ = ["ContinuousQuery"] +__all__ = ["StreamingQuery"] -class ContinuousQuery(object): +class StreamingQuery(object): """ A handle to a query that is executing continuously in the background as new data arrives. All these methods are thread-safe. @@ -39,30 +39,30 @@ class ContinuousQuery(object): .. versionadded:: 2.0 """ - def __init__(self, jcq): - self._jcq = jcq + def __init__(self, jsq): + self._jsq = jsq @property @since(2.0) def id(self): - """The id of the continuous query. This id is unique across all queries that have been + """The id of the streaming query. This id is unique across all queries that have been started in the current process. """ - return self._jcq.id() + return self._jsq.id() @property @since(2.0) def name(self): - """The name of the continuous query. This name is unique across all active queries. + """The name of the streaming query. 
This name is unique across all active queries. """ - return self._jcq.name() + return self._jsq.name() @property @since(2.0) def isActive(self): - """Whether this continuous query is currently active or not. + """Whether this streaming query is currently active or not. """ - return self._jcq.isActive() + return self._jsq.isActive() @since(2.0) def awaitTermination(self, timeout=None): @@ -75,14 +75,14 @@ class ContinuousQuery(object): immediately (if the query was terminated by :func:`stop()`), or throw the exception immediately (if the query has terminated with exception). - throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception + throws :class:`StreamingQueryException`, if `this` query has terminated with an exception """ if timeout is not None: if not isinstance(timeout, (int, float)) or timeout < 0: raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) - return self._jcq.awaitTermination(int(timeout * 1000)) + return self._jsq.awaitTermination(int(timeout * 1000)) else: - return self._jcq.awaitTermination() + return self._jsq.awaitTermination() @since(2.0) def processAllAvailable(self): @@ -92,26 +92,25 @@ class ContinuousQuery(object): until data that has been synchronously appended data to a stream source prior to invocation. (i.e. `getOffset` must immediately reflect the addition). """ - return self._jcq.processAllAvailable() + return self._jsq.processAllAvailable() @since(2.0) def stop(self): - """Stop this continuous query. + """Stop this streaming query. """ - self._jcq.stop() + self._jsq.stop() -class ContinuousQueryManager(object): - """A class to manage all the :class:`ContinuousQuery` ContinuousQueries active - on a :class:`SQLContext`. +class StreamingQueryManager(object): + """A class to manage all the :class:`StreamingQuery` StreamingQueries active. .. note:: Experimental .. versionadded:: 2.0 """ - def __init__(self, jcqm): - self._jcqm = jcqm + def __init__(self, jsqm): + self._jsqm = jsqm @property @ignore_unicode_prefix @@ -119,14 +118,14 @@ class ContinuousQueryManager(object): def active(self): """Returns a list of active queries associated with this SQLContext - >>> cq = df.writeStream.format('memory').queryName('this_query').start() - >>> cqm = spark.streams - >>> # get the list of active continuous queries - >>> [q.name for q in cqm.active] + >>> sq = df.writeStream.format('memory').queryName('this_query').start() + >>> sqm = spark.streams + >>> # get the list of active streaming queries + >>> [q.name for q in sqm.active] [u'this_query'] - >>> cq.stop() + >>> sq.stop() """ - return [ContinuousQuery(jcq) for jcq in self._jcqm.active()] + return [StreamingQuery(jsq) for jsq in self._jsqm.active()] @ignore_unicode_prefix @since(2.0) @@ -134,20 +133,20 @@ class ContinuousQueryManager(object): """Returns an active query from this SQLContext or throws exception if an active query with this name doesn't exist. - >>> cq = df.writeStream.format('memory').queryName('this_query').start() - >>> cq.name + >>> sq = df.writeStream.format('memory').queryName('this_query').start() + >>> sq.name u'this_query' - >>> cq = spark.streams.get(cq.id) - >>> cq.isActive + >>> sq = spark.streams.get(sq.id) + >>> sq.isActive True - >>> cq = sqlContext.streams.get(cq.id) - >>> cq.isActive + >>> sq = sqlContext.streams.get(sq.id) + >>> sq.isActive True - >>> cq.stop() + >>> sq.stop() """ if not isinstance(id, intlike): raise ValueError("The id for the query must be an integer. 
Got: %s" % id) - return ContinuousQuery(self._jcqm.get(id)) + return StreamingQuery(self._jsqm.get(id)) @since(2.0) def awaitAnyTermination(self, timeout=None): @@ -168,14 +167,14 @@ class ContinuousQueryManager(object): queries, users need to stop all of them after any of them terminates with exception, and then check the `query.exception()` for each query. - throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception + throws :class:`StreamingQueryException`, if `this` query has terminated with an exception """ if timeout is not None: if not isinstance(timeout, (int, float)) or timeout < 0: raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) - return self._jcqm.awaitAnyTermination(int(timeout * 1000)) + return self._jsqm.awaitAnyTermination(int(timeout * 1000)) else: - return self._jcqm.awaitAnyTermination() + return self._jsqm.awaitAnyTermination() @since(2.0) def resetTerminated(self): @@ -184,11 +183,11 @@ class ContinuousQueryManager(object): >>> spark.streams.resetTerminated() """ - self._jcqm.resetTerminated() + self._jsqm.resetTerminated() class Trigger(object): - """Used to indicate how often results should be produced by a :class:`ContinuousQuery`. + """Used to indicate how often results should be produced by a :class:`StreamingQuery`. .. note:: Experimental http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index fee960a..1d5d691 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -921,32 +921,32 @@ class SQLTests(ReusedPySparkTestCase): def test_stream_save_options(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \ + q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \ .format('parquet').outputMode('append').option('path', out).start() try: - self.assertEqual(cq.name, 'this_query') - self.assertTrue(cq.isActive) - cq.processAllAvailable() + self.assertEqual(q.name, 'this_query') + self.assertTrue(q.isActive) + q.processAllAvailable() output_files = [] for _, _, files in os.walk(out): output_files.extend([f for f in files if not f.startswith('.')]) self.assertTrue(len(output_files) > 0) self.assertTrue(len(os.listdir(chk)) > 0) finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_stream_save_options_overwrite(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) @@ -954,15 +954,15 @@ class SQLTests(ReusedPySparkTestCase): chk = os.path.join(tmpPath, 'chk') fake1 = os.path.join(tmpPath, 'fake1') fake2 = os.path.join(tmpPath, 'fake2') - cq = df.writeStream.option('checkpointLocation', fake1)\ + q = df.writeStream.option('checkpointLocation', fake1)\ .format('memory').option('path', fake2) \ .queryName('fake_query').outputMode('append') \ .start(path=out, 
format='parquet', queryName='this_query', checkpointLocation=chk) try: - self.assertEqual(cq.name, 'this_query') - self.assertTrue(cq.isActive) - cq.processAllAvailable() + self.assertEqual(q.name, 'this_query') + self.assertTrue(q.isActive) + q.processAllAvailable() output_files = [] for _, _, files in os.walk(out): output_files.extend([f for f in files if not f.startswith('.')]) @@ -971,50 +971,50 @@ class SQLTests(ReusedPySparkTestCase): self.assertFalse(os.path.isdir(fake1)) # should not have been created self.assertFalse(os.path.isdir(fake2)) # should not have been created finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_stream_await_termination(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.writeStream\ + q = df.writeStream\ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: - self.assertTrue(cq.isActive) + self.assertTrue(q.isActive) try: - cq.awaitTermination("hello") + q.awaitTermination("hello") self.fail("Expected a value exception") except ValueError: pass now = time.time() # test should take at least 2 seconds - res = cq.awaitTermination(2.6) + res = q.awaitTermination(2.6) duration = time.time() - now self.assertTrue(duration >= 2) self.assertFalse(res) finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_query_manager_await_termination(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.writeStream\ + q = df.writeStream\ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: - self.assertTrue(cq.isActive) + self.assertTrue(q.isActive) try: self.spark._wrapped.streams.awaitAnyTermination("hello") self.fail("Expected a value exception") @@ -1027,7 +1027,7 @@ class SQLTests(ReusedPySparkTestCase): self.assertTrue(duration >= 2) self.assertFalse(res) finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_help_command(self): http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/utils.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index 9ddaf78..2a85ec0 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -45,9 +45,9 @@ class IllegalArgumentException(CapturedException): """ -class ContinuousQueryException(CapturedException): +class StreamingQueryException(CapturedException): """ - Exception that stopped a :class:`ContinuousQuery`. + Exception that stopped a :class:`StreamingQuery`. 
""" @@ -71,8 +71,8 @@ def capture_sql_exception(f): raise AnalysisException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '): raise ParseException(s.split(': ', 1)[1], stackTrace) - if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '): - raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace) + if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '): + raise StreamingQueryException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '): raise QueryExecutionException(s.split(': ', 1)[1], stackTrace) if s.startswith('java.lang.IllegalArgumentException: '): http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f9db325..fba4066 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython -import org.apache.spark.sql.streaming.{ContinuousQuery, DataStreamWriter} +import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery} import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -455,7 +455,7 @@ class Dataset[T] private[sql]( /** * Returns true if this Dataset contains one or more sources that continuously * return data as it arrives. A Dataset that reads data from a streaming source - * must be executed as a [[ContinuousQuery]] using the `startStream()` method in + * must be executed as a [[StreamingQuery]] using the `startStream()` method in * [[DataFrameWriter]]. Methods that return a single answer, e.g. `count()` or * `collect()`, will throw an [[AnalysisException]] when there is a streaming * source present. http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala index 09f0742..f56b25b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.streaming.ContinuousQuery +import org.apache.spark.sql.streaming.StreamingQuery /** * :: Experimental :: - * A class to consume data generated by a [[ContinuousQuery]]. Typically this is used to send the + * A class to consume data generated by a [[StreamingQuery]]. Typically this is used to send the * generated data to external systems. Each partition will use a new deserialized instance, so you * usually should do all the initialization (e.g. opening a connection or initiating a transaction) * in the `open` method. 
http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 33f6291..e7627ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} import org.apache.spark.sql.sources.BaseRelation -import org.apache.spark.sql.streaming.{ContinuousQueryManager, DataStreamReader} +import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ExecutionListenerManager @@ -716,12 +716,12 @@ class SQLContext private[sql](val sparkSession: SparkSession) } /** - * Returns a [[ContinuousQueryManager]] that allows managing all the - * [[org.apache.spark.sql.streaming.ContinuousQuery ContinuousQueries]] active on `this` context. + * Returns a [[StreamingQueryManager]] that allows managing all the + * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active on `this` context. * * @since 2.0.0 */ - def streams: ContinuousQueryManager = sparkSession.streams + def streams: StreamingQueryManager = sparkSession.streams /** * Returns the names of tables in the current database as an array. http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 9137a73..251f47d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -178,13 +178,13 @@ class SparkSession private( /** * :: Experimental :: - * Returns a [[ContinuousQueryManager]] that allows managing all the - * [[ContinuousQuery ContinuousQueries]] active on `this`. + * Returns a [[StreamingQueryManager]] that allows managing all the + * [[StreamingQuery StreamingQueries]] active on `this`. 
* * @since 2.0.0 */ @Experimental - def streams: ContinuousQueryManager = sessionState.continuousQueryManager + def streams: StreamingQueryManager = sessionState.streamingQueryManager /** * Start a new session with isolated SQL configurations, temporary tables, registered http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 31c9f1a..6cb1a44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -47,7 +47,7 @@ private[sql] object SQLExecution { val r = try { // sparkContext.getCallSite() would first try to pick up any call site that was previously // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on - // continuous queries would give us call site like "run at <unknown>:0" + // streaming queries would give us call site like "run at <unknown>:0" val callSite = sparkSession.sparkContext.getCallSite() sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index d1261dd..60466e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.execution.streaming.MemoryPlan import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.ContinuousQuery +import org.apache.spark.sql.streaming.StreamingQuery /** * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting @@ -225,7 +225,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { /** * Used to plan aggregation queries that are computed incrementally as part of a - * [[ContinuousQuery]]. Currently this rule is injected into the planner + * [[StreamingQuery]]. 
Currently this rule is injected into the planner * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]] */ object StatefulAggregationStrategy extends Strategy { http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala deleted file mode 100644 index f50951f..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent} -import org.apache.spark.sql.streaming.ContinuousQueryListener -import org.apache.spark.util.ListenerBus - -/** - * A bus to forward events to [[ContinuousQueryListener]]s. This one will send received - * [[ContinuousQueryListener.Event]]s to the Spark listener bus. It also registers itself with - * Spark listener bus, so that it can receive [[ContinuousQueryListener.Event]]s and dispatch them - * to ContinuousQueryListener. - */ -class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) - extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] { - - import ContinuousQueryListener._ - - sparkListenerBus.addListener(this) - - /** - * Post a ContinuousQueryListener event to the Spark listener bus asynchronously. This event will - * be dispatched to all ContinuousQueryListener in the thread of the Spark listener bus. 
- */ - def post(event: ContinuousQueryListener.Event) { - event match { - case s: QueryStarted => - postToAll(s) - case _ => - sparkListenerBus.post(event) - } - } - - override def onOtherEvent(event: SparkListenerEvent): Unit = { - event match { - case e: ContinuousQueryListener.Event => - postToAll(e) - case _ => - } - } - - override protected def doPostEvent( - listener: ContinuousQueryListener, - event: ContinuousQueryListener.Event): Unit = { - event match { - case queryStarted: QueryStarted => - listener.onQueryStarted(queryStarted) - case queryProgress: QueryProgress => - listener.onQueryProgress(queryProgress) - case queryTerminated: QueryTerminated => - listener.onQueryTerminated(queryTerminated) - case _ => - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5095fe7..4aefd39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -52,9 +52,9 @@ class StreamExecution( val trigger: Trigger, private[sql] val triggerClock: Clock, val outputMode: OutputMode) - extends ContinuousQuery with Logging { + extends StreamingQuery with Logging { - import org.apache.spark.sql.streaming.ContinuousQueryListener._ + import org.apache.spark.sql.streaming.StreamingQueryListener._ /** * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation. @@ -101,7 +101,7 @@ class StreamExecution( private[sql] var lastExecution: QueryExecution = null @volatile - private[sql] var streamDeathCause: ContinuousQueryException = null + private[sql] var streamDeathCause: StreamingQueryException = null /* Get the call site in the caller thread; will pass this into the micro batch thread */ private val callSite = Utils.getCallSite() @@ -140,8 +140,8 @@ class StreamExecution( override def sinkStatus: SinkStatus = new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources).toString) - /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */ - override def exception: Option[ContinuousQueryException] = Option(streamDeathCause) + /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */ + override def exception: Option[StreamingQueryException] = Option(streamDeathCause) /** Returns the path of a file with `name` in the checkpoint directory. 
*/ private def checkpointFile(name: String): String = @@ -199,7 +199,7 @@ class StreamExecution( } catch { case _: InterruptedException if state == TERMINATED => // interrupted by stop() case NonFatal(e) => - streamDeathCause = new ContinuousQueryException( + streamDeathCause = new StreamingQueryException( this, s"Query $name terminated with exception: ${e.getMessage}", e, @@ -227,7 +227,7 @@ class StreamExecution( private def populateStartOffsets(): Unit = { offsetLog.getLatest() match { case Some((batchId, nextOffsets)) => - logInfo(s"Resuming continuous query, starting with batch $batchId") + logInfo(s"Resuming streaming query, starting with batch $batchId") currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) logDebug(s"Found possibly uncommitted offsets $availableOffsets") @@ -239,7 +239,7 @@ class StreamExecution( } case None => // We are starting this stream for the first time. - logInfo(s"Starting new continuous query.") + logInfo(s"Starting new streaming query.") currentBatchId = 0 constructNextBatch() } @@ -383,7 +383,7 @@ class StreamExecution( postEvent(new QueryProgress(this.toInfo)) } - private def postEvent(event: ContinuousQueryListener.Event) { + private def postEvent(event: StreamingQueryListener.Event) { sparkSession.streams.postListenerEvent(event) } @@ -468,7 +468,7 @@ class StreamExecution( } override def toString: String = { - s"Continuous Query - $name [state = $state]" + s"Streaming Query - $name [state = $state]" } def toDebugString: String = { @@ -476,7 +476,7 @@ class StreamExecution( "Error:\n" + stackTraceToString(streamDeathCause.cause) } else "" s""" - |=== Continuous Query === + |=== Streaming Query === |Name: $name |Current Offsets: $committedOffsets | @@ -490,8 +490,8 @@ class StreamExecution( """.stripMargin } - private def toInfo: ContinuousQueryInfo = { - new ContinuousQueryInfo( + private def toInfo: StreamingQueryInfo = { + new StreamingQueryInfo( this.name, this.id, this.sourceStatuses, http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala new file mode 100644 index 0000000..1e66395 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent} +import org.apache.spark.sql.streaming.StreamingQueryListener +import org.apache.spark.util.ListenerBus + +/** + * A bus to forward events to [[StreamingQueryListener]]s. This one will send received + * [[StreamingQueryListener.Event]]s to the Spark listener bus. It also registers itself with + * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them + * to StreamingQueryListener. + */ +class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) + extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] { + + import StreamingQueryListener._ + + sparkListenerBus.addListener(this) + + /** + * Post a StreamingQueryListener event to the Spark listener bus asynchronously. This event will + * be dispatched to all StreamingQueryListener in the thread of the Spark listener bus. + */ + def post(event: StreamingQueryListener.Event) { + event match { + case s: QueryStarted => + postToAll(s) + case _ => + sparkListenerBus.post(event) + } + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case e: StreamingQueryListener.Event => + postToAll(e) + case _ => + } + } + + override protected def doPostEvent( + listener: StreamingQueryListener, + event: StreamingQueryListener.Event): Unit = { + event match { + case queryStarted: QueryStarted => + listener.onQueryStarted(queryStarted) + case queryProgress: QueryProgress => + listener.onQueryProgress(queryProgress) + case queryTerminated: QueryTerminated => + listener.onQueryTerminated(queryTerminated) + case _ => + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 02608b0..e8bd489 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2587,7 +2587,7 @@ object functions { * 09:00:25-09:01:25 ... * }}} * - * For a continuous query, you may use the function `current_timestamp` to generate windows on + * For a streaming query, you may use the function `current_timestamp` to generate windows on * processing time. * * @param timeColumn The column or the expression to use as the timestamp for windowing by time. @@ -2641,7 +2641,7 @@ object functions { * 09:00:20-09:01:20 ... * }}} * - * For a continuous query, you may use the function `current_timestamp` to generate windows on + * For a streaming query, you may use the function `current_timestamp` to generate windows on * processing time. * * @param timeColumn The column or the expression to use as the timestamp for windowing by time. @@ -2683,7 +2683,7 @@ object functions { * 09:02:00-09:03:00 ... * }}} * - * For a continuous query, you may use the function `current_timestamp` to generate windows on + * For a streaming query, you may use the function `current_timestamp` to generate windows on * processing time. * * @param timeColumn The column or the expression to use as the timestamp for windowing by time. 
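
[Editor's note] The scaladoc above notes that a streaming query can window on processing time via `current_timestamp`. As a hedged illustration (not part of this commit), the sketch below groups a hypothetical socket source into 10-minute sliding windows keyed on the processing-time clock; the host, port, and sink choice are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, window}

object ProcessingTimeWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("processing-time-windows").getOrCreate()

    // Hypothetical unbounded source: one string column named "value".
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // Window on the processing-time clock rather than an event-time column:
    // 10-minute windows sliding every 5 minutes, counted per window.
    val counts = lines
      .groupBy(window(current_timestamp(), "10 minutes", "5 minutes"))
      .count()

    // The returned handle is a StreamingQuery under the new naming.
    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
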
http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6978b50..4b8916f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -484,14 +484,14 @@ object SQLConf { .createWithDefault(2) val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation") - .doc("The default location for storing checkpoint data for continuously executing queries.") + .doc("The default location for storing checkpoint data for streaming queries.") .stringConf .createOptional val UNSUPPORTED_OPERATION_CHECK_ENABLED = SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck") .internal() - .doc("When true, the logical plan for continuous query will be checked for unsupported" + + .doc("When true, the logical plan for streaming query will be checked for unsupported" + " operations.") .booleanConf .createWithDefault(true) http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index b430950..59efa81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.AnalyzeTableCommand import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource} -import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryManager} +import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager} import org.apache.spark.sql.util.ExecutionListenerManager @@ -143,10 +143,10 @@ private[sql] class SessionState(sparkSession: SparkSession) { lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager /** - * Interface to start and stop [[ContinuousQuery]]s. + * Interface to start and stop [[StreamingQuery]]s. */ - lazy val continuousQueryManager: ContinuousQueryManager = { - new ContinuousQueryManager(sparkSession) + lazy val streamingQueryManager: StreamingQueryManager = { + new StreamingQueryManager(sparkSession) } private val jarClassLoader: NonClosableMutableURLClassLoader = http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala deleted file mode 100644 index 1e0a47d..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.SparkSession - -/** - * :: Experimental :: - * A handle to a query that is executing continuously in the background as new data arrives. - * All these methods are thread-safe. - * @since 2.0.0 - */ -@Experimental -trait ContinuousQuery { - - /** - * Returns the name of the query. This name is unique across all active queries. This can be - * set in the[[org.apache.spark.sql.DataFrameWriter DataFrameWriter]] as - * `dataframe.write().queryName("query").startStream()`. - * @since 2.0.0 - */ - def name: String - - /** - * Returns the unique id of this query. This id is automatically generated and is unique across - * all queries that have been started in the current process. - * @since 2.0.0 - */ - def id: Long - - /** - * Returns the [[SparkSession]] associated with `this`. - * @since 2.0.0 - */ - def sparkSession: SparkSession - - /** - * Whether the query is currently active or not - * @since 2.0.0 - */ - def isActive: Boolean - - /** - * Returns the [[ContinuousQueryException]] if the query was terminated by an exception. - * @since 2.0.0 - */ - def exception: Option[ContinuousQueryException] - - /** - * Returns current status of all the sources. - * @since 2.0.0 - */ - def sourceStatuses: Array[SourceStatus] - - /** Returns current status of the sink. */ - def sinkStatus: SinkStatus - - /** - * Waits for the termination of `this` query, either by `query.stop()` or by an exception. - * If the query has terminated with an exception, then the exception will be thrown. - * - * If the query has terminated, then all subsequent calls to this method will either return - * immediately (if the query was terminated by `stop()`), or throw the exception - * immediately (if the query has terminated with exception). - * - * @throws ContinuousQueryException, if `this` query has terminated with an exception. - * - * @since 2.0.0 - */ - def awaitTermination(): Unit - - /** - * Waits for the termination of `this` query, either by `query.stop()` or by an exception. - * If the query has terminated with an exception, then the exception will be thrown. - * Otherwise, it returns whether the query has terminated or not within the `timeoutMs` - * milliseconds. - * - * If the query has terminated, then all subsequent calls to this method will either return - * `true` immediately (if the query was terminated by `stop()`), or throw the exception - * immediately (if the query has terminated with exception). - * - * @throws ContinuousQueryException, if `this` query has terminated with an exception - * - * @since 2.0.0 - */ - def awaitTermination(timeoutMs: Long): Boolean - - /** - * Blocks until all available data in the source has been processed and committed to the sink. 
- * This method is intended for testing. Note that in the case of continually arriving data, this - * method may block forever. Additionally, this method is only guaranteed to block until data that - * has been synchronously appended data to a [[org.apache.spark.sql.execution.streaming.Source]] - * prior to invocation. (i.e. `getOffset` must immediately reflect the addition). - */ - def processAllAvailable(): Unit - - /** - * Stops the execution of this query if it is running. This method blocks until the threads - * performing execution has stopped. - * @since 2.0.0 - */ - def stop(): Unit -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala deleted file mode 100644 index 5196c5a..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} - -/** - * :: Experimental :: - * Exception that stopped a [[ContinuousQuery]]. 
- * @param query Query that caused the exception - * @param message Message of this exception - * @param cause Internal cause of this exception - * @param startOffset Starting offset (if known) of the range of data in which exception occurred - * @param endOffset Ending offset (if known) of the range of data in exception occurred - * @since 2.0.0 - */ -@Experimental -class ContinuousQueryException private[sql]( - @transient val query: ContinuousQuery, - val message: String, - val cause: Throwable, - val startOffset: Option[Offset] = None, - val endOffset: Option[Offset] = None) - extends Exception(message, cause) { - - /** Time when the exception occurred */ - val time: Long = System.currentTimeMillis - - override def toString(): String = { - val causeStr = - s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}" - s""" - |$causeStr - | - |${query.asInstanceOf[StreamExecution].toDebugString} - """.stripMargin - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala deleted file mode 100644 index 19f2270..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import org.apache.spark.annotation.Experimental - -/** - * :: Experimental :: - * A class used to report information about the progress of a [[ContinuousQuery]]. - * - * @param name The [[ContinuousQuery]] name. This name is unique across all active queries. - * @param id The [[ContinuousQuery]] id. This id is unique across - * all queries that have been started in the current process. - * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources. - * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink. 
- */ -@Experimental -class ContinuousQueryInfo private[sql]( - val name: String, - val id: Long, - val sourceStatuses: Seq[SourceStatus], - val sinkStatus: SinkStatus) http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala deleted file mode 100644 index dd31114..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import org.apache.spark.annotation.Experimental -import org.apache.spark.scheduler.SparkListenerEvent - -/** - * :: Experimental :: - * Interface for listening to events related to [[ContinuousQuery ContinuousQueries]]. - * @note The methods are not thread-safe as they may be called from different threads. - * - * @since 2.0.0 - */ -@Experimental -abstract class ContinuousQueryListener { - - import ContinuousQueryListener._ - - /** - * Called when a query is started. - * @note This is called synchronously with - * [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]], - * that is, `onQueryStart` will be called on all listeners before - * `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please - * don't block this method as it will block your query. - * @since 2.0.0 - */ - def onQueryStarted(queryStarted: QueryStarted): Unit - - /** - * Called when there is some status update (ingestion rate updated, etc.) - * - * @note This method is asynchronous. The status in [[ContinuousQuery]] will always be - * latest no matter when this method is called. Therefore, the status of [[ContinuousQuery]] - * may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]] - * is terminated when you are processing [[QueryProgress]]. - * @since 2.0.0 - */ - def onQueryProgress(queryProgress: QueryProgress): Unit - - /** - * Called when a query is stopped, with or without error. - * @since 2.0.0 - */ - def onQueryTerminated(queryTerminated: QueryTerminated): Unit -} - - -/** - * :: Experimental :: - * Companion object of [[ContinuousQueryListener]] that defines the listener events. 
- * @since 2.0.0 - */ -@Experimental -object ContinuousQueryListener { - - /** - * :: Experimental :: - * Base type of [[ContinuousQueryListener]] events - * @since 2.0.0 - */ - @Experimental - trait Event extends SparkListenerEvent - - /** - * :: Experimental :: - * Event representing the start of a query - * @since 2.0.0 - */ - @Experimental - class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event - - /** - * :: Experimental :: - * Event representing any progress updates in a query - * @since 2.0.0 - */ - @Experimental - class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event - - /** - * :: Experimental :: - * Event representing the termination of a query - * - * @param queryInfo Information about the status of the query. - * @param exception The exception message of the [[ContinuousQuery]] if the query was terminated - * with an exception. Otherwise, it will be `None`. - * @param stackTrace The stack trace of the exception if the query was terminated with an - * exception. It will be empty if there was no error. - * @since 2.0.0 - */ - @Experimental - class QueryTerminated private[sql]( - val queryInfo: ContinuousQueryInfo, - val exception: Option[String], - val stackTrace: Seq[StackTraceElement]) extends Event -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala deleted file mode 100644 index 0f4a9c9..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import scala.collection.mutable - -import org.apache.hadoop.fs.Path - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.{Clock, SystemClock, Utils} - -/** - * :: Experimental :: - * A class to manage all the [[ContinuousQuery]]s active on a [[SparkSession]].
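----------------------------------------------------------------------
The events above all carry a ContinuousQueryInfo payload. A hedged sketch of unpacking QueryTerminated follows; registration via `spark.streams.addListener`, shown in the manager below, is an assumption based on this commit's SparkSession changes.

    import org.apache.spark.sql.streaming.ContinuousQueryListener
    import org.apache.spark.sql.streaming.ContinuousQueryListener._

    // FailureReporter is illustrative; it touches only the fields
    // defined on the events above.
    class FailureReporter extends ContinuousQueryListener {
      override def onQueryStarted(e: QueryStarted): Unit = ()
      override def onQueryProgress(e: QueryProgress): Unit = ()
      override def onQueryTerminated(e: QueryTerminated): Unit = {
        // exception is None after a clean stop(), Some(message) after a failure.
        e.exception.foreach { msg =>
          println(s"query ${e.queryInfo.name} (id=${e.queryInfo.id}) failed: $msg")
          e.stackTrace.foreach(frame => println(s"\tat $frame"))
        }
      }
    }

    // Registration goes through the manager's addListener, defined below:
    //   spark.streams.addListener(new FailureReporter)
----------------------------------------------------------------------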
- * - * @since 2.0.0 - */ -@Experimental -class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { - - private[sql] val stateStoreCoordinator = - StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) - private val listenerBus = new ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus) - private val activeQueries = new mutable.HashMap[Long, ContinuousQuery] - private val activeQueriesLock = new Object - private val awaitTerminationLock = new Object - - private var lastTerminatedQuery: ContinuousQuery = null - - /** - * Returns a list of active queries associated with this SQLContext - * - * @since 2.0.0 - */ - def active: Array[ContinuousQuery] = activeQueriesLock.synchronized { - activeQueries.values.toArray - } - - /** - * Returns the query if there is an active query with the given id, or null. - * - * @since 2.0.0 - */ - def get(id: Long): ContinuousQuery = activeQueriesLock.synchronized { - activeQueries.get(id).orNull - } - - /** - * Wait until any of the queries on the associated SQLContext has terminated since the - * creation of the context, or since `resetTerminated()` was called. If any query was terminated - * with an exception, then the exception will be thrown. - * - * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either - * return immediately (if the query was terminated by `query.stop()`), - * or throw the exception immediately (if the query was terminated with an exception). Use - * `resetTerminated()` to clear past terminations and wait for new terminations. - * - * In the case where multiple queries have terminated since `resetTerminated()` was called, - * if any query has terminated with an exception, then `awaitAnyTermination()` will - * throw any one of the exceptions. For correctly handling exceptions across multiple queries, - * users need to stop all of them after any of them terminates with an exception, and then check - * the `query.exception()` for each query. - * - * @throws ContinuousQueryException if any query has terminated with an exception - * - * @since 2.0.0 - */ - def awaitAnyTermination(): Unit = { - awaitTerminationLock.synchronized { - while (lastTerminatedQuery == null) { - awaitTerminationLock.wait(10) - } - if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) { - throw lastTerminatedQuery.exception.get - } - } - } - - /** - * Wait until any of the queries on the associated SQLContext has terminated since the - * creation of the context, or since `resetTerminated()` was called. Returns whether any query - * has terminated or not (multiple may have terminated). If any query has terminated with an - * exception, then the exception will be thrown. - * - * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either - * return `true` immediately (if the query was terminated by `query.stop()`), - * or throw the exception immediately (if the query was terminated with an exception). Use - * `resetTerminated()` to clear past terminations and wait for new terminations. - * - * In the case where multiple queries have terminated since `resetTerminated()` was called, - * if any query has terminated with an exception, then `awaitAnyTermination()` will - * throw any one of the exceptions. For correctly handling exceptions across multiple queries, - * users need to stop all of them after any of them terminates with an exception, and then check - * the `query.exception()` for each query.
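----------------------------------------------------------------------
The stop-all-then-inspect discipline described above might be coded like the sketch below; `spark.streams` as the accessor for this manager, and the helper name, are assumptions.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.ContinuousQueryException

    // A sketch only; waitForAnyFailure is a hypothetical helper name.
    def waitForAnyFailure(spark: SparkSession): Unit = {
      try {
        spark.streams.awaitAnyTermination() // blocks until some query terminates
      } catch {
        case e: ContinuousQueryException =>
          // Only one of possibly several exceptions is surfaced, so stop
          // all queries and then check each handle's exception, as the
          // scaladoc above advises.
          val queries = spark.streams.active
          queries.foreach(_.stop())
          queries.foreach(q => println(s"${q.name}: ${q.exception}"))
      }
      // Forget past terminations so the next wait sees only new ones.
      spark.streams.resetTerminated()
    }
----------------------------------------------------------------------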
- * - * @throws ContinuousQueryException if any query has terminated with an exception - * - * @since 2.0.0 - */ - def awaitAnyTermination(timeoutMs: Long): Boolean = { - - val startTime = System.currentTimeMillis - def isTimedout = System.currentTimeMillis - startTime >= timeoutMs - - awaitTerminationLock.synchronized { - while (!isTimedout && lastTerminatedQuery == null) { - awaitTerminationLock.wait(10) - } - if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) { - throw lastTerminatedQuery.exception.get - } - lastTerminatedQuery != null - } - } - - /** - * Forget about past terminated queries so that `awaitAnyTermination()` can be used again to - * wait for new terminations. - * - * @since 2.0.0 - */ - def resetTerminated(): Unit = { - awaitTerminationLock.synchronized { - lastTerminatedQuery = null - } - } - - /** - * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle events of - * [[ContinuousQuery]]. - * - * @since 2.0.0 - */ - def addListener(listener: ContinuousQueryListener): Unit = { - listenerBus.addListener(listener) - } - - /** - * Deregister a [[ContinuousQueryListener]]. - * - * @since 2.0.0 - */ - def removeListener(listener: ContinuousQueryListener): Unit = { - listenerBus.removeListener(listener) - } - - /** Post a listener event */ - private[sql] def postListenerEvent(event: ContinuousQueryListener.Event): Unit = { - listenerBus.post(event) - } - - /** - * Start a [[ContinuousQuery]]. - * @param userSpecifiedName Query name optionally specified by the user. - * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user. - * @param df Streaming DataFrame. - * @param sink Sink to write the streaming outputs. - * @param outputMode Output mode for the sink. - * @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user - * has not specified one. If false, then an error will be thrown. - * @param recoverFromCheckpointLocation Whether to recover the query from the checkpoint - * location. If false and the checkpoint location exists, - * then an error will be thrown. - * @param trigger [[Trigger]] for the query. - * @param triggerClock [[Clock]] to use for the triggering. - */ - private[sql] def startQuery( - userSpecifiedName: Option[String], - userSpecifiedCheckpointLocation: Option[String], - df: DataFrame, - sink: Sink, - outputMode: OutputMode, - useTempCheckpointLocation: Boolean = false, - recoverFromCheckpointLocation: Boolean = true, - trigger: Trigger = ProcessingTime(0), - triggerClock: Clock = new SystemClock()): ContinuousQuery = { - activeQueriesLock.synchronized { - val id = StreamExecution.nextId - val name = userSpecifiedName.getOrElse(s"query-$id") - if (activeQueries.values.exists(_.name == name)) { - throw new IllegalArgumentException( - s"Cannot start query with name $name as a query with that name is already active") - } - val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => - new Path(userSpecified).toUri.toString - }.orElse { - df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location => - new Path(location, name).toUri.toString - } - }.getOrElse { - if (useTempCheckpointLocation) { - Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath - } else { - throw new AnalysisException( - "checkpointLocation must be specified either " + - """through option("checkpointLocation", ...)
or """ + - s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""") - } - } - - // If offsets have already been created, we trying to resume a query. - if (!recoverFromCheckpointLocation) { - val checkpointPath = new Path(checkpointLocation, "offsets") - val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf()) - if (fs.exists(checkpointPath)) { - throw new AnalysisException( - s"This query does not support recovering from checkpoint location. " + - s"Delete $checkpointPath to start over.") - } - } - - val analyzedPlan = df.queryExecution.analyzed - df.queryExecution.assertAnalyzed() - - if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) { - UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) - } - - var nextSourceId = 0L - - val logicalPlan = analyzedPlan.transform { - case StreamingRelation(dataSource, _, output) => - // Materialize source to avoid creating it in every batch - val metadataPath = s"$checkpointLocation/sources/$nextSourceId" - val source = dataSource.createSource(metadataPath) - nextSourceId += 1 - // We still need to use the previous `output` instead of `source.schema` as attributes in - // "df.logicalPlan" has already used attributes of the previous `output`. - StreamingExecutionRelation(source, output) - } - val query = new StreamExecution( - sparkSession, - id, - name, - checkpointLocation, - logicalPlan, - sink, - trigger, - triggerClock, - outputMode) - query.start() - activeQueries.put(id, query) - query - } - } - - /** Notify (by the ContinuousQuery) that the query has been terminated */ - private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = { - activeQueriesLock.synchronized { - activeQueries -= terminatedQuery.id - } - awaitTerminationLock.synchronized { - if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) { - lastTerminatedQuery = terminatedQuery - } - awaitTerminationLock.notifyAll() - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index b035ff7..1977074 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -109,7 +109,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { /** * :: Experimental :: - * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`. + * Specifies the name of the [[StreamingQuery]] that can be started with `startStream()`. * This name must be unique among all the currently active queries in the associated SQLContext. * * @since 2.0.0 @@ -221,26 +221,26 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { /** * :: Experimental :: * Starts the execution of the streaming query, which will continually output results to the given - * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with + * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with * the stream. 
* * @since 2.0.0 */ @Experimental - def start(path: String): ContinuousQuery = { + def start(path: String): StreamingQuery = { option("path", path).start() } /** * :: Experimental :: * Starts the execution of the streaming query, which will continually output results to the given - * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with + * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with * the stream. * * @since 2.0.0 */ @Experimental - def start(): ContinuousQuery = { + def start(): StreamingQuery = { if (source == "memory") { assertNotPartitioned("memory") if (extraOptions.get("queryName").isEmpty) { @@ -249,7 +249,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) - val query = df.sparkSession.sessionState.continuousQueryManager.startQuery( + val query = df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, @@ -263,7 +263,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { } else if (source == "foreach") { assertNotPartitioned("foreach") val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc) - df.sparkSession.sessionState.continuousQueryManager.startQuery( + df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, @@ -278,7 +278,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { className = source, options = extraOptions.toMap, partitionColumns = normalizedParCols.getOrElse(Nil)) - df.sparkSession.sessionState.continuousQueryManager.startQuery( + df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala new file mode 100644 index 0000000..dc81a5b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.SparkSession + +/** + * :: Experimental :: + * A handle to a query that is executing continuously in the background as new data arrives. 
+ * All these methods are thread-safe. + * @since 2.0.0 + */ +@Experimental +trait StreamingQuery { + + /** + * Returns the name of the query. This name is unique across all active queries. This can be + * set in the [[org.apache.spark.sql.DataFrameWriter DataFrameWriter]] as + * `dataframe.write().queryName("query").startStream()`. + * @since 2.0.0 + */ + def name: String + + /** + * Returns the unique id of this query. This id is automatically generated and is unique across + * all queries that have been started in the current process. + * @since 2.0.0 + */ + def id: Long + + /** + * Returns the [[SparkSession]] associated with `this`. + * @since 2.0.0 + */ + def sparkSession: SparkSession + + /** + * Whether the query is currently active or not + * @since 2.0.0 + */ + def isActive: Boolean + + /** + * Returns the [[StreamingQueryException]] if the query was terminated by an exception. + * @since 2.0.0 + */ + def exception: Option[StreamingQueryException] + + /** + * Returns the current status of all the sources. + * @since 2.0.0 + */ + def sourceStatuses: Array[SourceStatus] + + /** Returns the current status of the sink. */ + def sinkStatus: SinkStatus + + /** + * Waits for the termination of `this` query, either by `query.stop()` or by an exception. + * If the query has terminated with an exception, then the exception will be thrown. + * + * If the query has terminated, then all subsequent calls to this method will either return + * immediately (if the query was terminated by `stop()`), or throw the exception + * immediately (if the query has terminated with an exception). + * + * @throws StreamingQueryException if `this` query has terminated with an exception. + * + * @since 2.0.0 + */ + def awaitTermination(): Unit + + /** + * Waits for the termination of `this` query, either by `query.stop()` or by an exception. + * If the query has terminated with an exception, then the exception will be thrown. + * Otherwise, it returns whether the query has terminated or not within the `timeoutMs` + * milliseconds. + * + * If the query has terminated, then all subsequent calls to this method will either return + * `true` immediately (if the query was terminated by `stop()`), or throw the exception + * immediately (if the query has terminated with an exception). + * + * @throws StreamingQueryException if `this` query has terminated with an exception + * + * @since 2.0.0 + */ + def awaitTermination(timeoutMs: Long): Boolean + + /** + * Blocks until all available data in the source has been processed and committed to the sink. + * This method is intended for testing. Note that in the case of continually arriving data, this + * method may block forever. Additionally, this method is only guaranteed to block until data that + * was synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]] prior to + * invocation has been processed (i.e. `getOffset` must immediately reflect the addition). + */ + def processAllAvailable(): Unit + + /** + * Stops the execution of this query if it is running. This method blocks until the threads + * performing execution have stopped.
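----------------------------------------------------------------------
Tying the trait's methods together, a hedged sketch of driving a handle returned by DataStreamWriter.start(); the helper name, timeout, and messages are arbitrary.

    import org.apache.spark.sql.streaming.StreamingQuery

    // drainAndStop is a hypothetical helper; `query` would come from
    // DataStreamWriter.start().
    def drainAndStop(query: StreamingQuery): Unit = {
      println(s"'${query.name}' (id=${query.id}) active=${query.isActive}")

      // Wait up to 30 seconds; returns false if the query is still running,
      // and rethrows the query's error as StreamingQueryException if it failed.
      val terminated = query.awaitTermination(timeoutMs = 30000)
      if (!terminated) {
        query.processAllAvailable() // testing-only: block until the sink catches up
        query.stop()                // blocks until the execution threads have stopped
      }
      query.exception.foreach(e => println(s"failed: ${e.message}"))
    }
----------------------------------------------------------------------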
+ * @since 2.0.0 + */ + def stop(): Unit +} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala new file mode 100644 index 0000000..90f95ca --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} + +/** + * :: Experimental :: + * Exception that stopped a [[StreamingQuery]]. + * @param query Query that caused the exception + * @param message Message of this exception + * @param cause Internal cause of this exception + * @param startOffset Starting offset (if known) of the range of data in which exception occurred + * @param endOffset Ending offset (if known) of the range of data in which exception occurred + * @since 2.0.0 + */ +@Experimental +class StreamingQueryException private[sql]( + @transient val query: StreamingQuery, + val message: String, + val cause: Throwable, + val startOffset: Option[Offset] = None, + val endOffset: Option[Offset] = None) + extends Exception(message, cause) { + + /** Time when the exception occurred */ + val time: Long = System.currentTimeMillis + + override def toString(): String = { + val causeStr = + s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}" + s""" + |$causeStr + | + |${query.asInstanceOf[StreamExecution].toDebugString} + """.stripMargin + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala new file mode 100644 index 0000000..1af2668 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * A class used to report information about the progress of a [[StreamingQuery]]. + * + * @param name The [[StreamingQuery]] name. This name is unique across all active queries. + * @param id The [[StreamingQuery]] id. This id is unique across + * all queries that have been started in the current process. + * @param sourceStatuses The current statuses of the [[StreamingQuery]]'s sources. + * @param sinkStatus The current status of the [[StreamingQuery]]'s sink. + */ +@Experimental +class StreamingQueryInfo private[sql]( + val name: String, + val id: Long, + val sourceStatuses: Seq[SourceStatus], + val sinkStatus: SinkStatus)
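----------------------------------------------------------------------
Finally, a sketch of consuming the StreamingQueryInfo payload from the renamed listener API; the class name and formatting are illustrative, and the statuses are printed via their default toString.

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // ProgressPrinter is illustrative; it touches only the four fields
    // documented above.
    class ProgressPrinter extends StreamingQueryListener {
      override def onQueryStarted(e: QueryStarted): Unit = ()
      override def onQueryTerminated(e: QueryTerminated): Unit = ()
      override def onQueryProgress(e: QueryProgress): Unit = {
        val info = e.queryInfo
        println(s"[${info.name} #${info.id}] sink: ${info.sinkStatus}")
        info.sourceStatuses.foreach(status => println(s"  source: $status"))
      }
    }
----------------------------------------------------------------------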