Repository: spark
Updated Branches:
  refs/heads/master 5ad4e32d4 -> c654ae214


[SPARK-15889][SQL][STREAMING] Add a unique id to ContinuousQuery

## What changes were proposed in this pull request?

ContinuousQueries have names that are unique across all the active ones. 
However, when queries are rapidly restarted with same name, it causes races 
conditions with the listener. A listener event from a stopped query can arrive 
after the query has been restarted, leading to complexities in monitoring 
infrastructure.

Along with this change, I have also consolidated all the messy code paths to 
start queries with different sinks.

## How was this patch tested?
Added unit tests, and existing unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #13613 from tdas/SPARK-15889.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c654ae21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c654ae21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c654ae21

Branch: refs/heads/master
Commit: c654ae2140bc184adb407fd02072b653c5359ee5
Parents: 5ad4e32
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon Jun 13 13:44:46 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Jun 13 13:44:46 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 69 +++++-------------
 .../execution/streaming/StreamExecution.scala   |  8 ++-
 .../spark/sql/streaming/ContinuousQuery.scala   | 11 ++-
 .../sql/streaming/ContinuousQueryInfo.scala     |  5 +-
 .../sql/streaming/ContinuousQueryManager.scala  | 74 ++++++++++++++++----
 .../ContinuousQueryListenerSuite.scala          | 17 ++++-
 .../streaming/ContinuousQueryManagerSuite.scala | 29 +++-----
 .../sql/streaming/ContinuousQuerySuite.scala    | 43 +++++++++++-
 .../apache/spark/sql/streaming/StreamTest.scala | 12 ++--
 9 files changed, 167 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c654ae21/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index afae078..171b137 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -336,34 +336,23 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
     assertStreaming("startStream() can only be called on continuous queries")
 
     if (source == "memory") {
-      val queryName =
-        extraOptions.getOrElse(
-          "queryName", throw new AnalysisException("queryName must be 
specified for memory sink"))
-      val checkpointLocation = getCheckpointLocation(queryName, failIfNotSet = 
false).getOrElse {
-        Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
-      }
-
-      // If offsets have already been created, we trying to resume a query.
-      val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = 
checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-      if (fs.exists(checkpointPath)) {
-        throw new AnalysisException(
-          s"Unable to resume query written to memory sink. Delete 
$checkpointPath to start over.")
-      } else {
-        checkpointPath.toUri.toString
+      if (extraOptions.get("queryName").isEmpty) {
+        throw new AnalysisException("queryName must be specified for memory 
sink")
       }
 
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
-      resultDf.createOrReplaceTempView(queryName)
-      val continuousQuery = 
df.sparkSession.sessionState.continuousQueryManager.startQuery(
-        queryName,
-        checkpointLocation,
+      val query = 
df.sparkSession.sessionState.continuousQueryManager.startQuery(
+        extraOptions.get("queryName"),
+        extraOptions.get("checkpointLocation"),
         df,
         sink,
         outputMode,
-        trigger)
-      continuousQuery
+        useTempCheckpointLocation = true,
+        recoverFromCheckpointLocation = false,
+        trigger = trigger)
+      resultDf.createOrReplaceTempView(query.name)
+      query
     } else {
       val dataSource =
         DataSource(
@@ -371,14 +360,13 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
           className = source,
           options = extraOptions.toMap,
           partitionColumns = normalizedParCols.getOrElse(Nil))
-      val queryName = extraOptions.getOrElse("queryName", 
StreamExecution.nextName)
       df.sparkSession.sessionState.continuousQueryManager.startQuery(
-        queryName,
-        getCheckpointLocation(queryName, failIfNotSet = true).get,
+        extraOptions.get("queryName"),
+        extraOptions.get("checkpointLocation"),
         df,
         dataSource.createSink(outputMode),
         outputMode,
-        trigger)
+        trigger = trigger)
     }
   }
 
@@ -437,38 +425,15 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
     assertStreaming(
       "foreach() can only be called on streaming Datasets/DataFrames.")
 
-    val queryName = extraOptions.getOrElse("queryName", 
StreamExecution.nextName)
     val sink = new 
ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
     df.sparkSession.sessionState.continuousQueryManager.startQuery(
-      queryName,
-      getCheckpointLocation(queryName, failIfNotSet = false).getOrElse {
-        Utils.createTempDir(namePrefix = "foreach.stream").getCanonicalPath
-      },
+      extraOptions.get("queryName"),
+      extraOptions.get("checkpointLocation"),
       df,
       sink,
       outputMode,
-      trigger)
-  }
-
-  /**
-   * Returns the checkpointLocation for a query. If `failIfNotSet` is `true` 
but the checkpoint
-   * location is not set, [[AnalysisException]] will be thrown. If 
`failIfNotSet` is `false`, `None`
-   * will be returned if the checkpoint location is not set.
-   */
-  private def getCheckpointLocation(queryName: String, failIfNotSet: Boolean): 
Option[String] = {
-    val checkpointLocation = extraOptions.get("checkpointLocation").map { 
userSpecified =>
-      new Path(userSpecified).toUri.toString
-    }.orElse {
-      df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
-        new Path(location, queryName).toUri.toString
-      }
-    }
-    if (failIfNotSet && checkpointLocation.isEmpty) {
-      throw new AnalysisException("checkpointLocation must be specified either 
" +
-        """through option("checkpointLocation", ...) or """ +
-        s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", 
...)""")
-    }
-    checkpointLocation
+      useTempCheckpointLocation = true,
+      trigger = trigger)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c654ae21/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 954fc33..5095fe7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.mutable.ArrayBuffer
@@ -44,6 +44,7 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, 
Utils}
  */
 class StreamExecution(
     override val sparkSession: SparkSession,
+    override val id: Long,
     override val name: String,
     checkpointRoot: String,
     private[sql] val logicalPlan: LogicalPlan,
@@ -492,6 +493,7 @@ class StreamExecution(
   private def toInfo: ContinuousQueryInfo = {
     new ContinuousQueryInfo(
       this.name,
+      this.id,
       this.sourceStatuses,
       this.sinkStatus)
   }
@@ -503,7 +505,7 @@ class StreamExecution(
 }
 
 private[sql] object StreamExecution {
-  private val nextId = new AtomicInteger()
+  private val _nextId = new AtomicLong(0)
 
-  def nextName: String = s"query-${nextId.getAndIncrement}"
+  def nextId: Long = _nextId.getAndIncrement()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c654ae21/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
index 3bbb0b8..1e0a47d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
@@ -30,12 +30,21 @@ import org.apache.spark.sql.SparkSession
 trait ContinuousQuery {
 
   /**
-   * Returns the name of the query.
+   * Returns the name of the query. This name is unique across all active 
queries. This can be
+   * set in the[[org.apache.spark.sql.DataFrameWriter DataFrameWriter]] as
+   * `dataframe.write().queryName("query").startStream()`.
    * @since 2.0.0
    */
   def name: String
 
   /**
+   * Returns the unique id of this query. This id is automatically generated 
and is unique across
+   * all queries that have been started in the current process.
+   * @since 2.0.0
+   */
+  def id: Long
+
+  /**
    * Returns the [[SparkSession]] associated with `this`.
    * @since 2.0.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/c654ae21/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
index 57b718b..19f2270 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
@@ -23,12 +23,15 @@ import org.apache.spark.annotation.Experimental
  * :: Experimental ::
  * A class used to report information about the progress of a 
[[ContinuousQuery]].
  *
- * @param name The [[ContinuousQuery]] name.
+ * @param name The [[ContinuousQuery]] name. This name is unique across all 
active queries.
+ * @param id The [[ContinuousQuery]] id. This id is unique across
+  *          all queries that have been started in the current process.
  * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s 
sources.
  * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
  */
 @Experimental
 class ContinuousQueryInfo private[sql](
   val name: String,
+  val id: Long,
   val sourceStatuses: Seq[SourceStatus],
   val sinkStatus: SinkStatus)

http://git-wip-us.apache.org/repos/asf/spark/blob/c654ae21/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
index 1bfdd2d..0f4a9c9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.streaming
 
 import scala.collection.mutable
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * :: Experimental ::
@@ -39,7 +41,7 @@ class ContinuousQueryManager private[sql] (sparkSession: 
SparkSession) {
   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
   private val listenerBus = new 
ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus)
-  private val activeQueries = new mutable.HashMap[String, ContinuousQuery]
+  private val activeQueries = new mutable.HashMap[Long, ContinuousQuery]
   private val activeQueriesLock = new Object
   private val awaitTerminationLock = new Object
 
@@ -55,13 +57,12 @@ class ContinuousQueryManager private[sql] (sparkSession: 
SparkSession) {
   }
 
   /**
-   * Returns an active query from this SQLContext or throws exception if bad 
name
+   * Returns the query if there is an active query with the given id, or null.
    *
    * @since 2.0.0
    */
-  def get(name: String): ContinuousQuery = activeQueriesLock.synchronized {
-    activeQueries.getOrElse(name,
-      throw new IllegalArgumentException(s"There is no active query with name 
$name"))
+  def get(id: Long): ContinuousQuery = activeQueriesLock.synchronized {
+    activeQueries.get(id).orNull
   }
 
   /**
@@ -168,20 +169,66 @@ class ContinuousQueryManager private[sql] (sparkSession: 
SparkSession) {
     listenerBus.post(event)
   }
 
-  /** Start a query */
+  /**
+   * Start a [[ContinuousQuery]].
+   * @param userSpecifiedName Query name optionally specified by the user.
+   * @param userSpecifiedCheckpointLocation  Checkpoint location optionally 
specified by the user.
+   * @param df Streaming DataFrame.
+   * @param sink  Sink to write the streaming outputs.
+   * @param outputMode  Output mode for the sink.
+   * @param useTempCheckpointLocation  Whether to use a temporary checkpoint 
location when the user
+   *                                   has not specified one. If false, then 
error will be thrown.
+   * @param recoverFromCheckpointLocation  Whether to recover query from the 
checkpoint location.
+   *                                       If false and the checkpoint 
location exists, then error
+   *                                       will be thrown.
+   * @param trigger [[Trigger]] for the query.
+   * @param triggerClock [[Clock]] to use for the triggering.
+   */
   private[sql] def startQuery(
-      name: String,
-      checkpointLocation: String,
+      userSpecifiedName: Option[String],
+      userSpecifiedCheckpointLocation: Option[String],
       df: DataFrame,
       sink: Sink,
       outputMode: OutputMode,
+      useTempCheckpointLocation: Boolean = false,
+      recoverFromCheckpointLocation: Boolean = true,
       trigger: Trigger = ProcessingTime(0),
       triggerClock: Clock = new SystemClock()): ContinuousQuery = {
     activeQueriesLock.synchronized {
-      if (activeQueries.contains(name)) {
+      val id = StreamExecution.nextId
+      val name = userSpecifiedName.getOrElse(s"query-$id")
+      if (activeQueries.values.exists(_.name == name)) {
         throw new IllegalArgumentException(
           s"Cannot start query with name $name as a query with that name is 
already active")
       }
+      val checkpointLocation = userSpecifiedCheckpointLocation.map { 
userSpecified =>
+        new Path(userSpecified).toUri.toString
+      }.orElse {
+        df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
+          new Path(location, name).toUri.toString
+        }
+      }.getOrElse {
+        if (useTempCheckpointLocation) {
+          Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+        } else {
+          throw new AnalysisException(
+            "checkpointLocation must be specified either " +
+              """through option("checkpointLocation", ...) or """ +
+              s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", 
...)""")
+        }
+      }
+
+      // If offsets have already been created, we trying to resume a query.
+      if (!recoverFromCheckpointLocation) {
+        val checkpointPath = new Path(checkpointLocation, "offsets")
+        val fs = 
checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
+        if (fs.exists(checkpointPath)) {
+          throw new AnalysisException(
+            s"This query does not support recovering from checkpoint location. 
" +
+              s"Delete $checkpointPath to start over.")
+        }
+      }
+
       val analyzedPlan = df.queryExecution.analyzed
       df.queryExecution.assertAnalyzed()
 
