Repository: spark
Updated Branches:
  refs/heads/master b0ae6a38a -> 746a558de


[SPARK-19876][SS][WIP] OneTime Trigger Executor

## What changes were proposed in this pull request?

This patch adds a new trigger, and a corresponding trigger executor, that 
execute a single batch and then stop. The OneTime trigger gives users more 
control over the scheduling of triggers.
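
For example, with the Scala API a query can be made to process whatever data 
is available as a single batch and then stop (a minimal sketch; the input 
path, sink format, and checkpoint location below are placeholders):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("once-trigger-example").getOrCreate()

// Process the data available at start as one batch, then stop the query.
val query = spark.readStream
  .format("text")
  .load("/path/to/input")                         // placeholder input path
  .writeStream
  .format("parquet")
  .option("path", "/path/to/output")              // placeholder output path
  .option("checkpointLocation", "/path/to/ckpt")  // placeholder checkpoint dir
  .trigger(Trigger.Once())
  .start()

query.awaitTermination()  // returns once the single batch has been committed
```

The Python equivalent added in this patch is `sdf.writeStream.trigger(once=True)`.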

In addition, this patch adds a commit log to StreamExecution that records a 
commit entry at the end of each successfully processed batch. On restart, this 
commit log, rather than the offset log alone, is used to determine the next 
batch (offsets) to process. Using only the offset log would always re-process 
the most recently logged batch, which would defeat the purpose of a OneTime 
trigger.
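
At a high level, restart now consults both logs roughly as follows (a 
simplified, hypothetical sketch of what `StreamExecution.populateStartOffsets` 
does; the helper name and its parameters are illustrative, not actual Spark 
API):

```scala
// Hypothetical sketch of the restart decision; not the actual StreamExecution code.
// offsetLog: batchId -> serialized offsets; committedBatchIds: batch ids that
// were recorded in the new commit log after their output reached the sink.
def nextBatchIdOnRestart(
    offsetLog: Map[Long, String],
    committedBatchIds: Set[Long]): Long = {
  offsetLog.keys.reduceOption(_ max _) match {
    case Some(latest) if committedBatchIds.contains(latest) =>
      latest + 1  // last batch fully committed: construct a new batch
    case Some(latest) =>
      latest      // no commit record: re-run the last logged batch
    case None =>
      0L          // empty offset log: brand new query, start at batch 0
  }
}
```

With a OneTime trigger this matters because the single batch must not be 
re-executed on the next start if its output was already committed.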

## How was this patch tested?

A number of existing tests have been revised. Those tests assumed that, when a 
stream is restarted, the last batch in the offset log is always re-processed. 
Now that the commit log records whether that last batch completed successfully, 
the expected results and assumptions of those tests were updated accordingly.

In addition, a OneTime trigger test was added to StreamingQuerySuite, which 
tests:
- The semantics of the OneTime trigger (i.e., on start, execute a single batch, 
then stop).
- The case where the commit log failed to record the completion of a batch 
before restart, in which case we fall back to what is in the offset log.
- A OneTime trigger execution that results in an exception being thrown.

marmbrus tdas zsxwing

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Tyson Condie <tcon...@gmail.com>
Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #17219 from tcondie/stream-commit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/746a558d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/746a558d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/746a558d

Branch: refs/heads/master
Commit: 746a558de2136f91f8fe77c6e51256017aa50913
Parents: b0ae6a3
Author: Tyson Condie <tcon...@gmail.com>
Authored: Thu Mar 23 14:32:05 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Mar 23 14:32:05 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |   2 -
 project/MimaExcludes.scala                      |   6 +-
 python/pyspark/sql/streaming.py                 |  63 +++-----
 python/pyspark/sql/tests.py                     |  17 ++-
 .../execution/streaming/BatchCommitLog.scala    |  77 ++++++++++
 .../execution/streaming/StreamExecution.scala   |  81 ++++++++--
 .../execution/streaming/TriggerExecutor.scala   |  11 ++
 .../sql/execution/streaming/Triggers.scala      |  29 ++++
 .../spark/sql/streaming/DataStreamWriter.scala  |   2 +-
 .../spark/sql/streaming/ProcessingTime.scala    | 150 +++++++++++++++++++
 .../org/apache/spark/sql/streaming/Trigger.java | 105 +++++++++++++
 .../apache/spark/sql/streaming/Trigger.scala    | 150 -------------------
 .../sql/streaming/EventTimeWatermarkSuite.scala |   4 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala |   3 +-
 .../spark/sql/streaming/StreamSuite.scala       |  20 ++-
 .../apache/spark/sql/streaming/StreamTest.scala |   2 +-
 .../streaming/StreamingAggregationSuite.scala   |   4 +
 .../streaming/StreamingQueryListenerSuite.scala |  18 ++-
 .../sql/streaming/StreamingQuerySuite.scala     |  48 +++++-
 .../test/DataStreamReaderWriterSuite.scala      |   5 +-
 20 files changed, 571 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 7b6396e..6391d62 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -301,8 +301,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
       StopStream,
       StartStream(ProcessingTime(100), clock),
       waitUntilBatchProcessed,
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
       // smallest now empty, 1 more from middle, 9 more from biggest
       CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
         11, 108, 109, 110, 111, 112, 113, 114, 115, 116,

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index bd4528b..9925a8b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -64,7 +64,11 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.<init>$default$11"),
 
     // [SPARK-17161] Removing Python-friendly constructors not needed
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"),
+
+    // [SPARK-19876] Add one time trigger, and improve Trigger APIs
+    ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.streaming.Trigger"),
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.streaming.ProcessingTime")
   )
 
   // Exclude rules for 2.1.x

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 80f4340..27d6725 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -277,44 +277,6 @@ class StreamingQueryManager(object):
         self._jsqm.resetTerminated()
 
 
-class Trigger(object):
-    """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.0
-    """
-
-    __metaclass__ = ABCMeta
-
-    @abstractmethod
-    def _to_java_trigger(self, sqlContext):
-        """Internal method to construct the trigger on the jvm.
-        """
-        pass
-
-
-class ProcessingTime(Trigger):
-    """A trigger that runs a query periodically based on the processing time. If `interval` is 0,
-    the query will run as fast as possible.
-
-    The interval should be given as a string, e.g. '2 seconds', '5 minutes', ...
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.0
-    """
-
-    def __init__(self, interval):
-        if type(interval) != str or len(interval.strip()) == 0:
-            raise ValueError("interval should be a non empty interval string, e.g. '2 seconds'.")
-        self.interval = interval
-
-    def _to_java_trigger(self, sqlContext):
-        return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
-            self.interval)
-
-
 class DataStreamReader(OptionUtils):
     """
     Interface used to load a streaming :class:`DataFrame` from external storage systems
@@ -790,7 +752,7 @@ class DataStreamWriter(object):
 
     @keyword_only
     @since(2.0)
-    def trigger(self, processingTime=None):
+    def trigger(self, processingTime=None, once=None):
         """Set the trigger for the stream query. If this is not set it will run the query as fast
         as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
 
@@ -800,17 +762,26 @@ class DataStreamWriter(object):
 
         >>> # trigger the query for execution every 5 seconds
         >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
+        >>> # trigger the query to process just one batch of data
+        >>> writer = sdf.writeStream.trigger(once=True)
         """
-        from pyspark.sql.streaming import ProcessingTime
-        trigger = None
+        jTrigger = None
         if processingTime is not None:
+            if once is not None:
+                raise ValueError('Multiple triggers not allowed.')
             if type(processingTime) != str or len(processingTime.strip()) == 0:
-                raise ValueError('The processing time must be a non empty string. Got: %s' %
+                raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
                                  processingTime)
-            trigger = ProcessingTime(processingTime)
-        if trigger is None:
-            raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
-        self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
+            interval = processingTime.strip()
+            jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
+                interval)
+        elif once is not None:
+            if once is not True:
+                raise ValueError('Value for once must be True. Got: %s' % once)
+            jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
+        else:
+            raise ValueError('No trigger provided')
+        self._jwrite = self._jwrite.trigger(jTrigger)
         return self
 
     @ignore_unicode_prefix

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 29d613b..b93b7ed 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1255,13 +1255,26 @@ class SQLTests(ReusedPySparkTestCase):
 
         shutil.rmtree(tmpPath)
 
-    def test_stream_trigger_takes_keyword_args(self):
+    def test_stream_trigger(self):
         df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+
+        # Should take at least one arg
+        try:
+            df.writeStream.trigger()
+        except ValueError:
+            pass
+
+        # Should not take multiple args
+        try:
+            df.writeStream.trigger(once=True, processingTime='5 seconds')
+        except ValueError:
+            pass
+
+        # Should take only keyword args
         try:
             df.writeStream.trigger('5 seconds')
             self.fail("Should have thrown an exception")
         except TypeError:
-            # should throw error
             pass
 
     def test_stream_read_options(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
new file mode 100644
index 0000000..fb1a4fb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Used to write log files that represent batch commit points in structured streaming.
+ * A commit log file will be written immediately after the successful completion of a
+ * batch, and before processing the next batch. Here is an execution summary:
+ * - trigger batch 1
+ * - obtain batch 1 offsets and write to offset log
+ * - process batch 1
+ * - write batch 1 to completion log
+ * - trigger batch 2
+ * - obtain batch 2 offsets and write to offset log
+ * - process batch 2
+ * - write batch 2 to completion log
+ * ....
+ *
+ * The current format of the batch completion log is:
+ * line 1: version
+ * line 2: metadata (optional json string)
+ */
+class BatchCommitLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[String](sparkSession, path) {
+
+  override protected def deserialize(in: InputStream): String = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file in the offset commit log")
+    }
+    parseVersion(lines.next().trim, BatchCommitLog.VERSION)
+    // read metadata
+    lines.next().trim match {
+      case BatchCommitLog.SERIALIZED_VOID => null
+      case metadata => metadata
+    }
+  }
+
+  override protected def serialize(metadata: String, out: OutputStream): Unit = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8))
+    out.write('\n')
+
+    // write metadata or void
+    out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata)
+      .getBytes(UTF_8))
+  }
+}
+
+object BatchCommitLog {
+  private val VERSION = 1
+  private val SERIALIZED_VOID = "{}"
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 60d5283..34e9262 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -165,6 +165,8 @@ class StreamExecution(
 
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+    case OneTimeTrigger => OneTimeExecutor()
+    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
   }
 
   /** Defines the internal state of execution */
@@ -209,6 +211,13 @@ class StreamExecution(
    */
   val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
 
+  /**
+   * A log that records the batch ids that have completed. This is used to check if a batch was
+   * fully processed, and its output was committed to the sink, hence no need to process it again.
+   * This is used (for instance) during restart, to help identify which batch to run next.
+   */
+  val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
+
   /** Whether all fields of the query have been initialized */
   private def isInitialized: Boolean = state.get != INITIALIZING
 
@@ -291,10 +300,13 @@ class StreamExecution(
                   runBatch(sparkSessionToRunBatches)
                 }
               }
-
               // Report trigger as finished and construct progress object.
               finishTrigger(dataAvailable)
               if (dataAvailable) {
+                // Update committed offsets.
+                committedOffsets ++= availableOffsets
+                batchCommitLog.add(currentBatchId, null)
+                logDebug(s"batch ${currentBatchId} committed")
                 // We'll increase currentBatchId after we complete processing current batch's data
                 currentBatchId += 1
               } else {
@@ -306,9 +318,6 @@ class StreamExecution(
             } else {
               false
             }
-
-          // Update committed offsets.
-          committedOffsets ++= availableOffsets
           updateStatusMessage("Waiting for next trigger")
           continueToRun
         })
@@ -392,13 +401,33 @@ class StreamExecution(
    *  - currentBatchId
    *  - committedOffsets
    *  - availableOffsets
+   *  The basic structure of this method is as follows:
+   *
+   *  Identify (from the offset log) the offsets used to run the last batch
+   *  IF last batch exists THEN
+   *    Set the next batch to be executed as the last recovered batch
+   *    Check the commit log to see which batch was committed last
+   *    IF the last batch was committed THEN
+   *      Call getBatch using the last batch start and end offsets
+   *      // ^^^^ above line is needed since some sources assume last batch always re-executes
+   *      Setup for a new batch i.e., start = last batch end, and identify new end
+   *    DONE
+   *  ELSE
+   *    Identify a brand new batch
+   *  DONE
    */
   private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
     offsetLog.getLatest() match {
-      case Some((batchId, nextOffsets)) =>
-        logInfo(s"Resuming streaming query, starting with batch $batchId")
-        currentBatchId = batchId
+      case Some((latestBatchId, nextOffsets)) =>
+        /* First assume that we are re-executing the latest known batch
+         * in the offset log */
+        currentBatchId = latestBatchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
+        /* Initialize committed offsets to a committed batch, which at this point
+         * is the second latest batch id in the offset log. */
+        offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
+          committedOffsets = secondLatestBatchId.toStreamProgress(sources)
+        }
 
         // update offset metadata
         nextOffsets.metadata.foreach { metadata =>
@@ -419,14 +448,37 @@ class StreamExecution(
             SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString)
         }
 
-        logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
-          s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
-
-        offsetLog.get(batchId - 1).foreach {
-          case lastOffsets =>
-            committedOffsets = lastOffsets.toStreamProgress(sources)
-            logDebug(s"Resuming with committed offsets: $committedOffsets")
+        /* identify the current batch id: if commit log indicates we successfully processed the
+         * latest batch id in the offset log, then we can safely move to the next batch
+         * i.e., committedBatchId + 1 */
+        batchCommitLog.getLatest() match {
+          case Some((latestCommittedBatchId, _)) =>
+            if (latestBatchId == latestCommittedBatchId) {
+              /* The last batch was successfully committed, so we can safely process the
+               * next batch, but first:
+               * make a call to getBatch using the offsets from the previous batch,
+               * because certain sources (e.g., KafkaSource) assume on restart the last
+               * batch will be executed before getOffset is called again. */
+              availableOffsets.foreach { ao: (Source, Offset) =>
+                val (source, end) = ao
+                if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
+                  val start = committedOffsets.get(source)
+                  source.getBatch(start, end)
+                }
+              }
+              currentBatchId = latestCommittedBatchId + 1
+              committedOffsets ++= availableOffsets
+              // Construct a new batch by recomputing availableOffsets
+              constructNextBatch()
+            } else if (latestCommittedBatchId < latestBatchId - 1) {
+              logWarning(s"Batch completion log latest batch id is " +
+                s"${latestCommittedBatchId}, which is not trailing " +
+                s"batch id $latestBatchId by one")
+            }
+          case None => logInfo("no commit log present")
         }
+        logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
+          s"$committedOffsets and available offsets $availableOffsets")
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
@@ -523,6 +575,7 @@ class StreamExecution(
         // Note that purge is exclusive, i.e. it purges everything before the target ID.
         if (minBatchesToRetain < currentBatchId) {
           offsetLog.purge(currentBatchId - minBatchesToRetain)
+          batchCommitLog.purge(currentBatchId - minBatchesToRetain)
         }
       }
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index ac510df..02996ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -30,6 +30,17 @@ trait TriggerExecutor {
 }
 
 /**
+ * A trigger executor that runs a single batch only, then terminates.
+ */
+case class OneTimeExecutor() extends TriggerExecutor {
+
+  /**
+   * Execute a single batch using `batchRunner`.
+   */
+  override def execute(batchRunner: () => Boolean): Unit = batchRunner()
+}
+
+/**
  * A trigger executor that runs a batch every `intervalMs` milliseconds.
  */
 case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
new file mode 100644
index 0000000..271bc4d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.streaming.Trigger
+
+/**
+ * A [[Trigger]] that processes only one batch of data in a streaming query then terminates
+ * the query.
+ */
+@Experimental
+@InterfaceStability.Evolving
+case object OneTimeTrigger extends Trigger

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index fe52013..f2f7005 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -377,7 +377,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var outputMode: OutputMode = OutputMode.Append
 
-  private var trigger: Trigger = ProcessingTime(0L)
+  private var trigger: Trigger = Trigger.ProcessingTime(0L)
 
   private var extraOptions = new scala.collection.mutable.HashMap[String, String]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
new file mode 100644
index 0000000..bdad8e4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.unsafe.types.CalendarInterval
+
+/**
+ * :: Experimental ::
+ * A trigger that runs a query periodically based on the processing time. If `interval` is 0,
+ * the query will run as fast as possible.
+ *
+ * Scala Example:
+ * {{{
+ *   df.writeStream.trigger(ProcessingTime("10 seconds"))
+ *
+ *   import scala.concurrent.duration._
+ *   df.writeStream.trigger(ProcessingTime(10.seconds))
+ * }}}
+ *
+ * Java Example:
+ * {{{
+ *   df.writeStream.trigger(ProcessingTime.create("10 seconds"))
+ *
+ *   import java.util.concurrent.TimeUnit
+ *   df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * }}}
+ *
+ * @since 2.0.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0")
+case class ProcessingTime(intervalMs: Long) extends Trigger {
+  require(intervalMs >= 0, "the interval of trigger should not be negative")
+}
+
+/**
+ * :: Experimental ::
+ * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0")
+object ProcessingTime {
+
+  /**
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
+   *
+   * Example:
+   * {{{
+   *   df.writeStream.trigger(ProcessingTime("10 seconds"))
+   * }}}
+   *
+   * @since 2.0.0
+   * @deprecated use Trigger.ProcessingTime(interval)
+   */
+  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
+  def apply(interval: String): ProcessingTime = {
+    if (StringUtils.isBlank(interval)) {
+      throw new IllegalArgumentException(
+        "interval cannot be null or blank.")
+    }
+    val cal = if (interval.startsWith("interval")) {
+      CalendarInterval.fromString(interval)
+    } else {
+      CalendarInterval.fromString("interval " + interval)
+    }
+    if (cal == null) {
+      throw new IllegalArgumentException(s"Invalid interval: $interval")
+    }
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
+    }
+    new ProcessingTime(cal.microseconds / 1000)
+  }
+
+  /**
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
+   *
+   * Example:
+   * {{{
+   *   import scala.concurrent.duration._
+   *   df.writeStream.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * @since 2.0.0
+   * @deprecated use Trigger.ProcessingTime(interval)
+   */
+  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
+  def apply(interval: Duration): ProcessingTime = {
+    new ProcessingTime(interval.toMillis)
+  }
+
+  /**
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
+   *
+   * Example:
+   * {{{
+   *   df.writeStream.trigger(ProcessingTime.create("10 seconds"))
+   * }}}
+   *
+   * @since 2.0.0
+   * @deprecated use Trigger.ProcessingTime(interval)
+   */
+  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
+  def create(interval: String): ProcessingTime = {
+    apply(interval)
+  }
+
+  /**
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
+   *
+   * Example:
+   * {{{
+   *   import java.util.concurrent.TimeUnit
+   *   df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   * @deprecated use Trigger.ProcessingTime(interval, unit)
+   */
+  @deprecated("use Trigger.ProcessingTime(interval, unit)", "2.2.0")
+  def create(interval: Long, unit: TimeUnit): ProcessingTime = {
+    new ProcessingTime(unit.toMillis(interval))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
new file mode 100644
index 0000000..a03a851
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+
+/**
+ * :: Experimental ::
+ * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class Trigger {
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long intervalMs) {
+      return ProcessingTime.apply(intervalMs);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Java-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import java.util.concurrent.TimeUnit
+   *    df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+      return ProcessingTime.create(interval, timeUnit);
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `duration` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(ProcessingTime(10.seconds))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(Duration interval) {
+      return ProcessingTime.apply(interval);
+  }
+
+  /**
+   * :: Experimental ::
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is effectively 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+   * }}}
+   * @since 2.2.0
+   */
+  public static Trigger ProcessingTime(String interval) {
+      return ProcessingTime.apply(interval);
+  }
+
+  /**
+   * A trigger that processes only one batch of data in a streaming query then terminates
+   * the query.
+   *
+   * @since 2.2.0
+   */
+  public static Trigger Once() {
+    return OneTimeTrigger$.MODULE$;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
deleted file mode 100644
index 68f2eab..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration.Duration
-
-import org.apache.commons.lang3.StringUtils
-
-import org.apache.spark.annotation.{Experimental, InterfaceStability}
-import org.apache.spark.unsafe.types.CalendarInterval
-
-/**
- * :: Experimental ::
- * Used to indicate how often results should be produced by a [[StreamingQuery]].
- *
- * @since 2.0.0
- */
-@Experimental
-@InterfaceStability.Evolving
-sealed trait Trigger
-
-/**
- * :: Experimental ::
- * A trigger that runs a query periodically based on the processing time. If `interval` is 0,
- * the query will run as fast as possible.
- *
- * Scala Example:
- * {{{
- *   df.write.trigger(ProcessingTime("10 seconds"))
- *
- *   import scala.concurrent.duration._
- *   df.write.trigger(ProcessingTime(10.seconds))
- * }}}
- *
- * Java Example:
- * {{{
- *   df.write.trigger(ProcessingTime.create("10 seconds"))
- *
- *   import java.util.concurrent.TimeUnit
- *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
- * }}}
- *
- * @since 2.0.0
- */
-@Experimental
-@InterfaceStability.Evolving
-case class ProcessingTime(intervalMs: Long) extends Trigger {
-  require(intervalMs >= 0, "the interval of trigger should not be negative")
-}
-
-/**
- * :: Experimental ::
- * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s.
- *
- * @since 2.0.0
- */
-@Experimental
-@InterfaceStability.Evolving
-object ProcessingTime {
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   df.write.trigger(ProcessingTime("10 seconds"))
-   * }}}
-   *
-   * @since 2.0.0
-   */
-  def apply(interval: String): ProcessingTime = {
-    if (StringUtils.isBlank(interval)) {
-      throw new IllegalArgumentException(
-        "interval cannot be null or blank.")
-    }
-    val cal = if (interval.startsWith("interval")) {
-      CalendarInterval.fromString(interval)
-    } else {
-      CalendarInterval.fromString("interval " + interval)
-    }
-    if (cal == null) {
-      throw new IllegalArgumentException(s"Invalid interval: $interval")
-    }
-    if (cal.months > 0) {
-      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
-    }
-    new ProcessingTime(cal.microseconds / 1000)
-  }
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   import scala.concurrent.duration._
-   *   df.write.trigger(ProcessingTime(10.seconds))
-   * }}}
-   *
-   * @since 2.0.0
-   */
-  def apply(interval: Duration): ProcessingTime = {
-    new ProcessingTime(interval.toMillis)
-  }
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   df.write.trigger(ProcessingTime.create("10 seconds"))
-   * }}}
-   *
-   * @since 2.0.0
-   */
-  def create(interval: String): ProcessingTime = {
-    apply(interval)
-  }
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   import java.util.concurrent.TimeUnit
-   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
-   * }}}
-   *
-   * @since 2.0.0
-   */
-  def create(interval: Long, unit: TimeUnit): ProcessingTime = {
-    new ProcessingTime(unit.toMillis(interval))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 7614ea5..fd850a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -218,7 +218,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
       AddData(inputData, 25), // Evict items less than previous watermark.
       CheckLastBatch((10, 5)),
       StopStream,
-      AssertOnQuery { q => // clear the sink
+      AssertOnQuery { q => // purge commit and clear the sink
+        val commit = q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
+        q.batchCommitLog.purge(commit)
         q.sink.asInstanceOf[MemorySink].clear()
         true
       },

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 89a2597..a00a1a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -575,9 +575,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
 
       StopStream,
       StartStream(ProcessingTime("1 second"), triggerClock = clock),
+      AdvanceManualClock(10 * 1000),
 
       AddData(inputData, "c"),
-      AdvanceManualClock(20 * 1000),
+      AdvanceManualClock(1 * 1000),
       CheckLastBatch(("b", "-1"), ("c", "1")),
       assertNumStateRows(total = 1, updated = 2),
 

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f01211e..32920f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -156,6 +156,15 @@ class StreamSuite extends StreamTest {
       AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
         s"offsetLog's latest should be $expectedId")
 
+    // Check the latest batchid in the commit log
+    def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.batchCommitLog.getLatest().get._1 == expectedId,
+        s"commitLog's latest should be $expectedId")
+
+    // Ensure that there has not been an incremental execution after restart
+    def CheckNoIncrementalExecutionCurrentBatchId(): AssertOnQuery =
+      AssertOnQuery(_.lastExecution == null, s"lastExecution not expected to run")
+
     // For each batch, we would log the state change during the execution
     // This checks whether the key of the state change log is the expected batch id
     def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
@@ -181,6 +190,7 @@ class StreamSuite extends StreamTest {
       // Check the results of batch 0
       CheckAnswer(1, 2, 3),
       CheckIncrementalExecutionCurrentBatchId(0),
+      CheckCommitLogLatestBatchId(0),
       CheckOffsetLogLatestBatchId(0),
       CheckSinkLatestBatchId(0),
       // Add some data in batch 1
@@ -191,6 +201,7 @@ class StreamSuite extends StreamTest {
       // Check the results of batch 1
       CheckAnswer(1, 2, 3, 4, 5, 6),
       CheckIncrementalExecutionCurrentBatchId(1),
+      CheckCommitLogLatestBatchId(1),
       CheckOffsetLogLatestBatchId(1),
       CheckSinkLatestBatchId(1),
 
@@ -203,6 +214,7 @@ class StreamSuite extends StreamTest {
       // the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
       CheckAnswer(1, 2, 3, 4, 5, 6),
       CheckIncrementalExecutionCurrentBatchId(1),
+      CheckCommitLogLatestBatchId(1),
       CheckOffsetLogLatestBatchId(1),
       CheckSinkLatestBatchId(1),
 
@@ -210,14 +222,15 @@ class StreamSuite extends StreamTest {
       StopStream,
       StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
 
-      /* -- batch 1 rerun ----------------- */
-      // this batch 1 would re-run because the latest batch id logged in offset log is 1
+      /* -- batch 1 no rerun ----------------- */
+      // batch 1 would not re-run because the latest batch id logged in commit log is 1
       AdvanceManualClock(10 * 1000),
+      CheckNoIncrementalExecutionCurrentBatchId(),
 
       /* -- batch 2 ----------------------- */
       // Check the results of batch 1
       CheckAnswer(1, 2, 3, 4, 5, 6),
-      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckCommitLogLatestBatchId(1),
       CheckOffsetLogLatestBatchId(1),
       CheckSinkLatestBatchId(1),
       // Add some data in batch 2
@@ -228,6 +241,7 @@ class StreamSuite extends StreamTest {
       // Check the results of batch 2
       CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
       CheckIncrementalExecutionCurrentBatchId(2),
+      CheckCommitLogLatestBatchId(2),
       CheckOffsetLogLatestBatchId(2),
       CheckSinkLatestBatchId(2))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 60e2375..8cf1791 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -159,7 +159,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
   /** Starts the stream, resuming if data has already been processed. It must not be running. */
   case class StartStream(
-      trigger: Trigger = ProcessingTime(0),
+      trigger: Trigger = Trigger.ProcessingTime(0),
       triggerClock: Clock = new SystemClock,
       additionalConfs: Map[String, String] = Map.empty)
     extends StreamAction

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 0c80156..600c039 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -272,11 +272,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
       StopStream,
       AssertOnQuery { q => // clear the sink
         q.sink.asInstanceOf[MemorySink].clear()
+        q.batchCommitLog.purge(3)
         // advance by a minute i.e., 90 seconds total
         clock.advance(60 * 1000L)
         true
       },
       StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+      // The commit log was blown away, causing the last batch to re-run
       CheckLastBatch((20L, 1), (85L, 1)),
       AssertOnQuery { q =>
         clock.getTimeMillis() == 90000L
@@ -322,11 +324,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
       StopStream,
       AssertOnQuery { q => // clear the sink
         q.sink.asInstanceOf[MemorySink].clear()
+        q.batchCommitLog.purge(3)
         // advance by 60 days i.e., 90 days total
         clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
         true
       },
       StartStream(ProcessingTime("10 day"), triggerClock = clock),
+      // Commit log was blown away, causing a re-run of the last batch
       CheckLastBatch((20L, 1), (85L, 1)),
 
       // advance clock to 100 days, should retain keys >= 90

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index eb09b9f..03dad8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -57,6 +57,20 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     val inputData = new MemoryStream[Int](0, sqlContext)
     val df = inputData.toDS().as[Long].map { 10 / _ }
     val listener = new EventCollector
+
+    case class AssertStreamExecThreadToWaitForClock()
+      extends AssertOnQuery(q => {
+        eventually(Timeout(streamingTimeout)) {
+          if (q.exception.isEmpty) {
+            assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
+          }
+        }
+        if (q.exception.isDefined) {
+          throw q.exception.get
+        }
+        true
+      }, "")
+
     try {
       // No events until started
       spark.streams.addListener(listener)
@@ -81,6 +95,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         // Progress event generated when data processed
         AddData(inputData, 1, 2),
         AdvanceManualClock(100),
+        AssertStreamExecThreadToWaitForClock(),
         CheckAnswer(10, 5),
         AssertOnQuery { query =>
           assert(listener.progressEvents.nonEmpty)
@@ -109,8 +124,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
         // Termination event generated with exception message when stopped with error
         StartStream(ProcessingTime(100), triggerClock = clock),
+        AssertStreamExecThreadToWaitForClock(),
         AddData(inputData, 0),
-        AdvanceManualClock(100),
+        AdvanceManualClock(100), // process bad data
         ExpectFailure[SparkException](),
         AssertOnQuery { query =>
           eventually(Timeout(streamingTimeout)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index a0a2b2b..3f41ecd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -158,6 +158,49 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  testQuietly("OneTime trigger, commit log, and exception") {
+    import Trigger.Once
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map { 6 / _}
+
+    testStream(mapped)(
+      AssertOnQuery(_.isActive === true),
+      StopStream,
+      AddData(inputData, 1, 2),
+      StartStream(trigger = Once),
+      CheckAnswer(6, 3),
+      StopStream, // clears out StreamTest state
+      AssertOnQuery { q =>
+        // both commit log and offset log contain the same (latest) batch id
+        q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) ==
+          q.offsetLog.getLatest().map(_._1).getOrElse(-2L)
+      },
+      AssertOnQuery { q =>
+        // blow away commit log and sink result
+        q.batchCommitLog.purge(1)
+        q.sink.asInstanceOf[MemorySink].clear()
+        true
+      },
+      StartStream(trigger = Once),
+      CheckAnswer(6, 3), // ensure we fall back to offset log and reprocess batch
+      StopStream,
+      AddData(inputData, 3),
+      StartStream(trigger = Once),
+      CheckLastBatch(2), // commit log should be back in place
+      StopStream,
+      AddData(inputData, 0),
+      StartStream(trigger = Once),
+      ExpectFailure[SparkException](),
+      AssertOnQuery(_.isActive === false),
+      AssertOnQuery(q => {
+        q.exception.get.startOffset ===
+          q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString &&
+          q.exception.get.endOffset ===
+            q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString
+      }, "incorrect start offset or end offset on exception")
+    )
+  }
+
   testQuietly("status, lastProgress, and recentProgress") {
     import StreamingQuerySuite._
     clock = new StreamManualClock
@@ -237,6 +280,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       AdvanceManualClock(500), // time = 1100 to unblock job
       AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
       CheckAnswer(2),
+      AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === false),
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
@@ -275,6 +319,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
       AddData(inputData, 1, 2),
       AdvanceManualClock(100), // allow another trigger
+      AssertStreamExecThreadToWaitForClock(),
       CheckAnswer(4),
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === false),
@@ -306,8 +351,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
       // Test status and progress after query terminated with error
       StartStream(ProcessingTime(100), triggerClock = clock),
+      AdvanceManualClock(100), // ensure initial trigger completes before AddData
       AddData(inputData, 0),
-      AdvanceManualClock(100),
+      AdvanceManualClock(100), // allow another trigger
       ExpectFailure[SparkException](),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),

http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 341ab0e..05cd3d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _}
+import org.apache.spark.sql.streaming.Trigger._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -346,7 +347,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     q = df.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
-      .trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
+      .trigger(ProcessingTime(100, TimeUnit.SECONDS))
       .start()
     q.stop()
 

