Repository: spark
Updated Branches:
  refs/heads/master b0ae6a38a -> 746a558de
[SPARK-19876][SS][WIP] OneTime Trigger Executor

## What changes were proposed in this pull request?

This adds a trigger, and a corresponding trigger executor, that executes only a single batch and then stops. The OneTime trigger gives users more control over the scheduling of triggers. In addition, this patch extends StreamExecution with a commit log that records a commit entry at the end of successfully processing a batch. On restart, this commit log is used to determine the next batch (offsets) to process, rather than the offset log alone; relying only on the offset log would always re-process the most recently logged batch, which would make a OneTime trigger impossible. (A brief usage sketch follows the change summary below.)

## How was this patch tested?

A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is re-processed. Since the commit log now records whether that batch completed successfully, the expected results and assumptions of those tests were updated accordingly. In addition, a OneTime trigger test was added to StreamingQuerySuite, which covers:
- The semantics of the OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case where the commit log did not record the completion of a batch before restart, in which case we fall back to what is in the offset log.
- A OneTime trigger execution that results in an exception being thrown.

marmbrus tdas zsxwing

Author: Tyson Condie <tcon...@gmail.com>
Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #17219 from tcondie/stream-commit.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/746a558d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/746a558d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/746a558d

Branch: refs/heads/master
Commit: 746a558de2136f91f8fe77c6e51256017aa50913
Parents: b0ae6a3
Author: Tyson Condie <tcon...@gmail.com>
Authored: Thu Mar 23 14:32:05 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Mar 23 14:32:05 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |   2 -
 project/MimaExcludes.scala                      |   6 +-
 python/pyspark/sql/streaming.py                 |  63 +++-----
 python/pyspark/sql/tests.py                     |  17 ++-
 .../execution/streaming/BatchCommitLog.scala    |  77 ++++++++++
 .../execution/streaming/StreamExecution.scala   |  81 ++++++++--
 .../execution/streaming/TriggerExecutor.scala   |  11 ++
 .../sql/execution/streaming/Triggers.scala      |  29 ++++
 .../spark/sql/streaming/DataStreamWriter.scala  |   2 +-
 .../spark/sql/streaming/ProcessingTime.scala    | 150 +++++++++++++++++++
 .../org/apache/spark/sql/streaming/Trigger.java | 105 +++++++++++++
 .../apache/spark/sql/streaming/Trigger.scala    | 150 -------------------
 .../sql/streaming/EventTimeWatermarkSuite.scala |   4 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala |   3 +-
 .../spark/sql/streaming/StreamSuite.scala       |  20 ++-
 .../apache/spark/sql/streaming/StreamTest.scala |   2 +-
 .../streaming/StreamingAggregationSuite.scala   |   4 +
 .../streaming/StreamingQueryListenerSuite.scala |  18 ++-
 .../sql/streaming/StreamingQuerySuite.scala     |  48 +++++-
 .../test/DataStreamReaderWriterSuite.scala      |   5 +-
 20 files changed, 571 insertions(+), 226 deletions(-)
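
A minimal usage sketch of the new one-time trigger added by this patch. The input directory, checkpoint path, and application name below are purely illustrative and are not part of the patch or its tests; only `Trigger.Once()` is the new API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object OnceTriggerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("once-trigger-example").getOrCreate()

    // Hypothetical text-source directory; any streaming source works the same way.
    val input = spark.readStream
      .format("text")
      .load("/tmp/streaming-input")

    val query = input.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/once-checkpoint") // hypothetical path
      .trigger(Trigger.Once())  // process a single batch, then stop
      .start()

    // With Trigger.Once the query terminates on its own once the single batch
    // has been processed and its completion recorded in the commit log.
    query.awaitTermination()
    spark.stop()
  }
}
```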
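The interplay between the offset log and the new commit log on restart can be summarized as a small decision function. This is a simplified paraphrase of the logic that `populateStartOffsets` implements in the diff below, not the actual implementation; the names are schematic stand-ins for the real fields.

```scala
// Which batch should run next after a restart, given the two logs?
def nextBatchToRun(
    latestPlannedBatch: Option[Long],    // latest batch id in the offset log
    latestCommittedBatch: Option[Long]   // latest batch id in the commit log
): Long =
  (latestPlannedBatch, latestCommittedBatch) match {
    case (None, _) =>
      0L          // fresh start: begin with batch 0
    case (Some(planned), Some(committed)) if committed == planned =>
      planned + 1 // last planned batch was committed: start the next one
    case (Some(planned), _) =>
      planned     // no matching commit record: re-run the last planned batch
  }

// nextBatchToRun(Some(5L), Some(5L)) == 6L  (batch 5 committed, run batch 6)
// nextBatchToRun(Some(5L), Some(4L)) == 5L  (batch 5 not committed, re-run it)
// nextBatchToRun(None, None)         == 0L  (first start)
```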
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 7b6396e..6391d62 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -301,8 +301,6 @@ class KafkaSourceSuite extends KafkaSourceTest { StopStream, StartStream(ProcessingTime(100), clock), waitUntilBatchProcessed, - AdvanceManualClock(100), - waitUntilBatchProcessed, // smallest now empty, 1 more from middle, 9 more from biggest CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index bd4528b..9925a8b 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -64,7 +64,11 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.<init>$default$11"), // [SPARK-17161] Removing Python-friendly constructors not needed - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"), + + // [SPARK-19876] Add one time trigger, and improve Trigger APIs + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.streaming.Trigger"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.streaming.ProcessingTime") ) // Exclude rules for 2.1.x http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 80f4340..27d6725 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -277,44 +277,6 @@ class StreamingQueryManager(object): self._jsqm.resetTerminated() -class Trigger(object): - """Used to indicate how often results should be produced by a :class:`StreamingQuery`. - - .. note:: Experimental - - .. versionadded:: 2.0 - """ - - __metaclass__ = ABCMeta - - @abstractmethod - def _to_java_trigger(self, sqlContext): - """Internal method to construct the trigger on the jvm. - """ - pass - - -class ProcessingTime(Trigger): - """A trigger that runs a query periodically based on the processing time. If `interval` is 0, - the query will run as fast as possible. - - The interval should be given as a string, e.g. '2 seconds', '5 minutes', ... - - .. note:: Experimental - - .. versionadded:: 2.0 - """ - - def __init__(self, interval): - if type(interval) != str or len(interval.strip()) == 0: - raise ValueError("interval should be a non empty interval string, e.g. 
'2 seconds'.") - self.interval = interval - - def _to_java_trigger(self, sqlContext): - return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create( - self.interval) - - class DataStreamReader(OptionUtils): """ Interface used to load a streaming :class:`DataFrame` from external storage systems @@ -790,7 +752,7 @@ class DataStreamWriter(object): @keyword_only @since(2.0) - def trigger(self, processingTime=None): + def trigger(self, processingTime=None, once=None): """Set the trigger for the stream query. If this is not set it will run the query as fast as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``. @@ -800,17 +762,26 @@ class DataStreamWriter(object): >>> # trigger the query for execution every 5 seconds >>> writer = sdf.writeStream.trigger(processingTime='5 seconds') + >>> # trigger the query for just once batch of data + >>> writer = sdf.writeStream.trigger(once=True) """ - from pyspark.sql.streaming import ProcessingTime - trigger = None + jTrigger = None if processingTime is not None: + if once is not None: + raise ValueError('Multiple triggers not allowed.') if type(processingTime) != str or len(processingTime.strip()) == 0: - raise ValueError('The processing time must be a non empty string. Got: %s' % + raise ValueError('Value for processingTime must be a non empty string. Got: %s' % processingTime) - trigger = ProcessingTime(processingTime) - if trigger is None: - raise ValueError('A trigger was not provided. Supported triggers: processingTime.') - self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark)) + interval = processingTime.strip() + jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime( + interval) + elif once is not None: + if once is not True: + raise ValueError('Value for once must be True. 
Got: %s' % once) + jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once() + else: + raise ValueError('No trigger provided') + self._jwrite = self._jwrite.trigger(jTrigger) return self @ignore_unicode_prefix http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 29d613b..b93b7ed 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1255,13 +1255,26 @@ class SQLTests(ReusedPySparkTestCase): shutil.rmtree(tmpPath) - def test_stream_trigger_takes_keyword_args(self): + def test_stream_trigger(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') + + # Should take at least one arg + try: + df.writeStream.trigger() + except ValueError: + pass + + # Should not take multiple args + try: + df.writeStream.trigger(once=True, processingTime='5 seconds') + except ValueError: + pass + + # Should take only keyword args try: df.writeStream.trigger('5 seconds') self.fail("Should have thrown an exception") except TypeError: - # should throw error pass def test_stream_read_options(self): http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala new file mode 100644 index 0000000..fb1a4fb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets._ + +import scala.io.{Source => IOSource} + +import org.apache.spark.sql.SparkSession + +/** + * Used to write log files that represent batch commit points in structured streaming. + * A commit log file will be written immediately after the successful completion of a + * batch, and before processing the next batch. Here is an execution summary: + * - trigger batch 1 + * - obtain batch 1 offsets and write to offset log + * - process batch 1 + * - write batch 1 to completion log + * - trigger batch 2 + * - obtain bactch 2 offsets and write to offset log + * - process batch 2 + * - write batch 2 to completion log + * .... 
+ * + * The current format of the batch completion log is: + * line 1: version + * line 2: metadata (optional json string) + */ +class BatchCommitLog(sparkSession: SparkSession, path: String) + extends HDFSMetadataLog[String](sparkSession, path) { + + override protected def deserialize(in: InputStream): String = { + // called inside a try-finally where the underlying stream is closed in the caller + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() + if (!lines.hasNext) { + throw new IllegalStateException("Incomplete log file in the offset commit log") + } + parseVersion(lines.next().trim, BatchCommitLog.VERSION) + // read metadata + lines.next().trim match { + case BatchCommitLog.SERIALIZED_VOID => null + case metadata => metadata + } + } + + override protected def serialize(metadata: String, out: OutputStream): Unit = { + // called inside a try-finally where the underlying stream is closed in the caller + out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8)) + out.write('\n') + + // write metadata or void + out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata) + .getBytes(UTF_8)) + } +} + +object BatchCommitLog { + private val VERSION = 1 + private val SERIALIZED_VOID = "{}" +} + http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 60d5283..34e9262 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -165,6 +165,8 @@ class StreamExecution( private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) + case OneTimeTrigger => OneTimeExecutor() + case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") } /** Defines the internal state of execution */ @@ -209,6 +211,13 @@ class StreamExecution( */ val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets")) + /** + * A log that records the batch ids that have completed. This is used to check if a batch was + * fully processed, and its output was committed to the sink, hence no need to process it again. + * This is used (for instance) during restart, to help identify which batch to run next. + */ + val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits")) + /** Whether all fields of the query have been initialized */ private def isInitialized: Boolean = state.get != INITIALIZING @@ -291,10 +300,13 @@ class StreamExecution( runBatch(sparkSessionToRunBatches) } } - // Report trigger as finished and construct progress object. finishTrigger(dataAvailable) if (dataAvailable) { + // Update committed offsets. + committedOffsets ++= availableOffsets + batchCommitLog.add(currentBatchId, null) + logDebug(s"batch ${currentBatchId} committed") // We'll increase currentBatchId after we complete processing current batch's data currentBatchId += 1 } else { @@ -306,9 +318,6 @@ class StreamExecution( } else { false } - - // Update committed offsets. 
- committedOffsets ++= availableOffsets updateStatusMessage("Waiting for next trigger") continueToRun }) @@ -392,13 +401,33 @@ class StreamExecution( * - currentBatchId * - committedOffsets * - availableOffsets + * The basic structure of this method is as follows: + * + * Identify (from the offset log) the offsets used to run the last batch + * IF last batch exists THEN + * Set the next batch to be executed as the last recovered batch + * Check the commit log to see which batch was committed last + * IF the last batch was committed THEN + * Call getBatch using the last batch start and end offsets + * // ^^^^ above line is needed since some sources assume last batch always re-executes + * Setup for a new batch i.e., start = last batch end, and identify new end + * DONE + * ELSE + * Identify a brand new batch + * DONE */ private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = { offsetLog.getLatest() match { - case Some((batchId, nextOffsets)) => - logInfo(s"Resuming streaming query, starting with batch $batchId") - currentBatchId = batchId + case Some((latestBatchId, nextOffsets)) => + /* First assume that we are re-executing the latest known batch + * in the offset log */ + currentBatchId = latestBatchId availableOffsets = nextOffsets.toStreamProgress(sources) + /* Initialize committed offsets to a committed batch, which at this + * is the second latest batch id in the offset log. */ + offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId => + committedOffsets = secondLatestBatchId.toStreamProgress(sources) + } // update offset metadata nextOffsets.metadata.foreach { metadata => @@ -419,14 +448,37 @@ class StreamExecution( SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString) } - logDebug(s"Found possibly unprocessed offsets $availableOffsets " + - s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}") - - offsetLog.get(batchId - 1).foreach { - case lastOffsets => - committedOffsets = lastOffsets.toStreamProgress(sources) - logDebug(s"Resuming with committed offsets: $committedOffsets") + /* identify the current batch id: if commit log indicates we successfully processed the + * latest batch id in the offset log, then we can safely move to the next batch + * i.e., committedBatchId + 1 */ + batchCommitLog.getLatest() match { + case Some((latestCommittedBatchId, _)) => + if (latestBatchId == latestCommittedBatchId) { + /* The last batch was successfully committed, so we can safely process a + * new next batch but first: + * Make a call to getBatch using the offsets from previous batch. + * because certain sources (e.g., KafkaSource) assume on restart the last + * batch will be executed before getOffset is called again. 
*/ + availableOffsets.foreach { ao: (Source, Offset) => + val (source, end) = ao + if (committedOffsets.get(source).map(_ != end).getOrElse(true)) { + val start = committedOffsets.get(source) + source.getBatch(start, end) + } + } + currentBatchId = latestCommittedBatchId + 1 + committedOffsets ++= availableOffsets + // Construct a new batch be recomputing availableOffsets + constructNextBatch() + } else if (latestCommittedBatchId < latestBatchId - 1) { + logWarning(s"Batch completion log latest batch id is " + + s"${latestCommittedBatchId}, which is not trailing " + + s"batchid $latestBatchId by one") + } + case None => logInfo("no commit log present") } + logDebug(s"Resuming at batch $currentBatchId with committed offsets " + + s"$committedOffsets and available offsets $availableOffsets") case None => // We are starting this stream for the first time. logInfo(s"Starting new streaming query.") currentBatchId = 0 @@ -523,6 +575,7 @@ class StreamExecution( // Note that purge is exclusive, i.e. it purges everything before the target ID. if (minBatchesToRetain < currentBatchId) { offsetLog.purge(currentBatchId - minBatchesToRetain) + batchCommitLog.purge(currentBatchId - minBatchesToRetain) } } } else { http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index ac510df..02996ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -30,6 +30,17 @@ trait TriggerExecutor { } /** + * A trigger executor that runs a single batch only, then terminates. + */ +case class OneTimeExecutor() extends TriggerExecutor { + + /** + * Execute a single batch using `batchRunner`. + */ + override def execute(batchRunner: () => Boolean): Unit = batchRunner() +} + +/** * A trigger executor that runs a batch every `intervalMs` milliseconds. */ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock()) http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala new file mode 100644 index 0000000..271bc4d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.streaming.Trigger + +/** + * A [[Trigger]] that process only one batch of data in a streaming query then terminates + * the query. + */ +@Experimental +@InterfaceStability.Evolving +case object OneTimeTrigger extends Trigger http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index fe52013..f2f7005 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -377,7 +377,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { private var outputMode: OutputMode = OutputMode.Append - private var trigger: Trigger = ProcessingTime(0L) + private var trigger: Trigger = Trigger.ProcessingTime(0L) private var extraOptions = new scala.collection.mutable.HashMap[String, String] http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala new file mode 100644 index 0000000..bdad8e4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.Duration + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.unsafe.types.CalendarInterval + +/** + * :: Experimental :: + * A trigger that runs a query periodically based on the processing time. If `interval` is 0, + * the query will run as fast as possible. 
+ * + * Scala Example: + * {{{ + * df.writeStream.trigger(ProcessingTime("10 seconds")) + * + * import scala.concurrent.duration._ + * df.writeStream.trigger(ProcessingTime(10.seconds)) + * }}} + * + * Java Example: + * {{{ + * df.writeStream.trigger(ProcessingTime.create("10 seconds")) + * + * import java.util.concurrent.TimeUnit + * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.0.0 + */ +@Experimental +@InterfaceStability.Evolving +@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0") +case class ProcessingTime(intervalMs: Long) extends Trigger { + require(intervalMs >= 0, "the interval of trigger should not be negative") +} + +/** + * :: Experimental :: + * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s. + * + * @since 2.0.0 + */ +@Experimental +@InterfaceStability.Evolving +@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0") +object ProcessingTime { + + /** + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * df.writeStream.trigger(ProcessingTime("10 seconds")) + * }}} + * + * @since 2.0.0 + * @deprecated use Trigger.ProcessingTimeTrigger(interval) + */ + @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0") + def apply(interval: String): ProcessingTime = { + if (StringUtils.isBlank(interval)) { + throw new IllegalArgumentException( + "interval cannot be null or blank.") + } + val cal = if (interval.startsWith("interval")) { + CalendarInterval.fromString(interval) + } else { + CalendarInterval.fromString("interval " + interval) + } + if (cal == null) { + throw new IllegalArgumentException(s"Invalid interval: $interval") + } + if (cal.months > 0) { + throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") + } + new ProcessingTime(cal.microseconds / 1000) + } + + /** + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * import scala.concurrent.duration._ + * df.writeStream.trigger(ProcessingTime(10.seconds)) + * }}} + * + * @since 2.0.0 + * @deprecated use Trigger.ProcessingTimeTrigger(interval) + */ + @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0") + def apply(interval: Duration): ProcessingTime = { + new ProcessingTime(interval.toMillis) + } + + /** + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * df.writeStream.trigger(ProcessingTime.create("10 seconds")) + * }}} + * + * @since 2.0.0 + * @deprecated use Trigger.ProcessingTimeTrigger(interval) + */ + @deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0") + def create(interval: String): ProcessingTime = { + apply(interval) + } + + /** + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. 
+ * + * Example: + * {{{ + * import java.util.concurrent.TimeUnit + * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.0.0 + * @deprecated use Trigger.ProcessingTimeTrigger(interval) + */ + @deprecated("use Trigger.ProcessingTimeTrigger(interval, unit)", "2.2.0") + def create(interval: Long, unit: TimeUnit): ProcessingTime = { + new ProcessingTime(unit.toMillis(interval)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java new file mode 100644 index 0000000..a03a851 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming; + +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; + +/** + * :: Experimental :: + * Policy used to indicate how often results should be produced by a [[StreamingQuery]]. + * + * @since 2.0.0 + */ +@Experimental +@InterfaceStability.Evolving +public class Trigger { + + /** + * :: Experimental :: + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is 0, the query will run as fast as possible. + * + * @since 2.2.0 + */ + public static Trigger ProcessingTime(long intervalMs) { + return ProcessingTime.apply(intervalMs); + } + + /** + * :: Experimental :: + * (Java-friendly) + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is 0, the query will run as fast as possible. + * + * {{{ + * import java.util.concurrent.TimeUnit + * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.2.0 + */ + public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) { + return ProcessingTime.create(interval, timeUnit); + } + + /** + * :: Experimental :: + * (Scala-friendly) + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `duration` is 0, the query will run as fast as possible. 
+ * + * {{{ + * import scala.concurrent.duration._ + * df.writeStream.trigger(ProcessingTime(10.seconds)) + * }}} + * @since 2.2.0 + */ + public static Trigger ProcessingTime(Duration interval) { + return ProcessingTime.apply(interval); + } + + /** + * :: Experimental :: + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is effectively 0, the query will run as fast as possible. + * + * {{{ + * df.writeStream.trigger(Trigger.ProcessingTime("10 seconds")) + * }}} + * @since 2.2.0 + */ + public static Trigger ProcessingTime(String interval) { + return ProcessingTime.apply(interval); + } + + /** + * A trigger that process only one batch of data in a streaming query then terminates + * the query. + * + * @since 2.2.0 + */ + public static Trigger Once() { + return OneTimeTrigger$.MODULE$; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala deleted file mode 100644 index 68f2eab..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.Duration - -import org.apache.commons.lang3.StringUtils - -import org.apache.spark.annotation.{Experimental, InterfaceStability} -import org.apache.spark.unsafe.types.CalendarInterval - -/** - * :: Experimental :: - * Used to indicate how often results should be produced by a [[StreamingQuery]]. - * - * @since 2.0.0 - */ -@Experimental -@InterfaceStability.Evolving -sealed trait Trigger - -/** - * :: Experimental :: - * A trigger that runs a query periodically based on the processing time. If `interval` is 0, - * the query will run as fast as possible. 
- * - * Scala Example: - * {{{ - * df.write.trigger(ProcessingTime("10 seconds")) - * - * import scala.concurrent.duration._ - * df.write.trigger(ProcessingTime(10.seconds)) - * }}} - * - * Java Example: - * {{{ - * df.write.trigger(ProcessingTime.create("10 seconds")) - * - * import java.util.concurrent.TimeUnit - * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) - * }}} - * - * @since 2.0.0 - */ -@Experimental -@InterfaceStability.Evolving -case class ProcessingTime(intervalMs: Long) extends Trigger { - require(intervalMs >= 0, "the interval of trigger should not be negative") -} - -/** - * :: Experimental :: - * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s. - * - * @since 2.0.0 - */ -@Experimental -@InterfaceStability.Evolving -object ProcessingTime { - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * df.write.trigger(ProcessingTime("10 seconds")) - * }}} - * - * @since 2.0.0 - */ - def apply(interval: String): ProcessingTime = { - if (StringUtils.isBlank(interval)) { - throw new IllegalArgumentException( - "interval cannot be null or blank.") - } - val cal = if (interval.startsWith("interval")) { - CalendarInterval.fromString(interval) - } else { - CalendarInterval.fromString("interval " + interval) - } - if (cal == null) { - throw new IllegalArgumentException(s"Invalid interval: $interval") - } - if (cal.months > 0) { - throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") - } - new ProcessingTime(cal.microseconds / 1000) - } - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * import scala.concurrent.duration._ - * df.write.trigger(ProcessingTime(10.seconds)) - * }}} - * - * @since 2.0.0 - */ - def apply(interval: Duration): ProcessingTime = { - new ProcessingTime(interval.toMillis) - } - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * df.write.trigger(ProcessingTime.create("10 seconds")) - * }}} - * - * @since 2.0.0 - */ - def create(interval: String): ProcessingTime = { - apply(interval) - } - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * import java.util.concurrent.TimeUnit - * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) - * }}} - * - * @since 2.0.0 - */ - def create(interval: Long, unit: TimeUnit): ProcessingTime = { - new ProcessingTime(unit.toMillis(interval)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 7614ea5..fd850a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -218,7 +218,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin AddData(inputData, 25), // Evict items less than previous watermark. 
CheckLastBatch((10, 5)), StopStream, - AssertOnQuery { q => // clear the sink + AssertOnQuery { q => // purge commit and clear the sink + val commit = q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) + 1L + q.batchCommitLog.purge(commit) q.sink.asInstanceOf[MemorySink].clear() true }, http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index 89a2597..a00a1a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -575,9 +575,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf StopStream, StartStream(ProcessingTime("1 second"), triggerClock = clock), + AdvanceManualClock(10 * 1000), AddData(inputData, "c"), - AdvanceManualClock(20 * 1000), + AdvanceManualClock(1 * 1000), CheckLastBatch(("b", "-1"), ("c", "1")), assertNumStateRows(total = 1, updated = 2), http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index f01211e..32920f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -156,6 +156,15 @@ class StreamSuite extends StreamTest { AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId, s"offsetLog's latest should be $expectedId") + // Check the latest batchid in the commit log + def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery = + AssertOnQuery(_.batchCommitLog.getLatest().get._1 == expectedId, + s"commitLog's latest should be $expectedId") + + // Ensure that there has not been an incremental execution after restart + def CheckNoIncrementalExecutionCurrentBatchId(): AssertOnQuery = + AssertOnQuery(_.lastExecution == null, s"lastExecution not expected to run") + // For each batch, we would log the state change during the execution // This checks whether the key of the state change log is the expected batch id def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery = @@ -181,6 +190,7 @@ class StreamSuite extends StreamTest { // Check the results of batch 0 CheckAnswer(1, 2, 3), CheckIncrementalExecutionCurrentBatchId(0), + CheckCommitLogLatestBatchId(0), CheckOffsetLogLatestBatchId(0), CheckSinkLatestBatchId(0), // Add some data in batch 1 @@ -191,6 +201,7 @@ class StreamSuite extends StreamTest { // Check the results of batch 1 CheckAnswer(1, 2, 3, 4, 5, 6), CheckIncrementalExecutionCurrentBatchId(1), + CheckCommitLogLatestBatchId(1), CheckOffsetLogLatestBatchId(1), CheckSinkLatestBatchId(1), @@ -203,6 +214,7 @@ class StreamSuite extends StreamTest { // the currentId does not get logged (e.g. 
as 2) even if the clock has advanced many times CheckAnswer(1, 2, 3, 4, 5, 6), CheckIncrementalExecutionCurrentBatchId(1), + CheckCommitLogLatestBatchId(1), CheckOffsetLogLatestBatchId(1), CheckSinkLatestBatchId(1), @@ -210,14 +222,15 @@ class StreamSuite extends StreamTest { StopStream, StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)), - /* -- batch 1 rerun ----------------- */ - // this batch 1 would re-run because the latest batch id logged in offset log is 1 + /* -- batch 1 no rerun ----------------- */ + // batch 1 would not re-run because the latest batch id logged in commit log is 1 AdvanceManualClock(10 * 1000), + CheckNoIncrementalExecutionCurrentBatchId(), /* -- batch 2 ----------------------- */ // Check the results of batch 1 CheckAnswer(1, 2, 3, 4, 5, 6), - CheckIncrementalExecutionCurrentBatchId(1), + CheckCommitLogLatestBatchId(1), CheckOffsetLogLatestBatchId(1), CheckSinkLatestBatchId(1), // Add some data in batch 2 @@ -228,6 +241,7 @@ class StreamSuite extends StreamTest { // Check the results of batch 2 CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9), CheckIncrementalExecutionCurrentBatchId(2), + CheckCommitLogLatestBatchId(2), CheckOffsetLogLatestBatchId(2), CheckSinkLatestBatchId(2)) } http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 60e2375..8cf1791 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -159,7 +159,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { /** Starts the stream, resuming if data has already been processed. It must not be running. 
*/ case class StartStream( - trigger: Trigger = ProcessingTime(0), + trigger: Trigger = Trigger.ProcessingTime(0), triggerClock: Clock = new SystemClock, additionalConfs: Map[String, String] = Map.empty) extends StreamAction http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 0c80156..600c039 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -272,11 +272,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte StopStream, AssertOnQuery { q => // clear the sink q.sink.asInstanceOf[MemorySink].clear() + q.batchCommitLog.purge(3) // advance by a minute i.e., 90 seconds total clock.advance(60 * 1000L) true }, StartStream(ProcessingTime("10 seconds"), triggerClock = clock), + // The commit log blown, causing the last batch to re-run CheckLastBatch((20L, 1), (85L, 1)), AssertOnQuery { q => clock.getTimeMillis() == 90000L @@ -322,11 +324,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte StopStream, AssertOnQuery { q => // clear the sink q.sink.asInstanceOf[MemorySink].clear() + q.batchCommitLog.purge(3) // advance by 60 days i.e., 90 days total clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60) true }, StartStream(ProcessingTime("10 day"), triggerClock = clock), + // Commit log blown, causing a re-run of the last batch CheckLastBatch((20L, 1), (85L, 1)), // advance clock to 100 days, should retain keys >= 90 http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index eb09b9f..03dad8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -57,6 +57,20 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { val inputData = new MemoryStream[Int](0, sqlContext) val df = inputData.toDS().as[Long].map { 10 / _ } val listener = new EventCollector + + case class AssertStreamExecThreadToWaitForClock() + extends AssertOnQuery(q => { + eventually(Timeout(streamingTimeout)) { + if (q.exception.isEmpty) { + assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis)) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + }, "") + try { // No events until started spark.streams.addListener(listener) @@ -81,6 +95,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Progress event generated when data processed AddData(inputData, 1, 2), AdvanceManualClock(100), + AssertStreamExecThreadToWaitForClock(), CheckAnswer(10, 5), AssertOnQuery { query => assert(listener.progressEvents.nonEmpty) @@ -109,8 +124,9 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter { // Termination event generated with exception message when stopped with error StartStream(ProcessingTime(100), triggerClock = clock), + AssertStreamExecThreadToWaitForClock(), AddData(inputData, 0), - AdvanceManualClock(100), + AdvanceManualClock(100), // process bad data ExpectFailure[SparkException](), AssertOnQuery { query => eventually(Timeout(streamingTimeout)) { http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index a0a2b2b..3f41ecd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -158,6 +158,49 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi ) } + testQuietly("OneTime trigger, commit log, and exception") { + import Trigger.Once + val inputData = MemoryStream[Int] + val mapped = inputData.toDS().map { 6 / _} + + testStream(mapped)( + AssertOnQuery(_.isActive === true), + StopStream, + AddData(inputData, 1, 2), + StartStream(trigger = Once), + CheckAnswer(6, 3), + StopStream, // clears out StreamTest state + AssertOnQuery { q => + // both commit log and offset log contain the same (latest) batch id + q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) == + q.offsetLog.getLatest().map(_._1).getOrElse(-2L) + }, + AssertOnQuery { q => + // blow away commit log and sink result + q.batchCommitLog.purge(1) + q.sink.asInstanceOf[MemorySink].clear() + true + }, + StartStream(trigger = Once), + CheckAnswer(6, 3), // ensure we fall back to offset log and reprocess batch + StopStream, + AddData(inputData, 3), + StartStream(trigger = Once), + CheckLastBatch(2), // commit log should be back in place + StopStream, + AddData(inputData, 0), + StartStream(trigger = Once), + ExpectFailure[SparkException](), + AssertOnQuery(_.isActive === false), + AssertOnQuery(q => { + q.exception.get.startOffset === + q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString && + q.exception.get.endOffset === + q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString + }, "incorrect start offset or end offset on exception") + ) + } + testQuietly("status, lastProgress, and recentProgress") { import StreamingQuerySuite._ clock = new StreamManualClock @@ -237,6 +280,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AdvanceManualClock(500), // time = 1100 to unblock job AssertOnQuery { _ => clock.getTimeMillis() === 1100 }, CheckAnswer(2), + AssertStreamExecThreadToWaitForClock(), AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), @@ -275,6 +319,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AddData(inputData, 1, 2), AdvanceManualClock(100), // allow another trigger + AssertStreamExecThreadToWaitForClock(), CheckAnswer(4), AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === false), @@ -306,8 +351,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi // Test status and progress after query terminated with 
error StartStream(ProcessingTime(100), triggerClock = clock), + AdvanceManualClock(100), // ensure initial trigger completes before AddData AddData(inputData, 0), - AdvanceManualClock(100), + AdvanceManualClock(100), // allow another trigger ExpectFailure[SparkException](), AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), http://git-wip-us.apache.org/repos/asf/spark/blob/746a558d/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 341ab0e..05cd3d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} -import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _} +import org.apache.spark.sql.streaming.Trigger._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -346,7 +347,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { q = df.writeStream .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) - .trigger(ProcessingTime.create(100, TimeUnit.SECONDS)) + .trigger(ProcessingTime(100, TimeUnit.SECONDS)) .start() q.stop() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org