@@ -203,6 +250,7 @@ class ContinuousQueryManager private[sql] (sparkSession: 
SparkSession) {
       }
       val query = new StreamExecution(
         sparkSession,
+        id,
         name,
         checkpointLocation,
         logicalPlan,
@@ -211,7 +259,7 @@ class ContinuousQueryManager private[sql] (sparkSession: 
SparkSession) {
         triggerClock,
         outputMode)
       query.start()
-      activeQueries.put(name, query)
+      activeQueries.put(id, query)
       query
     }
   }
@@ -219,7 +267,7 @@ class ContinuousQueryManager private[sql] (sparkSession: 
SparkSession) {
   /** Notify (by the ContinuousQuery) that the query has been terminated */
   private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): 
Unit = {
     activeQueriesLock.synchronized {
-      activeQueries -= terminatedQuery.name
+      activeQueries -= terminatedQuery.id
     }
     awaitTerminationLock.synchronized {
       if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c654ae21/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
index 9b59ab6..8e1de09 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
@@ -50,9 +50,11 @@ class ContinuousQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
     withListenerAdded(listener) {
       testStream(input.toDS)(
         StartStream(),
-        Assert("Incorrect query status in onQueryStarted") {
+        AssertOnQuery("Incorrect query status in onQueryStarted") { query =>
           val status = listener.startStatus
           assert(status != null)
+          assert(status.name === query.name)
+          assert(status.id === query.id)
           assert(status.sourceStatuses.size === 1)
           assert(status.sourceStatuses(0).description.contains("Memory"))
 
@@ -67,13 +69,15 @@ class ContinuousQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
         },
         AddDataMemory(input, Seq(1, 2, 3)),
         CheckAnswer(1, 2, 3),
-        Assert("Incorrect query status in onQueryProgress") {
+        AssertOnQuery("Incorrect query status in onQueryProgress") { query =>
           eventually(Timeout(streamingTimeout)) {
 
             // There should be only on progress event as batch has been 
processed
             assert(listener.progressStatuses.size === 1)
             val status = listener.progressStatuses.peek()
             assert(status != null)
+            assert(status.name === query.name)
+            assert(status.id === query.id)
             assert(status.sourceStatuses(0).offsetDesc === 
Some(LongOffset(0).toString))
             assert(status.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(0)).toString)
 
@@ -82,12 +86,16 @@ class ContinuousQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
           }
         },
         StopStream,
-        Assert("Incorrect query status in onQueryTerminated") {
+        AssertOnQuery("Incorrect query status in onQueryTerminated") { query =>
           eventually(Timeout(streamingTimeout)) {
             val status = listener.terminationStatus
             assert(status != null)
+            assert(status.name === query.name)
+            assert(status.id === query.id)
             assert(status.sourceStatuses(0).offsetDesc === 
Some(LongOffset(0).toString))
             assert(status.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(0)).toString)
+            assert(listener.terminationStackTrace.isEmpty)
+            assert(listener.terminationException === None)
           }
           listener.checkAsyncErrors()
         }
@@ -161,6 +169,7 @@ class ContinuousQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   test("QueryStarted serialization") {
     val queryStartedInfo = new ContinuousQueryInfo(
       "name",
+      1,
       Seq(new SourceStatus("source1", None), new SourceStatus("source2", 
None)),
       new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
     val queryStarted = new 
ContinuousQueryListener.QueryStarted(queryStartedInfo)
@@ -173,6 +182,7 @@ class ContinuousQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   test("QueryProgress serialization") {
     val queryProcessInfo = new ContinuousQueryInfo(
       "name",
+      1,
       Seq(
         new SourceStatus("source1", Some(LongOffset(0).toString)),
         new SourceStatus("source2", Some(LongOffset(1).toString))),
@@ -187,6 +197,7 @@ class ContinuousQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   test("QueryTerminated serialization") {
     val queryTerminatedInfo = new ContinuousQueryInfo(
       "name",
+      1,
       Seq(
         new SourceStatus("source1", Some(LongOffset(0).toString)),
         new SourceStatus("source2", Some(LongOffset(1).toString))),

http://git-wip-us.apache.org/repos/asf/spark/blob/c654ae21/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index c1e4970..f81608b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -59,23 +59,15 @@ class ContinuousQueryManagerSuite extends StreamTest with 
BeforeAndAfter {
       assert(spark.streams.active.toSet === queries.toSet)
       val (q1, q2, q3) = (queries(0), queries(1), queries(2))
 
-      assert(spark.streams.get(q1.name).eq(q1))
-      assert(spark.streams.get(q2.name).eq(q2))
-      assert(spark.streams.get(q3.name).eq(q3))
-      intercept[IllegalArgumentException] {
-        spark.streams.get("non-existent-name")
-      }
-
+      assert(spark.streams.get(q1.id).eq(q1))
+      assert(spark.streams.get(q2.id).eq(q2))
+      assert(spark.streams.get(q3.id).eq(q3))
+      assert(spark.streams.get(-1) === null) // non-existent id
       q1.stop()
 
       assert(spark.streams.active.toSet === Set(q2, q3))
-      val ex1 = withClue("no error while getting non-active query") {
-        intercept[IllegalArgumentException] {
-          spark.streams.get(q1.name)
-        }
-      }
-      assert(ex1.getMessage.contains(q1.name), "error does not contain name of 
query to be fetched")
-      assert(spark.streams.get(q2.name).eq(q2))
+      assert(spark.streams.get(q1.id) === null)
+      assert(spark.streams.get(q2.id).eq(q2))
 
       m2.addData(0)   // q2 should terminate with error
 
@@ -83,12 +75,7 @@ class ContinuousQueryManagerSuite extends StreamTest with 
BeforeAndAfter {
         require(!q2.isActive)
         require(q2.exception.isDefined)
       }
-      withClue("no error while getting non-active query") {
-        intercept[IllegalArgumentException] {
-          spark.streams.get(q2.name).eq(q2)
-        }
-      }
-
+      assert(spark.streams.get(q2.id) === null)
       assert(spark.streams.active.toSet === Set(q3))
     }
   }
@@ -227,7 +214,7 @@ class ContinuousQueryManagerSuite extends StreamTest with 
BeforeAndAfter {
   }
 
 
-  /** Run a body of code by defining a query each on multiple datasets */
+  /** Run a body of code by defining a query on each dataset */
   private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[ContinuousQuery] 
=> Unit): Unit = {
     failAfter(streamingTimeout) {
       val queries = withClue("Error starting queries") {

http://git-wip-us.apache.org/repos/asf/spark/blob/c654ae21/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index 5542405..43a8857 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.streaming
 
+import org.scalatest.BeforeAndAfter
+
 import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, 
MemoryStream, StreamExecution}
+import org.apache.spark.util.Utils
 
 
-class ContinuousQuerySuite extends StreamTest {
+class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
 
   import AwaitTerminationTester._
   import testImplicits._
 
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("names unique across active queries, ids unique across all started 
queries") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map { 6 / _}
+
+    def startQuery(queryName: String): ContinuousQuery = {
+      val metadataRoot = Utils.createTempDir(namePrefix = 
"streaming.checkpoint").getCanonicalPath
+      val writer = mapped.write
+      writer
+        .queryName(queryName)
+        .format("memory")
+        .option("checkpointLocation", metadataRoot)
+        .startStream()
+    }
+
+    val q1 = startQuery("q1")
+    assert(q1.name === "q1")
+
+    // Verify that another query with same name cannot be started
+    val e1 = intercept[IllegalArgumentException] {
+      startQuery("q1")
+    }
+    Seq("q1", "already active").foreach { s => 
assert(e1.getMessage.contains(s)) }
+
+    // Verify q1 was unaffected by the above exception and stop it
+    assert(q1.isActive)
+    q1.stop()
+
+    // Verify another query can be started with name q1, but will have 
different id
+    val q2 = startQuery("q1")
+    assert(q2.name === "q1")
+    assert(q2.id !== q1.id)
+    q2.stop()
+  }
+
   testQuietly("lifecycle states and awaitTermination") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map { 6 / _}

http://git-wip-us.apache.org/repos/asf/spark/blob/c654ae21/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7f1e5fe..cbfa6ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -188,8 +188,8 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
       new AssertOnQuery(condition, message)
     }
 
-    def apply(message: String)(condition: StreamExecution => Boolean): 
AssertOnQuery = {
-      new AssertOnQuery(condition, message)
+    def apply(message: String)(condition: StreamExecution => Unit): 
AssertOnQuery = {
+      new AssertOnQuery(s => { condition; true }, message)
     }
   }
 
@@ -305,13 +305,13 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
               spark
                 .streams
                 .startQuery(
-                  StreamExecution.nextName,
-                  metadataRoot,
+                  None,
+                  Some(metadataRoot),
                   stream,
                   sink,
                   outputMode,
-                  trigger,
-                  triggerClock)
+                  trigger = trigger,
+                  triggerClock = triggerClock)
                 .asInstanceOf[StreamExecution]
             currentStream.microBatchThread.setUncaughtExceptionHandler(
               new UncaughtExceptionHandler {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to