[SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQuery

Renamed for simplicity, so that it is obvious that it is related to streaming.
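
For illustration only (not part of this patch), a minimal Scala sketch of the renamed public API after this change, assuming a local SparkSession and a directory of text files to stream from; the object name, app name, and input path are placeholders:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}

    object RenamedApiSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()

        // A streaming DataFrame; the input path is a placeholder.
        val lines = spark.readStream.format("text").load("/tmp/streaming-input")

        // start() now hands back a StreamingQuery (formerly ContinuousQuery).
        val query: StreamingQuery = lines.writeStream
          .format("memory")
          .queryName("demo")
          .start()

        // spark.streams is now a StreamingQueryManager (formerly ContinuousQueryManager).
        val manager: StreamingQueryManager = spark.streams
        println(manager.active.map(_.name).mkString(", "))

        query.stop()
        spark.stop()
      }
    }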

Tested with existing unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #13673 from tdas/SPARK-15953.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a507199
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a507199
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a507199

Branch: refs/heads/master
Commit: 9a5071996b968148f6b9aba12e0d3fe888d9acd8
Parents: d30b7e6
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Jun 15 10:46:02 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Jun 15 10:46:07 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/context.py                   |   8 +-
 python/pyspark/sql/dataframe.py                 |   2 +-
 python/pyspark/sql/readwriter.py                |  40 +--
 python/pyspark/sql/session.py                   |  10 +-
 python/pyspark/sql/streaming.py                 |  79 +++--
 python/pyspark/sql/tests.py                     |  52 ++--
 python/pyspark/sql/utils.py                     |   8 +-
 .../scala/org/apache/spark/sql/Dataset.scala    |   4 +-
 .../org/apache/spark/sql/ForeachWriter.scala    |   4 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   8 +-
 .../org/apache/spark/sql/SparkSession.scala     |   6 +-
 .../spark/sql/execution/SQLExecution.scala      |   2 +-
 .../spark/sql/execution/SparkStrategies.scala   |   4 +-
 .../streaming/ContinuousQueryListenerBus.scala  |  72 -----
 .../execution/streaming/StreamExecution.scala   |  26 +-
 .../streaming/StreamingQueryListenerBus.scala   |  72 +++++
 .../scala/org/apache/spark/sql/functions.scala  |   6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   4 +-
 .../spark/sql/internal/SessionState.scala       |   8 +-
 .../spark/sql/streaming/ContinuousQuery.scala   | 119 --------
 .../streaming/ContinuousQueryException.scala    |  54 ----
 .../sql/streaming/ContinuousQueryInfo.scala     |  37 ---
 .../sql/streaming/ContinuousQueryListener.scala | 113 -------
 .../sql/streaming/ContinuousQueryManager.scala  | 279 -----------------
 .../spark/sql/streaming/DataStreamWriter.scala  |  16 +-
 .../spark/sql/streaming/StreamingQuery.scala    | 119 ++++++++
 .../sql/streaming/StreamingQueryException.scala |  54 ++++
 .../sql/streaming/StreamingQueryInfo.scala      |  37 +++
 .../sql/streaming/StreamingQueryListener.scala  | 113 +++++++
 .../sql/streaming/StreamingQueryManager.scala   | 279 +++++++++++++++++
 .../apache/spark/sql/streaming/Trigger.scala    |   4 +-
 .../ContinuousQueryListenerSuite.scala          | 304 -------------------
 .../streaming/ContinuousQueryManagerSuite.scala | 299 ------------------
 .../sql/streaming/ContinuousQuerySuite.scala    | 180 -----------
 .../sql/streaming/FileStreamSinkSuite.scala     |   6 +-
 .../spark/sql/streaming/FileStressSuite.scala   |   6 +-
 .../apache/spark/sql/streaming/StreamTest.scala |   4 +-
 .../streaming/StreamingAggregationSuite.scala   |   2 -
 .../streaming/StreamingQueryListenerSuite.scala | 304 +++++++++++++++++++
 .../streaming/StreamingQueryManagerSuite.scala  | 299 ++++++++++++++++++
 .../sql/streaming/StreamingQuerySuite.scala     | 180 +++++++++++
 .../test/DataStreamReaderWriterSuite.scala      |  10 +-
 42 files changed, 1615 insertions(+), 1618 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index a271afe..8a1a874 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -444,13 +444,13 @@ class SQLContext(object):
     @property
     @since(2.0)
     def streams(self):
-        """Returns a :class:`ContinuousQueryManager` that allows managing all 
the
-        :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+        """Returns a :class:`StreamingQueryManager` that allows managing all 
the
+        :class:`StreamingQuery` StreamingQueries active on `this` context.
 
         .. note:: Experimental.
         """
-        from pyspark.sql.streaming import ContinuousQueryManager
-        return ContinuousQueryManager(self._ssql_ctx.streams())
+        from pyspark.sql.streaming import StreamingQueryManager
+        return StreamingQueryManager(self._ssql_ctx.streams())
 
 
 class HiveContext(SQLContext):

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0126faf..acf9d08 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -257,7 +257,7 @@ class DataFrame(object):
     def isStreaming(self):
         """Returns true if this :class:`Dataset` contains one or more sources 
that continuously
         return data as it arrives. A :class:`Dataset` that reads data from a 
streaming source
-        must be executed as a :class:`ContinuousQuery` using the 
:func:`startStream` method in
+        must be executed as a :class:`StreamingQuery` using the 
:func:`startStream` method in
         :class:`DataFrameWriter`.  Methods that return a single answer, (e.g., 
:func:`count` or
         :func:`collect`) will throw an :class:`AnalysisException` when there 
is a streaming
         source present.

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index ad954d0..c982de6 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
 from pyspark.sql import utils
 
-__all__ = ["DataFrameReader", "DataFrameWriter"]
+__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", 
"DataStreamWriter"]
 
 
 def to_str(value):
@@ -458,9 +458,9 @@ class DataFrameWriter(object):
         self._spark = df.sql_ctx
         self._jwrite = df._jdf.write()
 
-    def _cq(self, jcq):
-        from pyspark.sql.streaming import ContinuousQuery
-        return ContinuousQuery(jcq)
+    def _sq(self, jsq):
+        from pyspark.sql.streaming import StreamingQuery
+        return StreamingQuery(jsq)
 
     @since(1.4)
     def mode(self, saveMode):
@@ -1094,9 +1094,9 @@ class DataStreamWriter(object):
         self._spark = df.sql_ctx
         self._jwrite = df._jdf.writeStream()
 
-    def _cq(self, jcq):
-        from pyspark.sql.streaming import ContinuousQuery
-        return ContinuousQuery(jcq)
+    def _sq(self, jsq):
+        from pyspark.sql.streaming import StreamingQuery
+        return StreamingQuery(jsq)
 
     @since(2.0)
     def outputMode(self, outputMode):
@@ -1169,8 +1169,8 @@ class DataStreamWriter(object):
 
     @since(2.0)
     def queryName(self, queryName):
-        """Specifies the name of the :class:`ContinuousQuery` that can be 
started with
-        :func:`startStream`. This name must be unique among all the currently 
active queries
+        """Specifies the name of the :class:`StreamingQuery` that can be 
started with
+        :func:`start`. This name must be unique among all the currently active 
queries
         in the associated SparkSession.
 
         .. note:: Experimental.
@@ -1232,21 +1232,21 @@ class DataStreamWriter(object):
         :param options: All other string options. You may want to provide a 
`checkpointLocation`
             for most streams, however it is not required for a `memory` stream.
 
-        >>> cq = 
sdf.writeStream.format('memory').queryName('this_query').start()
-        >>> cq.isActive
+        >>> sq = 
sdf.writeStream.format('memory').queryName('this_query').start()
+        >>> sq.isActive
         True
-        >>> cq.name
+        >>> sq.name
         u'this_query'
-        >>> cq.stop()
-        >>> cq.isActive
+        >>> sq.stop()
+        >>> sq.isActive
         False
-        >>> cq = sdf.writeStream.trigger(processingTime='5 seconds').start(
+        >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
         ...     queryName='that_query', format='memory')
-        >>> cq.name
+        >>> sq.name
         u'that_query'
-        >>> cq.isActive
+        >>> sq.isActive
         True
-        >>> cq.stop()
+        >>> sq.stop()
         """
         self.options(**options)
         if partitionBy is not None:
@@ -1256,9 +1256,9 @@ class DataStreamWriter(object):
         if queryName is not None:
             self.queryName(queryName)
         if path is None:
-            return self._cq(self._jwrite.start())
+            return self._sq(self._jwrite.start())
         else:
-            return self._cq(self._jwrite.start(path))
+            return self._sq(self._jwrite.start(path))
 
 
 def _test():

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 11c815d..6edbd59 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -565,15 +565,15 @@ class SparkSession(object):
     @property
     @since(2.0)
     def streams(self):
-        """Returns a :class:`ContinuousQueryManager` that allows managing all 
the
-        :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+        """Returns a :class:`StreamingQueryManager` that allows managing all 
the
+        :class:`StreamingQuery` StreamingQueries active on `this` context.
 
         .. note:: Experimental.
 
-        :return: :class:`ContinuousQueryManager`
+        :return: :class:`StreamingQueryManager`
         """
-        from pyspark.sql.streaming import ContinuousQueryManager
-        return ContinuousQueryManager(self._jsparkSession.streams())
+        from pyspark.sql.streaming import StreamingQueryManager
+        return StreamingQueryManager(self._jsparkSession.streams())
 
     @since(2.0)
     def stop(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 1d65094..ae45c99 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -26,10 +26,10 @@ from abc import ABCMeta, abstractmethod
 from pyspark import since
 from pyspark.rdd import ignore_unicode_prefix
 
-__all__ = ["ContinuousQuery"]
+__all__ = ["StreamingQuery"]
 
 
-class ContinuousQuery(object):
+class StreamingQuery(object):
     """
     A handle to a query that is executing continuously in the background as 
new data arrives.
     All these methods are thread-safe.
@@ -39,30 +39,30 @@ class ContinuousQuery(object):
     .. versionadded:: 2.0
     """
 
-    def __init__(self, jcq):
-        self._jcq = jcq
+    def __init__(self, jsq):
+        self._jsq = jsq
 
     @property
     @since(2.0)
     def id(self):
-        """The id of the continuous query. This id is unique across all 
queries that have been
+        """The id of the streaming query. This id is unique across all queries 
that have been
         started in the current process.
         """
-        return self._jcq.id()
+        return self._jsq.id()
 
     @property
     @since(2.0)
     def name(self):
-        """The name of the continuous query. This name is unique across all 
active queries.
+        """The name of the streaming query. This name is unique across all 
active queries.
         """
-        return self._jcq.name()
+        return self._jsq.name()
 
     @property
     @since(2.0)
     def isActive(self):
-        """Whether this continuous query is currently active or not.
+        """Whether this streaming query is currently active or not.
         """
-        return self._jcq.isActive()
+        return self._jsq.isActive()
 
     @since(2.0)
     def awaitTermination(self, timeout=None):
@@ -75,14 +75,14 @@ class ContinuousQuery(object):
         immediately (if the query was terminated by :func:`stop()`), or throw 
the exception
         immediately (if the query has terminated with exception).
 
-        throws :class:`ContinuousQueryException`, if `this` query has 
terminated with an exception
+        throws :class:`StreamingQueryException`, if `this` query has 
terminated with an exception
         """
         if timeout is not None:
             if not isinstance(timeout, (int, float)) or timeout < 0:
                 raise ValueError("timeout must be a positive integer or float. 
Got %s" % timeout)
-            return self._jcq.awaitTermination(int(timeout * 1000))
+            return self._jsq.awaitTermination(int(timeout * 1000))
         else:
-            return self._jcq.awaitTermination()
+            return self._jsq.awaitTermination()
 
     @since(2.0)
     def processAllAvailable(self):
@@ -92,26 +92,25 @@ class ContinuousQuery(object):
         until data that has been synchronously appended data to a stream 
source prior to invocation.
         (i.e. `getOffset` must immediately reflect the addition).
         """
-        return self._jcq.processAllAvailable()
+        return self._jsq.processAllAvailable()
 
     @since(2.0)
     def stop(self):
-        """Stop this continuous query.
+        """Stop this streaming query.
         """
-        self._jcq.stop()
+        self._jsq.stop()
 
 
-class ContinuousQueryManager(object):
-    """A class to manage all the :class:`ContinuousQuery` ContinuousQueries 
active
-    on a :class:`SQLContext`.
+class StreamingQueryManager(object):
+    """A class to manage all the :class:`StreamingQuery` StreamingQueries 
active.
 
     .. note:: Experimental
 
     .. versionadded:: 2.0
     """
 
-    def __init__(self, jcqm):
-        self._jcqm = jcqm
+    def __init__(self, jsqm):
+        self._jsqm = jsqm
 
     @property
     @ignore_unicode_prefix
@@ -119,14 +118,14 @@ class ContinuousQueryManager(object):
     def active(self):
         """Returns a list of active queries associated with this SQLContext
 
-        >>> cq = 
df.writeStream.format('memory').queryName('this_query').start()
-        >>> cqm = spark.streams
-        >>> # get the list of active continuous queries
-        >>> [q.name for q in cqm.active]
+        >>> sq = 
df.writeStream.format('memory').queryName('this_query').start()
+        >>> sqm = spark.streams
+        >>> # get the list of active streaming queries
+        >>> [q.name for q in sqm.active]
         [u'this_query']
-        >>> cq.stop()
+        >>> sq.stop()
         """
-        return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]
+        return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
 
     @ignore_unicode_prefix
     @since(2.0)
@@ -134,20 +133,20 @@ class ContinuousQueryManager(object):
         """Returns an active query from this SQLContext or throws exception if 
an active query
         with this name doesn't exist.
 
-        >>> cq = 
df.writeStream.format('memory').queryName('this_query').start()
-        >>> cq.name
+        >>> sq = 
df.writeStream.format('memory').queryName('this_query').start()
+        >>> sq.name
         u'this_query'
-        >>> cq = spark.streams.get(cq.id)
-        >>> cq.isActive
+        >>> sq = spark.streams.get(sq.id)
+        >>> sq.isActive
         True
-        >>> cq = sqlContext.streams.get(cq.id)
-        >>> cq.isActive
+        >>> sq = sqlContext.streams.get(sq.id)
+        >>> sq.isActive
         True
-        >>> cq.stop()
+        >>> sq.stop()
         """
         if not isinstance(id, intlike):
             raise ValueError("The id for the query must be an integer. Got: 
%s" % id)
-        return ContinuousQuery(self._jcqm.get(id))
+        return StreamingQuery(self._jsqm.get(id))
 
     @since(2.0)
     def awaitAnyTermination(self, timeout=None):
@@ -168,14 +167,14 @@ class ContinuousQueryManager(object):
         queries, users need to stop all of them after any of them terminates 
with exception, and
         then check the `query.exception()` for each query.
 
-        throws :class:`ContinuousQueryException`, if `this` query has 
terminated with an exception
+        throws :class:`StreamingQueryException`, if `this` query has 
terminated with an exception
         """
         if timeout is not None:
             if not isinstance(timeout, (int, float)) or timeout < 0:
                 raise ValueError("timeout must be a positive integer or float. 
Got %s" % timeout)
-            return self._jcqm.awaitAnyTermination(int(timeout * 1000))
+            return self._jsqm.awaitAnyTermination(int(timeout * 1000))
         else:
-            return self._jcqm.awaitAnyTermination()
+            return self._jsqm.awaitAnyTermination()
 
     @since(2.0)
     def resetTerminated(self):
@@ -184,11 +183,11 @@ class ContinuousQueryManager(object):
 
         >>> spark.streams.resetTerminated()
         """
-        self._jcqm.resetTerminated()
+        self._jsqm.resetTerminated()
 
 
 class Trigger(object):
-    """Used to indicate how often results should be produced by a 
:class:`ContinuousQuery`.
+    """Used to indicate how often results should be produced by a 
:class:`StreamingQuery`.
 
     .. note:: Experimental
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index fee960a..1d5d691 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -921,32 +921,32 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_stream_save_options(self):
         df = 
self.spark.readStream.format('text').load('python/test_support/sql/streaming')
-        for cq in self.spark._wrapped.streams.active:
-            cq.stop()
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.writeStream.option('checkpointLocation', 
chk).queryName('this_query') \
+        q = df.writeStream.option('checkpointLocation', 
chk).queryName('this_query') \
             .format('parquet').outputMode('append').option('path', out).start()
         try:
-            self.assertEqual(cq.name, 'this_query')
-            self.assertTrue(cq.isActive)
-            cq.processAllAvailable()
+            self.assertEqual(q.name, 'this_query')
+            self.assertTrue(q.isActive)
+            q.processAllAvailable()
             output_files = []
             for _, _, files in os.walk(out):
                 output_files.extend([f for f in files if not 
f.startswith('.')])
             self.assertTrue(len(output_files) > 0)
             self.assertTrue(len(os.listdir(chk)) > 0)
         finally:
-            cq.stop()
+            q.stop()
             shutil.rmtree(tmpPath)
 
     def test_stream_save_options_overwrite(self):
         df = 
self.spark.readStream.format('text').load('python/test_support/sql/streaming')
-        for cq in self.spark._wrapped.streams.active:
-            cq.stop()
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
@@ -954,15 +954,15 @@ class SQLTests(ReusedPySparkTestCase):
         chk = os.path.join(tmpPath, 'chk')
         fake1 = os.path.join(tmpPath, 'fake1')
         fake2 = os.path.join(tmpPath, 'fake2')
-        cq = df.writeStream.option('checkpointLocation', fake1)\
+        q = df.writeStream.option('checkpointLocation', fake1)\
             .format('memory').option('path', fake2) \
             .queryName('fake_query').outputMode('append') \
             .start(path=out, format='parquet', queryName='this_query', 
checkpointLocation=chk)
 
         try:
-            self.assertEqual(cq.name, 'this_query')
-            self.assertTrue(cq.isActive)
-            cq.processAllAvailable()
+            self.assertEqual(q.name, 'this_query')
+            self.assertTrue(q.isActive)
+            q.processAllAvailable()
             output_files = []
             for _, _, files in os.walk(out):
                 output_files.extend([f for f in files if not 
f.startswith('.')])
@@ -971,50 +971,50 @@ class SQLTests(ReusedPySparkTestCase):
             self.assertFalse(os.path.isdir(fake1))  # should not have been 
created
             self.assertFalse(os.path.isdir(fake2))  # should not have been 
created
         finally:
-            cq.stop()
+            q.stop()
             shutil.rmtree(tmpPath)
 
     def test_stream_await_termination(self):
         df = 
self.spark.readStream.format('text').load('python/test_support/sql/streaming')
-        for cq in self.spark._wrapped.streams.active:
-            cq.stop()
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.writeStream\
+        q = df.writeStream\
             .start(path=out, format='parquet', queryName='this_query', 
checkpointLocation=chk)
         try:
-            self.assertTrue(cq.isActive)
+            self.assertTrue(q.isActive)
             try:
-                cq.awaitTermination("hello")
+                q.awaitTermination("hello")
                 self.fail("Expected a value exception")
             except ValueError:
                 pass
             now = time.time()
             # test should take at least 2 seconds
-            res = cq.awaitTermination(2.6)
+            res = q.awaitTermination(2.6)
             duration = time.time() - now
             self.assertTrue(duration >= 2)
             self.assertFalse(res)
         finally:
-            cq.stop()
+            q.stop()
             shutil.rmtree(tmpPath)
 
     def test_query_manager_await_termination(self):
         df = 
self.spark.readStream.format('text').load('python/test_support/sql/streaming')
-        for cq in self.spark._wrapped.streams.active:
-            cq.stop()
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.writeStream\
+        q = df.writeStream\
             .start(path=out, format='parquet', queryName='this_query', 
checkpointLocation=chk)
         try:
-            self.assertTrue(cq.isActive)
+            self.assertTrue(q.isActive)
             try:
                 self.spark._wrapped.streams.awaitAnyTermination("hello")
                 self.fail("Expected a value exception")
@@ -1027,7 +1027,7 @@ class SQLTests(ReusedPySparkTestCase):
             self.assertTrue(duration >= 2)
             self.assertFalse(res)
         finally:
-            cq.stop()
+            q.stop()
             shutil.rmtree(tmpPath)
 
     def test_help_command(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/python/pyspark/sql/utils.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 9ddaf78..2a85ec0 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -45,9 +45,9 @@ class IllegalArgumentException(CapturedException):
     """
 
 
-class ContinuousQueryException(CapturedException):
+class StreamingQueryException(CapturedException):
     """
-    Exception that stopped a :class:`ContinuousQuery`.
+    Exception that stopped a :class:`StreamingQuery`.
     """
 
 
@@ -71,8 +71,8 @@ def capture_sql_exception(f):
                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
             if 
s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
                 raise ParseException(s.split(': ', 1)[1], stackTrace)
-            if 
s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '):
-                raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
+            if 
s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '):
+                raise StreamingQueryException(s.split(': ', 1)[1], stackTrace)
             if 
s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
             if s.startswith('java.lang.IllegalArgumentException: '):

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f9db325..fba4066 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -49,7 +49,7 @@ import 
org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
-import org.apache.spark.sql.streaming.{ContinuousQuery, DataStreamWriter}
+import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -455,7 +455,7 @@ class Dataset[T] private[sql](
   /**
    * Returns true if this Dataset contains one or more sources that 
continuously
    * return data as it arrives. A Dataset that reads data from a streaming 
source
-   * must be executed as a [[ContinuousQuery]] using the `startStream()` 
method in
+   * must be executed as a [[StreamingQuery]] using the `startStream()` method 
in
    * [[DataFrameWriter]]. Methods that return a single answer, e.g. `count()` 
or
    * `collect()`, will throw an [[AnalysisException]] when there is a streaming
    * source present.

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
index 09f0742..f56b25b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
@@ -18,11 +18,11 @@
 package org.apache.spark.sql
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.streaming.ContinuousQuery
+import org.apache.spark.sql.streaming.StreamingQuery
 
 /**
  * :: Experimental ::
- * A class to consume data generated by a [[ContinuousQuery]]. Typically this 
is used to send the
+ * A class to consume data generated by a [[StreamingQuery]]. Typically this 
is used to send the
  * generated data to external systems. Each partition will use a new 
deserialized instance, so you
  * usually should do all the initialization (e.g. opening a connection or 
initiating a transaction)
  * in the `open` method.

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 33f6291..e7627ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.streaming.{ContinuousQueryManager, 
DataStreamReader}
+import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -716,12 +716,12 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
   }
 
   /**
-   * Returns a [[ContinuousQueryManager]] that allows managing all the
-   * [[org.apache.spark.sql.streaming.ContinuousQuery ContinuousQueries]] 
active on `this` context.
+   * Returns a [[StreamingQueryManager]] that allows managing all the
+   * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active 
on `this` context.
    *
    * @since 2.0.0
    */
-  def streams: ContinuousQueryManager = sparkSession.streams
+  def streams: StreamingQueryManager = sparkSession.streams
 
   /**
    * Returns the names of tables in the current database as an array.

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 9137a73..251f47d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -178,13 +178,13 @@ class SparkSession private(
 
   /**
    * :: Experimental ::
-   * Returns a [[ContinuousQueryManager]] that allows managing all the
-   * [[ContinuousQuery ContinuousQueries]] active on `this`.
+   * Returns a [[StreamingQueryManager]] that allows managing all the
+   * [[StreamingQuery StreamingQueries]] active on `this`.
    *
    * @since 2.0.0
    */
   @Experimental
-  def streams: ContinuousQueryManager = sessionState.continuousQueryManager
+  def streams: StreamingQueryManager = sessionState.streamingQueryManager
 
   /**
    * Start a new session with isolated SQL configurations, temporary tables, 
registered

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 31c9f1a..6cb1a44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -47,7 +47,7 @@ private[sql] object SQLExecution {
       val r = try {
         // sparkContext.getCallSite() would first try to pick up any call site 
that was previously
         // set, then fall back to Utils.getCallSite(); call 
Utils.getCallSite() directly on
-        // continuous queries would give us call site like "run at <unknown>:0"
+        // streaming queries would give us call site like "run at <unknown>:0"
         val callSite = sparkSession.sparkContext.getCallSite()
 
         
sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d1261dd..60466e2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming.MemoryPlan
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.ContinuousQuery
+import org.apache.spark.sql.streaming.StreamingQuery
 
 /**
  * Converts a logical plan into zero or more SparkPlans.  This API is exposed 
for experimenting
@@ -225,7 +225,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   /**
    * Used to plan aggregation queries that are computed incrementally as part 
of a
-   * [[ContinuousQuery]]. Currently this rule is injected into the planner
+   * [[StreamingQuery]]. Currently this rule is injected into the planner
    * on-demand, only when planning in a 
[[org.apache.spark.sql.execution.streaming.StreamExecution]]
    */
   object StatefulAggregationStrategy extends Strategy {

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
deleted file mode 100644
index f50951f..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, 
SparkListenerEvent}
-import org.apache.spark.sql.streaming.ContinuousQueryListener
-import org.apache.spark.util.ListenerBus
-
-/**
- * A bus to forward events to [[ContinuousQueryListener]]s. This one will send 
received
- * [[ContinuousQueryListener.Event]]s to the Spark listener bus. It also 
registers itself with
- * Spark listener bus, so that it can receive 
[[ContinuousQueryListener.Event]]s and dispatch them
- * to ContinuousQueryListener.
- */
-class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
-  extends SparkListener with ListenerBus[ContinuousQueryListener, 
ContinuousQueryListener.Event] {
-
-  import ContinuousQueryListener._
-
-  sparkListenerBus.addListener(this)
-
-  /**
-   * Post a ContinuousQueryListener event to the Spark listener bus 
asynchronously. This event will
-   * be dispatched to all ContinuousQueryListener in the thread of the Spark 
listener bus.
-   */
-  def post(event: ContinuousQueryListener.Event) {
-    event match {
-      case s: QueryStarted =>
-        postToAll(s)
-      case _ =>
-        sparkListenerBus.post(event)
-    }
-  }
-
-  override def onOtherEvent(event: SparkListenerEvent): Unit = {
-    event match {
-      case e: ContinuousQueryListener.Event =>
-        postToAll(e)
-      case _ =>
-    }
-  }
-
-  override protected def doPostEvent(
-      listener: ContinuousQueryListener,
-      event: ContinuousQueryListener.Event): Unit = {
-    event match {
-      case queryStarted: QueryStarted =>
-        listener.onQueryStarted(queryStarted)
-      case queryProgress: QueryProgress =>
-        listener.onQueryProgress(queryProgress)
-      case queryTerminated: QueryTerminated =>
-        listener.onQueryTerminated(queryTerminated)
-      case _ =>
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5095fe7..4aefd39 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -52,9 +52,9 @@ class StreamExecution(
     val trigger: Trigger,
     private[sql] val triggerClock: Clock,
     val outputMode: OutputMode)
-  extends ContinuousQuery with Logging {
+  extends StreamingQuery with Logging {
 
-  import org.apache.spark.sql.streaming.ContinuousQueryListener._
+  import org.apache.spark.sql.streaming.StreamingQueryListener._
 
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to 
avoid thread starvation.
@@ -101,7 +101,7 @@ class StreamExecution(
   private[sql] var lastExecution: QueryExecution = null
 
   @volatile
-  private[sql] var streamDeathCause: ContinuousQueryException = null
+  private[sql] var streamDeathCause: StreamingQueryException = null
 
   /* Get the call site in the caller thread; will pass this into the micro 
batch thread */
   private val callSite = Utils.getCallSite()
@@ -140,8 +140,8 @@ class StreamExecution(
   override def sinkStatus: SinkStatus =
     new SinkStatus(sink.toString, 
committedOffsets.toCompositeOffset(sources).toString)
 
-  /** Returns the [[ContinuousQueryException]] if the query was terminated by 
an exception. */
-  override def exception: Option[ContinuousQueryException] = 
Option(streamDeathCause)
+  /** Returns the [[StreamingQueryException]] if the query was terminated by 
an exception. */
+  override def exception: Option[StreamingQueryException] = 
Option(streamDeathCause)
 
   /** Returns the path of a file with `name` in the checkpoint directory. */
   private def checkpointFile(name: String): String =
@@ -199,7 +199,7 @@ class StreamExecution(
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by 
stop()
       case NonFatal(e) =>
-        streamDeathCause = new ContinuousQueryException(
+        streamDeathCause = new StreamingQueryException(
           this,
           s"Query $name terminated with exception: ${e.getMessage}",
           e,
@@ -227,7 +227,7 @@ class StreamExecution(
   private def populateStartOffsets(): Unit = {
     offsetLog.getLatest() match {
       case Some((batchId, nextOffsets)) =>
-        logInfo(s"Resuming continuous query, starting with batch $batchId")
+        logInfo(s"Resuming streaming query, starting with batch $batchId")
         currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
         logDebug(s"Found possibly uncommitted offsets $availableOffsets")
@@ -239,7 +239,7 @@ class StreamExecution(
         }
 
       case None => // We are starting this stream for the first time.
-        logInfo(s"Starting new continuous query.")
+        logInfo(s"Starting new streaming query.")
         currentBatchId = 0
         constructNextBatch()
     }
@@ -383,7 +383,7 @@ class StreamExecution(
     postEvent(new QueryProgress(this.toInfo))
   }
 
-  private def postEvent(event: ContinuousQueryListener.Event) {
+  private def postEvent(event: StreamingQueryListener.Event) {
     sparkSession.streams.postListenerEvent(event)
   }
 
@@ -468,7 +468,7 @@ class StreamExecution(
   }
 
   override def toString: String = {
-    s"Continuous Query - $name [state = $state]"
+    s"Streaming Query - $name [state = $state]"
   }
 
   def toDebugString: String = {
@@ -476,7 +476,7 @@ class StreamExecution(
       "Error:\n" + stackTraceToString(streamDeathCause.cause)
     } else ""
     s"""
-       |=== Continuous Query ===
+       |=== Streaming Query ===
        |Name: $name
        |Current Offsets: $committedOffsets
        |
@@ -490,8 +490,8 @@ class StreamExecution(
      """.stripMargin
   }
 
-  private def toInfo: ContinuousQueryInfo = {
-    new ContinuousQueryInfo(
+  private def toInfo: StreamingQueryInfo = {
+    new StreamingQueryInfo(
       this.name,
       this.id,
       this.sourceStatuses,

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
new file mode 100644
index 0000000..1e66395
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, 
SparkListenerEvent}
+import org.apache.spark.sql.streaming.StreamingQueryListener
+import org.apache.spark.util.ListenerBus
+
+/**
+ * A bus to forward events to [[StreamingQueryListener]]s. This one will send 
received
+ * [[StreamingQueryListener.Event]]s to the Spark listener bus. It also 
registers itself with
+ * Spark listener bus, so that it can receive 
[[StreamingQueryListener.Event]]s and dispatch them
+ * to StreamingQueryListener.
+ */
+class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
+  extends SparkListener with ListenerBus[StreamingQueryListener, 
StreamingQueryListener.Event] {
+
+  import StreamingQueryListener._
+
+  sparkListenerBus.addListener(this)
+
+  /**
+   * Post a StreamingQueryListener event to the Spark listener bus 
asynchronously. This event will
+   * be dispatched to all StreamingQueryListener in the thread of the Spark 
listener bus.
+   */
+  def post(event: StreamingQueryListener.Event) {
+    event match {
+      case s: QueryStarted =>
+        postToAll(s)
+      case _ =>
+        sparkListenerBus.post(event)
+    }
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    event match {
+      case e: StreamingQueryListener.Event =>
+        postToAll(e)
+      case _ =>
+    }
+  }
+
+  override protected def doPostEvent(
+      listener: StreamingQueryListener,
+      event: StreamingQueryListener.Event): Unit = {
+    event match {
+      case queryStarted: QueryStarted =>
+        listener.onQueryStarted(queryStarted)
+      case queryProgress: QueryProgress =>
+        listener.onQueryProgress(queryProgress)
+      case queryTerminated: QueryTerminated =>
+        listener.onQueryTerminated(queryTerminated)
+      case _ =>
+    }
+  }
+
+}
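
As a hedged aside (not part of this patch), a minimal Scala sketch of subscribing to the renamed listener events that this bus dispatches, assuming the addListener method that StreamingQueryManager carries over from ContinuousQueryManager; the object and app names are placeholders:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

    object ListenerSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("listener-sketch").getOrCreate()

        // Callbacks run on the Spark listener bus thread, as described in the scaladoc above.
        spark.streams.addListener(new StreamingQueryListener {
          override def onQueryStarted(event: QueryStarted): Unit = println("query started")
          override def onQueryProgress(event: QueryProgress): Unit = println("query made progress")
          override def onQueryTerminated(event: QueryTerminated): Unit = println("query terminated")
        })
      }
    }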

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 02608b0..e8bd489 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2587,7 +2587,7 @@ object functions {
    *   09:00:25-09:01:25 ...
    * }}}
    *
-   * For a continuous query, you may use the function `current_timestamp` to 
generate windows on
+   * For a streaming query, you may use the function `current_timestamp` to 
generate windows on
    * processing time.
    *
    * @param timeColumn The column or the expression to use as the timestamp 
for windowing by time.
@@ -2641,7 +2641,7 @@ object functions {
    *   09:00:20-09:01:20 ...
    * }}}
    *
-   * For a continuous query, you may use the function `current_timestamp` to 
generate windows on
+   * For a streaming query, you may use the function `current_timestamp` to 
generate windows on
    * processing time.
    *
    * @param timeColumn The column or the expression to use as the timestamp 
for windowing by time.
@@ -2683,7 +2683,7 @@ object functions {
    *   09:02:00-09:03:00 ...
    * }}}
    *
-   * For a continuous query, you may use the function `current_timestamp` to 
generate windows on
+   * For a streaming query, you may use the function `current_timestamp` to 
generate windows on
    * processing time.
    *
    * @param timeColumn The column or the expression to use as the timestamp 
for windowing by time.

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6978b50..4b8916f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -484,14 +484,14 @@ object SQLConf {
       .createWithDefault(2)
 
   val CHECKPOINT_LOCATION = 
SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
-    .doc("The default location for storing checkpoint data for continuously 
executing queries.")
+    .doc("The default location for storing checkpoint data for streaming 
queries.")
     .stringConf
     .createOptional
 
   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
     SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
       .internal()
-      .doc("When true, the logical plan for continuous query will be checked 
for unsupported" +
+      .doc("When true, the logical plan for streaming query will be checked 
for unsupported" +
         " operations.")
       .booleanConf
       .createWithDefault(true)

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index b430950..59efa81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, 
FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
-import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryManager}
+import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 
@@ -143,10 +143,10 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
   lazy val listenerManager: ExecutionListenerManager = new 
ExecutionListenerManager
 
   /**
-   * Interface to start and stop [[ContinuousQuery]]s.
+   * Interface to start and stop [[StreamingQuery]]s.
    */
-  lazy val continuousQueryManager: ContinuousQueryManager = {
-    new ContinuousQueryManager(sparkSession)
+  lazy val streamingQueryManager: StreamingQueryManager = {
+    new StreamingQueryManager(sparkSession)
   }
 
   private val jarClassLoader: NonClosableMutableURLClassLoader =

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
deleted file mode 100644
index 1e0a47d..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.SparkSession
-
-/**
- * :: Experimental ::
- * A handle to a query that is executing continuously in the background as new 
data arrives.
- * All these methods are thread-safe.
- * @since 2.0.0
- */
-@Experimental
-trait ContinuousQuery {
-
-  /**
-   * Returns the name of the query. This name is unique across all active 
queries. This can be
-   * set in the[[org.apache.spark.sql.DataFrameWriter DataFrameWriter]] as
-   * `dataframe.write().queryName("query").startStream()`.
-   * @since 2.0.0
-   */
-  def name: String
-
-  /**
-   * Returns the unique id of this query. This id is automatically generated 
and is unique across
-   * all queries that have been started in the current process.
-   * @since 2.0.0
-   */
-  def id: Long
-
-  /**
-   * Returns the [[SparkSession]] associated with `this`.
-   * @since 2.0.0
-   */
-  def sparkSession: SparkSession
-
-  /**
-   * Whether the query is currently active or not
-   * @since 2.0.0
-   */
-  def isActive: Boolean
-
-  /**
-   * Returns the [[ContinuousQueryException]] if the query was terminated by 
an exception.
-   * @since 2.0.0
-   */
-  def exception: Option[ContinuousQueryException]
-
-  /**
-   * Returns current status of all the sources.
-   * @since 2.0.0
-   */
-  def sourceStatuses: Array[SourceStatus]
-
-  /** Returns current status of the sink. */
-  def sinkStatus: SinkStatus
-
-  /**
-   * Waits for the termination of `this` query, either by `query.stop()` or by 
an exception.
-   * If the query has terminated with an exception, then the exception will be 
thrown.
-   *
-   * If the query has terminated, then all subsequent calls to this method 
will either return
-   * immediately (if the query was terminated by `stop()`), or throw the 
exception
-   * immediately (if the query has terminated with exception).
-   *
-   * @throws ContinuousQueryException, if `this` query has terminated with an 
exception.
-   *
-   * @since 2.0.0
-   */
-  def awaitTermination(): Unit
-
-  /**
-   * Waits for the termination of `this` query, either by `query.stop()` or by 
an exception.
-   * If the query has terminated with an exception, then the exception will be 
thrown.
-   * Otherwise, it returns whether the query has terminated or not within the 
`timeoutMs`
-   * milliseconds.
-   *
-   * If the query has terminated, then all subsequent calls to this method 
will either return
-   * `true` immediately (if the query was terminated by `stop()`), or throw 
the exception
-   * immediately (if the query has terminated with exception).
-   *
-   * @throws ContinuousQueryException, if `this` query has terminated with an 
exception
-   *
-   * @since 2.0.0
-   */
-  def awaitTermination(timeoutMs: Long): Boolean
-
-  /**
-   * Blocks until all available data in the source has been processed and 
committed to the sink.
-   * This method is intended for testing. Note that in the case of continually 
arriving data, this
-   * method may block forever. Additionally, this method is only guaranteed to 
block until data that
-   * has been synchronously appended data to a 
[[org.apache.spark.sql.execution.streaming.Source]]
-   * prior to invocation. (i.e. `getOffset` must immediately reflect the 
addition).
-   */
-  def processAllAvailable(): Unit
-
-  /**
-   * Stops the execution of this query if it is running. This method blocks 
until the threads
-   * performing execution has stopped.
-   * @since 2.0.0
-   */
-  def stop(): Unit
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
deleted file mode 100644
index 5196c5a..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
-
-/**
- * :: Experimental ::
- * Exception that stopped a [[ContinuousQuery]].
- * @param query      Query that caused the exception
- * @param message     Message of this exception
- * @param cause       Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which 
exception occurred
- * @param endOffset   Ending offset (if known) of the range of data in 
exception occurred
- * @since 2.0.0
- */
-@Experimental
-class ContinuousQueryException private[sql](
-    @transient val query: ContinuousQuery,
-    val message: String,
-    val cause: Throwable,
-    val startOffset: Option[Offset] = None,
-    val endOffset: Option[Offset] = None)
-  extends Exception(message, cause) {
-
-  /** Time when the exception occurred */
-  val time: Long = System.currentTimeMillis
-
-  override def toString(): String = {
-    val causeStr =
-      s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", 
"\n|\t", "\n")}"
-    s"""
-       |$causeStr
-       |
-       |${query.asInstanceOf[StreamExecution].toDebugString}
-       """.stripMargin
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
deleted file mode 100644
index 19f2270..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.annotation.Experimental
-
-/**
- * :: Experimental ::
- * A class used to report information about the progress of a 
[[ContinuousQuery]].
- *
- * @param name The [[ContinuousQuery]] name. This name is unique across all 
active queries.
- * @param id The [[ContinuousQuery]] id. This id is unique across
-  *          all queries that have been started in the current process.
- * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s 
sources.
- * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
- */
-@Experimental
-class ContinuousQueryInfo private[sql](
-  val name: String,
-  val id: Long,
-  val sourceStatuses: Seq[SourceStatus],
-  val sinkStatus: SinkStatus)

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
deleted file mode 100644
index dd31114..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.scheduler.SparkListenerEvent
-
-/**
- * :: Experimental ::
- * Interface for listening to events related to [[ContinuousQuery 
ContinuousQueries]].
- * @note The methods are not thread-safe as they may be called from different 
threads.
- *
- * @since 2.0.0
- */
-@Experimental
-abstract class ContinuousQueryListener {
-
-  import ContinuousQueryListener._
-
-  /**
-   * Called when a query is started.
-   * @note This is called synchronously with
-   *       [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]],
-   *       that is, `onQueryStarted` will be called on all listeners before
-   *       `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please
-   *       don't block this method, as it will block your query.
-   * @since 2.0.0
-   */
-  def onQueryStarted(queryStarted: QueryStarted): Unit
-
-  /**
-   * Called when there is some status update (ingestion rate updated, etc.)
-   *
-   * @note This method is asynchronous. The status in [[ContinuousQuery]] will always be
-   *       the latest no matter when this method is called. Therefore, the status of
-   *       [[ContinuousQuery]] may have changed before/when you process the event. E.g., you may
-   *       find [[ContinuousQuery]] is terminated when you are processing [[QueryProgress]].
-   * @since 2.0.0
-   */
-  def onQueryProgress(queryProgress: QueryProgress): Unit
-
-  /**
-   * Called when a query is stopped, with or without error.
-   * @since 2.0.0
-   */
-  def onQueryTerminated(queryTerminated: QueryTerminated): Unit
-}
-
-
-/**
- * :: Experimental ::
- * Companion object of [[ContinuousQueryListener]] that defines the listener 
events.
- * @since 2.0.0
- */
-@Experimental
-object ContinuousQueryListener {
-
-  /**
-   * :: Experimental ::
-   * Base type of [[ContinuousQueryListener]] events
-   * @since 2.0.0
-   */
-  @Experimental
-  trait Event extends SparkListenerEvent
-
-  /**
-   * :: Experimental ::
-   * Event representing the start of a query
-   * @since 2.0.0
-   */
-  @Experimental
-  class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends 
Event
-
-  /**
-   * :: Experimental ::
-   * Event representing any progress updates in a query
-   * @since 2.0.0
-   */
-  @Experimental
-  class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends 
Event
-
-  /**
-   * :: Experimental ::
-   * Event representing the termination of a query
-   *
-   * @param queryInfo Information about the status of the query.
-   * @param exception The exception message of the [[ContinuousQuery]] if the 
query was terminated
-   *                  with an exception. Otherwise, it will be `None`.
-   * @param stackTrace The stack trace of the exception if the query was 
terminated with an
-   *                   exception. It will be empty if there was no error.
-   * @since 2.0.0
-   */
-  @Experimental
-  class QueryTerminated private[sql](
-      val queryInfo: ContinuousQueryInfo,
-      val exception: Option[String],
-      val stackTrace: Seq[StackTraceElement]) extends Event
-}
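
For reference, a minimal sketch of the same listener API after the rename, assuming the nested event classes (QueryStarted, QueryProgress, QueryTerminated) keep their names under StreamingQueryListener and that the manager is exposed as `spark.streams` on an existing SparkSession; these accessors are assumptions, not shown in this hunk.

  import org.apache.spark.sql.streaming.StreamingQueryListener
  import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

  // Logs life cycle events; keep the callbacks cheap, since onQueryStarted is synchronous.
  class LoggingListener extends StreamingQueryListener {
    override def onQueryStarted(queryStarted: QueryStarted): Unit =
      println(s"started: ${queryStarted.queryInfo.name} (id=${queryStarted.queryInfo.id})")

    override def onQueryProgress(queryProgress: QueryProgress): Unit =
      println(s"progress: ${queryProgress.queryInfo.sourceStatuses.length} source(s)")

    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit =
      println(s"terminated: ${queryTerminated.queryInfo.name}, " +
        s"error=${queryTerminated.exception.getOrElse("none")}")
  }

  spark.streams.addListener(new LoggingListener)   // registration accessor assumed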

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
deleted file mode 100644
index 0f4a9c9..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import scala.collection.mutable
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{Clock, SystemClock, Utils}
-
-/**
- * :: Experimental ::
- * A class to manage all the [[ContinuousQuery]] queries active on a [[SparkSession]].
- *
- * @since 2.0.0
- */
-@Experimental
-class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
-
-  private[sql] val stateStoreCoordinator =
-    StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
-  private val listenerBus = new 
ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus)
-  private val activeQueries = new mutable.HashMap[Long, ContinuousQuery]
-  private val activeQueriesLock = new Object
-  private val awaitTerminationLock = new Object
-
-  private var lastTerminatedQuery: ContinuousQuery = null
-
-  /**
-   * Returns a list of active queries associated with this SQLContext
-   *
-   * @since 2.0.0
-   */
-  def active: Array[ContinuousQuery] = activeQueriesLock.synchronized {
-    activeQueries.values.toArray
-  }
-
-  /**
-   * Returns the query if there is an active query with the given id, or null.
-   *
-   * @since 2.0.0
-   */
-  def get(id: Long): ContinuousQuery = activeQueriesLock.synchronized {
-    activeQueries.get(id).orNull
-  }
-
-  /**
-   * Wait until any of the queries on the associated SQLContext has terminated 
since the
-   * creation of the context, or since `resetTerminated()` was called. If any 
query was terminated
-   * with an exception, then the exception will be thrown.
-   *
-   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
-   * return immediately (if the query was terminated by `query.stop()`),
-   * or throw the exception immediately (if the query was terminated with an exception). Use
-   * `resetTerminated()` to clear past terminations and wait for new terminations.
-   *
-   * In the case where multiple queries have terminated since `resetTerminated()` was called,
-   * if any query has terminated with an exception, then `awaitAnyTermination()` will
-   * throw any one of the exceptions. To correctly handle exceptions across multiple queries,
-   * users need to stop all of them after any of them terminates with an exception, and then
-   * check `query.exception()` for each query.
-   *
-   * @throws ContinuousQueryException, if any query has terminated with an 
exception
-   *
-   * @since 2.0.0
-   */
-  def awaitAnyTermination(): Unit = {
-    awaitTerminationLock.synchronized {
-      while (lastTerminatedQuery == null) {
-        awaitTerminationLock.wait(10)
-      }
-      if (lastTerminatedQuery != null && 
lastTerminatedQuery.exception.nonEmpty) {
-        throw lastTerminatedQuery.exception.get
-      }
-    }
-  }
-
-  /**
-   * Wait until any of the queries on the associated SQLContext has terminated 
since the
-   * creation of the context, or since `resetTerminated()` was called. Returns 
whether any query
-   * has terminated or not (multiple may have terminated). If any query has 
terminated with an
-   * exception, then the exception will be thrown.
-   *
-   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
-   * return `true` immediately (if the query was terminated by `query.stop()`),
-   * or throw the exception immediately (if the query was terminated with an exception). Use
-   * `resetTerminated()` to clear past terminations and wait for new terminations.
-   *
-   * In the case where multiple queries have terminated since `resetTerminated()` was called,
-   * if any query has terminated with an exception, then `awaitAnyTermination()` will
-   * throw any one of the exceptions. To correctly handle exceptions across multiple queries,
-   * users need to stop all of them after any of them terminates with an exception, and then
-   * check `query.exception()` for each query.
-   *
-   * @throws ContinuousQueryException, if any query has terminated with an 
exception
-   *
-   * @since 2.0.0
-   */
-  def awaitAnyTermination(timeoutMs: Long): Boolean = {
-
-    val startTime = System.currentTimeMillis
-    def isTimedout = System.currentTimeMillis - startTime >= timeoutMs
-
-    awaitTerminationLock.synchronized {
-      while (!isTimedout && lastTerminatedQuery == null) {
-        awaitTerminationLock.wait(10)
-      }
-      if (lastTerminatedQuery != null && 
lastTerminatedQuery.exception.nonEmpty) {
-        throw lastTerminatedQuery.exception.get
-      }
-      lastTerminatedQuery != null
-    }
-  }
-
-  /**
-   * Forget about past terminated queries so that `awaitAnyTermination()` can 
be used again to
-   * wait for new terminations.
-   *
-   * @since 2.0.0
-   */
-  def resetTerminated(): Unit = {
-    awaitTerminationLock.synchronized {
-      lastTerminatedQuery = null
-    }
-  }
-
-  /**
-   * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle 
events of
-   * [[ContinuousQuery]].
-   *
-   * @since 2.0.0
-   */
-  def addListener(listener: ContinuousQueryListener): Unit = {
-    listenerBus.addListener(listener)
-  }
-
-  /**
-   * Deregister a [[ContinuousQueryListener]].
-   *
-   * @since 2.0.0
-   */
-  def removeListener(listener: ContinuousQueryListener): Unit = {
-    listenerBus.removeListener(listener)
-  }
-
-  /** Post a listener event */
-  private[sql] def postListenerEvent(event: ContinuousQueryListener.Event): 
Unit = {
-    listenerBus.post(event)
-  }
-
-  /**
-   * Start a [[ContinuousQuery]].
-   * @param userSpecifiedName Query name optionally specified by the user.
-   * @param userSpecifiedCheckpointLocation  Checkpoint location optionally 
specified by the user.
-   * @param df Streaming DataFrame.
-   * @param sink  Sink to write the streaming outputs.
-   * @param outputMode  Output mode for the sink.
-   * @param useTempCheckpointLocation  Whether to use a temporary checkpoint location when the
-   *                                   user has not specified one. If false, then an error will
-   *                                   be thrown.
-   * @param recoverFromCheckpointLocation  Whether to recover the query from the checkpoint
-   *                                       location. If false and the checkpoint location
-   *                                       exists, then an error will be thrown.
-   * @param trigger [[Trigger]] for the query.
-   * @param triggerClock [[Clock]] to use for the triggering.
-   */
-  private[sql] def startQuery(
-      userSpecifiedName: Option[String],
-      userSpecifiedCheckpointLocation: Option[String],
-      df: DataFrame,
-      sink: Sink,
-      outputMode: OutputMode,
-      useTempCheckpointLocation: Boolean = false,
-      recoverFromCheckpointLocation: Boolean = true,
-      trigger: Trigger = ProcessingTime(0),
-      triggerClock: Clock = new SystemClock()): ContinuousQuery = {
-    activeQueriesLock.synchronized {
-      val id = StreamExecution.nextId
-      val name = userSpecifiedName.getOrElse(s"query-$id")
-      if (activeQueries.values.exists(_.name == name)) {
-        throw new IllegalArgumentException(
-          s"Cannot start query with name $name as a query with that name is 
already active")
-      }
-      val checkpointLocation = userSpecifiedCheckpointLocation.map { 
userSpecified =>
-        new Path(userSpecified).toUri.toString
-      }.orElse {
-        df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
-          new Path(location, name).toUri.toString
-        }
-      }.getOrElse {
-        if (useTempCheckpointLocation) {
-          Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
-        } else {
-          throw new AnalysisException(
-            "checkpointLocation must be specified either " +
-              """through option("checkpointLocation", ...) or """ +
-              s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", 
...)""")
-        }
-      }
-
-      // If offsets have already been created, we are trying to resume a query.
-      if (!recoverFromCheckpointLocation) {
-        val checkpointPath = new Path(checkpointLocation, "offsets")
-        val fs = 
checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-        if (fs.exists(checkpointPath)) {
-          throw new AnalysisException(
-            s"This query does not support recovering from checkpoint location. 
" +
-              s"Delete $checkpointPath to start over.")
-        }
-      }
-
-      val analyzedPlan = df.queryExecution.analyzed
-      df.queryExecution.assertAnalyzed()
-
-      if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
-        UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
-      }
-
-      var nextSourceId = 0L
-
-      val logicalPlan = analyzedPlan.transform {
-        case StreamingRelation(dataSource, _, output) =>
-          // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
-          val source = dataSource.createSource(metadataPath)
-          nextSourceId += 1
-          // We still need to use the previous `output` instead of 
`source.schema` as attributes in
-          // "df.logicalPlan" has already used attributes of the previous 
`output`.
-          StreamingExecutionRelation(source, output)
-      }
-      val query = new StreamExecution(
-        sparkSession,
-        id,
-        name,
-        checkpointLocation,
-        logicalPlan,
-        sink,
-        trigger,
-        triggerClock,
-        outputMode)
-      query.start()
-      activeQueries.put(id, query)
-      query
-    }
-  }
-
-  /** Notify (by the ContinuousQuery) that the query has been terminated */
-  private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): 
Unit = {
-    activeQueriesLock.synchronized {
-      activeQueries -= terminatedQuery.id
-    }
-    awaitTerminationLock.synchronized {
-      if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
-        lastTerminatedQuery = terminatedQuery
-      }
-      awaitTerminationLock.notifyAll()
-    }
-  }
-}
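
This manager is renamed to StreamingQueryManager in this commit (the DataStreamWriter changes below reach it as `sessionState.streamingQueryManager`). A hedged sketch of the multi-query termination handling its Scaladoc describes, assuming an existing SparkSession `spark` that exposes the renamed manager as `spark.streams`:

  import org.apache.spark.sql.streaming.StreamingQueryException

  try {
    // Block until some active query terminates, or the timeout elapses.
    val terminated = spark.streams.awaitAnyTermination(timeoutMs = 10 * 60 * 1000L)
    if (terminated) {
      spark.streams.resetTerminated()          // forget past terminations before waiting again
    }
  } catch {
    case e: StreamingQueryException =>
      // One query failed; per the docs, stop the rest and inspect each query.exception.
      spark.streams.active.foreach(_.stop())
      throw e
  }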

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b035ff7..1977074 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -109,7 +109,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 
   /**
    * :: Experimental ::
-   * Specifies the name of the [[ContinuousQuery]] that can be started with 
`startStream()`.
+   * Specifies the name of the [[StreamingQuery]] that can be started with 
`startStream()`.
    * This name must be unique among all the currently active queries in the 
associated SQLContext.
    *
    * @since 2.0.0
@@ -221,26 +221,26 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
   /**
    * :: Experimental ::
    * Starts the execution of the streaming query, which will continually 
output results to the given
-   * path as new data arrives. The returned [[ContinuousQuery]] object can be 
used to interact with
+   * path as new data arrives. The returned [[StreamingQuery]] object can be 
used to interact with
    * the stream.
    *
    * @since 2.0.0
    */
   @Experimental
-  def start(path: String): ContinuousQuery = {
+  def start(path: String): StreamingQuery = {
     option("path", path).start()
   }
 
   /**
    * :: Experimental ::
    * Starts the execution of the streaming query, which will continually 
output results to the given
-   * path as new data arrives. The returned [[ContinuousQuery]] object can be 
used to interact with
+   * path as new data arrives. The returned [[StreamingQuery]] object can be 
used to interact with
    * the stream.
    *
    * @since 2.0.0
    */
   @Experimental
-  def start(): ContinuousQuery = {
+  def start(): StreamingQuery = {
     if (source == "memory") {
       assertNotPartitioned("memory")
       if (extraOptions.get("queryName").isEmpty) {
@@ -249,7 +249,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
-      val query = 
df.sparkSession.sessionState.continuousQueryManager.startQuery(
+      val query = 
df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         extraOptions.get("checkpointLocation"),
         df,
@@ -263,7 +263,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
     } else if (source == "foreach") {
       assertNotPartitioned("foreach")
       val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
-      df.sparkSession.sessionState.continuousQueryManager.startQuery(
+      df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         extraOptions.get("checkpointLocation"),
         df,
@@ -278,7 +278,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
           className = source,
           options = extraOptions.toMap,
           partitionColumns = normalizedParCols.getOrElse(Nil))
-      df.sparkSession.sessionState.continuousQueryManager.startQuery(
+      df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         extraOptions.get("checkpointLocation"),
         df,
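
To illustrate the renamed return type, a hedged sketch of starting a query through this writer. Only the `StreamingQuery` return type and the `option`/`queryName`/`start` calls are taken from this file; the `writeStream` accessor, the streaming DataFrame `streamingDf`, and the paths are assumptions.

  import org.apache.spark.sql.streaming.StreamingQuery

  val query: StreamingQuery = streamingDf.writeStream        // writer accessor assumed
    .format("parquet")
    .option("checkpointLocation", "/tmp/checkpoints/demo")   // per startQuery, otherwise the SQLConf location or a temp dir is used
    .queryName("demo")                                        // must be unique among active queries
    .start("/tmp/output/demo")                                // returns the renamed StreamingQuery handle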

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
new file mode 100644
index 0000000..dc81a5b
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.SparkSession
+
+/**
+ * :: Experimental ::
+ * A handle to a query that is executing continuously in the background as new 
data arrives.
+ * All these methods are thread-safe.
+ * @since 2.0.0
+ */
+@Experimental
+trait StreamingQuery {
+
+  /**
+   * Returns the name of the query. This name is unique across all active 
queries. This can be
+   * set in the [[org.apache.spark.sql.DataFrameWriter DataFrameWriter]] as
+   * `dataframe.write().queryName("query").startStream()`.
+   * @since 2.0.0
+   */
+  def name: String
+
+  /**
+   * Returns the unique id of this query. This id is automatically generated 
and is unique across
+   * all queries that have been started in the current process.
+   * @since 2.0.0
+   */
+  def id: Long
+
+  /**
+   * Returns the [[SparkSession]] associated with `this`.
+   * @since 2.0.0
+   */
+  def sparkSession: SparkSession
+
+  /**
+   * Whether the query is currently active or not
+   * @since 2.0.0
+   */
+  def isActive: Boolean
+
+  /**
+   * Returns the [[StreamingQueryException]] if the query was terminated by an 
exception.
+   * @since 2.0.0
+   */
+  def exception: Option[StreamingQueryException]
+
+  /**
+   * Returns current status of all the sources.
+   * @since 2.0.0
+   */
+  def sourceStatuses: Array[SourceStatus]
+
+  /** Returns current status of the sink. */
+  def sinkStatus: SinkStatus
+
+  /**
+   * Waits for the termination of `this` query, either by `query.stop()` or by 
an exception.
+   * If the query has terminated with an exception, then the exception will be 
thrown.
+   *
+   * If the query has terminated, then all subsequent calls to this method will either return
+   * immediately (if the query was terminated by `stop()`), or throw the exception
+   * immediately (if the query has terminated with an exception).
+   *
+   * @throws StreamingQueryException, if `this` query has terminated with an 
exception.
+   *
+   * @since 2.0.0
+   */
+  def awaitTermination(): Unit
+
+  /**
+   * Waits for the termination of `this` query, either by `query.stop()` or by 
an exception.
+   * If the query has terminated with an exception, then the exception will be 
thrown.
+   * Otherwise, it returns whether the query has terminated or not within the 
`timeoutMs`
+   * milliseconds.
+   *
+   * If the query has terminated, then all subsequent calls to this method will either return
+   * `true` immediately (if the query was terminated by `stop()`), or throw the exception
+   * immediately (if the query has terminated with an exception).
+   *
+   * @throws StreamingQueryException, if `this` query has terminated with an 
exception
+   *
+   * @since 2.0.0
+   */
+  def awaitTermination(timeoutMs: Long): Boolean
+
+  /**
+   * Blocks until all available data in the source has been processed and committed to the sink.
+   * This method is intended for testing. Note that in the case of continually arriving data, this
+   * method may block forever. Additionally, this method is only guaranteed to block until data
+   * that has been synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]]
+   * prior to invocation has been processed (i.e. `getOffset` must immediately reflect the
+   * addition).
+   */
+  def processAllAvailable(): Unit
+
+  /**
+   * Stops the execution of this query if it is running. This method blocks until the threads
+   * performing execution have stopped.
+   * @since 2.0.0
+   */
+  def stop(): Unit
+}
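
A short usage sketch of the handle defined above, assuming `query` is a StreamingQuery returned by `DataStreamWriter.start()`:

  import org.apache.spark.sql.streaming.StreamingQueryException

  println(s"name=${query.name} id=${query.id} active=${query.isActive}")

  try {
    // Returns true if the query terminated within the timeout, false otherwise;
    // rethrows the failure if the query died with an error.
    if (!query.awaitTermination(timeoutMs = 30000L)) {
      query.stop()                         // blocks until the execution threads have stopped
    }
  } catch {
    case e: StreamingQueryException =>
      println(s"query failed: ${e.message}")
  }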

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
new file mode 100644
index 0000000..90f95ca
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
+
+/**
+ * :: Experimental ::
+ * Exception that stopped a [[StreamingQuery]].
+ * @param query      Query that caused the exception
+ * @param message     Message of this exception
+ * @param cause       Internal cause of this exception
+ * @param startOffset Starting offset (if known) of the range of data in which the exception occurred
+ * @param endOffset   Ending offset (if known) of the range of data in which the exception occurred
+ * @since 2.0.0
+ */
+@Experimental
+class StreamingQueryException private[sql](
+    @transient val query: StreamingQuery,
+    val message: String,
+    val cause: Throwable,
+    val startOffset: Option[Offset] = None,
+    val endOffset: Option[Offset] = None)
+  extends Exception(message, cause) {
+
+  /** Time when the exception occurred */
+  val time: Long = System.currentTimeMillis
+
+  override def toString(): String = {
+    val causeStr =
+      s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", 
"\n|\t", "\n")}"
+    s"""
+       |$causeStr
+       |
+       |${query.asInstanceOf[StreamExecution].toDebugString}
+       """.stripMargin
+  }
+}
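
The same exception is also surfaced through the handle once the query has stopped; a small sketch, again assuming a terminated StreamingQuery named `query`:

  if (!query.isActive) {
    query.exception.foreach { e =>
      println(s"terminated with: ${e.message}")
      println(s"offset range: ${e.startOffset} .. ${e.endOffset}")   // Option[Offset] bounds, if known
    }
  }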

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
new file mode 100644
index 0000000..1af2668
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * A class used to report information about the progress of a 
[[StreamingQuery]].
+ *
+ * @param name The [[StreamingQuery]] name. This name is unique across all 
active queries.
+ * @param id The [[StreamingQuery]] id. This id is unique across
+ *           all queries that have been started in the current process.
+ * @param sourceStatuses The current statuses of the [[StreamingQuery]]'s 
sources.
+ * @param sinkStatus The current status of the [[StreamingQuery]]'s sink.
+ */
+@Experimental
+class StreamingQueryInfo private[sql](
+  val name: String,
+  val id: Long,
+  val sourceStatuses: Seq[SourceStatus],
+  val sinkStatus: SinkStatus)

