Repository: spark Updated Branches: refs/heads/branch-2.0 9406a3c9a -> a780848af
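The diffs below move the structured streaming API classes (ContinuousQuery, ContinuousQueryListener, OutputMode, ProcessingTime) and the StreamTest harness into the org.apache.spark.sql.streaming package. For user code the visible change is the import path; a minimal before/after sketch, assuming only the package locations shown in the hunks below:

    // Before this commit (per the deleted files below):
    // import org.apache.spark.sql.{ContinuousQuery, ProcessingTime}
    // import org.apache.spark.sql.util.ContinuousQueryListener

    // After this commit (per the added/updated imports below):
    import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryListener, ProcessingTime}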
http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala deleted file mode 100644 index ba1facf..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.util - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.ContinuousQuery -import org.apache.spark.sql.util.ContinuousQueryListener._ - -/** - * :: Experimental :: - * Interface for listening to events related to [[ContinuousQuery ContinuousQueries]]. - * @note The methods are not thread-safe as they may be called from different threads. - */ -@Experimental -abstract class ContinuousQueryListener { - - /** - * Called when a query is started. - * @note This is called synchronously with - * [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]], - * that is, `onQueryStarted` will be called on all listeners before - * `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please - * don't block this method as it will block your query. - */ - def onQueryStarted(queryStarted: QueryStarted): Unit - - /** - * Called when there is some status update (ingestion rate updated, etc.) - * - * @note This method is asynchronous. The status in [[ContinuousQuery]] will always be - * the latest no matter when this method is called. Therefore, the status of [[ContinuousQuery]] - * may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]] - * is terminated when you are processing [[QueryProgress]]. - */ - def onQueryProgress(queryProgress: QueryProgress): Unit - - /** Called when a query is stopped, with or without error */ - def onQueryTerminated(queryTerminated: QueryTerminated): Unit -} - - -/** - * :: Experimental :: - * Companion object of [[ContinuousQueryListener]] that defines the listener events.
- */ -@Experimental -object ContinuousQueryListener { - - /** Base type of [[ContinuousQueryListener]] events */ - trait Event - - /** Event representing the start of a query */ - class QueryStarted private[sql](val query: ContinuousQuery) extends Event - - /** Event representing any progress updates in a query */ - class QueryProgress private[sql](val query: ContinuousQuery) extends Event - - /** Event representing the termination of a query */ - class QueryTerminated private[sql](val query: ContinuousQuery) extends Event -} http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala index 0d18a64..52c2007 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration._ import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.streaming.ProcessingTime class ProcessingTimeSuite extends SparkFunSuite { http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala deleted file mode 100644 index b033725..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ /dev/null @@ -1,565 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.spark.sql - -import java.lang.Thread.UncaughtExceptionHandler - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.language.experimental.macros -import scala.reflect.ClassTag -import scala.util.Random -import scala.util.control.NonFatal - -import org.scalatest.Assertions -import org.scalatest.concurrent.{Eventually, Timeouts} -import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.scalatest.exceptions.TestFailedDueToTimeoutException -import org.scalatest.time.Span -import org.scalatest.time.SpanSugar._ - -import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} - -/** - * A framework for implementing tests for streaming queries and sources. - * - * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order, - * blocking as necessary to let the stream catch up. For example, the following adds some data to - * a stream, blocking until it can verify that the correct values are eventually produced. - * - * {{{ - * val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map(_ + 1) - - testStream(mapped)( - AddData(inputData, 1, 2, 3), - CheckAnswer(2, 3, 4)) - * }}} - * - * Note that while we do sleep to allow the other thread to progress without spinning, - * `StreamAction` checks should not depend on the amount of time spent sleeping. Instead they - * should check the actual progress of the stream before verifying the required test condition. - * - * Currently it is assumed that all streaming queries will eventually complete in 10 seconds to - * avoid hanging forever in the case of failures. However, individual suites can change this - * by overriding `streamingTimeout`. - */ -trait StreamTest extends QueryTest with Timeouts { - - /** How long to wait for an active stream to catch up when checking a result. */ - val streamingTimeout = 10.seconds - - /** A trait for actions that can be performed while testing a streaming DataFrame. */ - trait StreamAction - - /** A trait to mark actions that require the stream to be actively running. */ - trait StreamMustBeRunning - - /** - * Adds the given data to the stream. Subsequent check answers will block until this data has - * been processed. - */ - object AddData { - def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] = - AddDataMemory(source, data) - } - - /** A trait that can be extended when testing a source. */ - trait AddData extends StreamAction { - /** - * Called to add data to a source. It should find the source to add data to from - * the active query, and then return the source object the data was added to, as well as the - * offset of the added data. - */ - def addData(query: Option[StreamExecution]): (Source, Offset) - } - - case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData { - override def toString: String = s"AddData to $source: ${data.mkString(",")}" - - override def addData(query: Option[StreamExecution]): (Source, Offset) = { - (source, source.addData(data)) - } - } - - /** - * Checks to make sure that the current data stored in the sink matches the `expectedAnswer`. - * This operation automatically blocks until all added data has been processed.
- */ - object CheckAnswer { - def apply[A : Encoder](data: A*): CheckAnswerRows = { - val encoder = encoderFor[A] - val toExternalRow = RowEncoder(encoder.schema) - CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), false) - } - - def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false) - } - - /** - * Checks to make sure that the latest batch of data stored in the sink matches the `expectedAnswer`. - * This operation automatically blocks until all added data has been processed. - */ - object CheckLastBatch { - def apply[A : Encoder](data: A*): CheckAnswerRows = { - val encoder = encoderFor[A] - val toExternalRow = RowEncoder(encoder.schema) - CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), true) - } - - def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true) - } - - case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean) - extends StreamAction with StreamMustBeRunning { - override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}" - private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer" - } - - /** Stops the stream. It must currently be running. */ - case object StopStream extends StreamAction with StreamMustBeRunning - - /** Starts the stream, resuming if data has already been processed. It must not be running. */ - case class StartStream( - trigger: Trigger = ProcessingTime(0), - triggerClock: Clock = new SystemClock) - extends StreamAction - - /** Advance the trigger clock's time manually. */ - case class AdvanceManualClock(timeToAdd: Long) extends StreamAction - - /** Signals that a failure is expected and should not kill the test. */ - case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction { - val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] - override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]" - } - - /** Assert that a body is true */ - class Assert(condition: => Boolean, val message: String = "") extends StreamAction { - def run(): Unit = { Assertions.assert(condition) } - override def toString: String = s"Assert(<condition>, $message)" - } - - object Assert { - def apply(condition: => Boolean, message: String = ""): Assert = new Assert(condition, message) - def apply(message: String)(body: => Unit): Assert = new Assert( { body; true }, message) - def apply(body: => Unit): Assert = new Assert( { body; true }, "") - } - - /** Assert that a condition on the active query is true */ - class AssertOnQuery(val condition: StreamExecution => Boolean, val message: String) - extends StreamAction { - override def toString: String = s"AssertOnQuery(<condition>, $message)" - } - - object AssertOnQuery { - def apply(condition: StreamExecution => Boolean, message: String = ""): AssertOnQuery = { - new AssertOnQuery(condition, message) - } - - def apply(message: String)(condition: StreamExecution => Boolean): AssertOnQuery = { - new AssertOnQuery(condition, message) - } - } - - /** - * Executes the specified actions on the given streaming DataFrame and provides helpful - * error messages in the case of failures or incorrect answers. - * - * Note that if the stream is not explicitly started before an action that requires it to be - * running then it will be automatically started before performing any other actions.
- */ - def testStream( - _stream: Dataset[_], - outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = { - - val stream = _stream.toDF() - var pos = 0 - var currentPlan: LogicalPlan = stream.logicalPlan - var currentStream: StreamExecution = null - var lastStream: StreamExecution = null - val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for - val sink = new MemorySink(stream.schema, outputMode) - - @volatile - var streamDeathCause: Throwable = null - - // If the test doesn't manually start the stream, we do it automatically at the beginning. - val startedManually = - actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream]) - val startedTest = if (startedManually) actions else StartStream() +: actions - - def testActions = actions.zipWithIndex.map { - case (a, i) => - if ((pos == i && startedManually) || (pos == (i + 1) && !startedManually)) { - "=> " + a.toString - } else { - " " + a.toString - } - }.mkString("\n") - - def currentOffsets = - if (currentStream != null) currentStream.committedOffsets.toString else "not started" - - def threadState = - if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead" - - def testState = - s""" - |== Progress == - |$testActions - | - |== Stream == - |Output Mode: $outputMode - |Stream state: $currentOffsets - |Thread state: $threadState - |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""} - | - |== Sink == - |${sink.toDebugString} - | - | - |== Plan == - |${if (currentStream != null) currentStream.lastExecution else ""} - """.stripMargin - - def verify(condition: => Boolean, message: String): Unit = { - if (!condition) { - failTest(message) - } - } - - def eventually[T](message: String)(func: => T): T = { - try { - Eventually.eventually(Timeout(streamingTimeout)) { - func - } - } catch { - case NonFatal(e) => - failTest(message, e) - } - } - - def failTest(message: String, cause: Throwable = null) = { - - // Recursively pretty print an exception with truncated stacktrace and internal cause - def exceptionToString(e: Throwable, prefix: String = ""): String = { - val base = s"$prefix${e.getMessage}" + - e.getStackTrace.take(10).mkString(s"\n$prefix", s"\n$prefix\t", "\n") - if (e.getCause != null) { - base + s"\n$prefix\tCaused by: " + exceptionToString(e.getCause, s"$prefix\t") - } else { - base - } - } - val c = Option(cause).map(exceptionToString(_)) - val m = if (message != null && message.size > 0) Some(message) else None - fail( - s""" - |${(m ++ c).mkString(": ")} - |$testState - """.stripMargin) - } - - val testThread = Thread.currentThread() - val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath - - try { - startedTest.foreach { action => - action match { - case StartStream(trigger, triggerClock) => - verify(currentStream == null, "stream already running") - lastStream = currentStream - currentStream = - spark - .streams - .startQuery( - StreamExecution.nextName, - metadataRoot, - stream, - sink, - outputMode, - trigger, - triggerClock) - .asInstanceOf[StreamExecution] - currentStream.microBatchThread.setUncaughtExceptionHandler( - new UncaughtExceptionHandler { - override def uncaughtException(t: Thread, e: Throwable): Unit = { - streamDeathCause = e - testThread.interrupt() - } - }) - - case AdvanceManualClock(timeToAdd) => - verify(currentStream != null, - "can not advance manual clock when a stream is not running") -
verify(currentStream.triggerClock.isInstanceOf[ManualClock], - s"can not advance clock of type ${currentStream.triggerClock.getClass}") - currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd) - - case StopStream => - verify(currentStream != null, "can not stop a stream that is not running") - try failAfter(streamingTimeout) { - currentStream.stop() - verify(!currentStream.microBatchThread.isAlive, - s"microbatch thread not stopped") - verify(!currentStream.isActive, - "query.isActive() is false even after stopping") - verify(currentStream.exception.isEmpty, - s"query.exception() is not empty after clean stop: " + - currentStream.exception.map(_.toString()).getOrElse("")) - } catch { - case _: InterruptedException => - case _: org.scalatest.exceptions.TestFailedDueToTimeoutException => - failTest("Timed out while stopping and waiting for microbatch thread to terminate.") - case t: Throwable => - failTest("Error while stopping stream", t) - } finally { - lastStream = currentStream - currentStream = null - } - - case ef: ExpectFailure[_] => - verify(currentStream != null, "can not expect failure when stream is not running") - try failAfter(streamingTimeout) { - val thrownException = intercept[ContinuousQueryException] { - currentStream.awaitTermination() - } - eventually("microbatch thread not stopped after termination with failure") { - assert(!currentStream.microBatchThread.isAlive) - } - verify(thrownException.query.eq(currentStream), - s"incorrect query reference in exception") - verify(currentStream.exception === Some(thrownException), - s"incorrect exception returned by query.exception()") - - val exception = currentStream.exception.get - verify(exception.cause.getClass === ef.causeClass, - "incorrect cause in exception returned by query.exception()\n" + - s"\tExpected: ${ef.causeClass}\n\tReturned: ${exception.cause.getClass}") - } catch { - case _: InterruptedException => - case _: org.scalatest.exceptions.TestFailedDueToTimeoutException => - failTest("Timed out while waiting for failure") - case t: Throwable => - failTest("Error while checking stream failure", t) - } finally { - lastStream = currentStream - currentStream = null - streamDeathCause = null - } - - case a: AssertOnQuery => - verify(currentStream != null || lastStream != null, - "cannot assert when no stream has been started") - val streamToAssert = Option(currentStream).getOrElse(lastStream) - verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}") - - case a: Assert => - val streamToAssert = Option(currentStream).getOrElse(lastStream) - verify({ a.run(); true }, s"Assert failed: ${a.message}") - - case a: AddData => - try { - // Add data and get the source where it was added, and the expected offset of the - // added data. - val queryToUse = Option(currentStream).orElse(Option(lastStream)) - val (source, offset) = a.addData(queryToUse) - - def findSourceIndex(plan: LogicalPlan): Option[Int] = { - plan - .collect { case StreamingExecutionRelation(s, _) => s } - .zipWithIndex - .find(_._1 == source) - .map(_._2) - } - - // Try to find the index of the source to which data was added. Either get the index - // from the current active query or the original input logical plan.
- val sourceIndex = - queryToUse.flatMap { query => - findSourceIndex(query.logicalPlan) - }.orElse { - findSourceIndex(stream.logicalPlan) - }.getOrElse { - throw new IllegalArgumentException( - "Could not find index of the source to which data was added") - } - - // Store the expected offset of added data to wait for it later - awaiting.put(sourceIndex, offset) - } catch { - case NonFatal(e) => - failTest("Error adding data", e) - } - - case CheckAnswerRows(expectedAnswer, lastOnly) => - verify(currentStream != null, "stream not running") - // Get the map of source index to the current source objects - val indexToSource = currentStream - .logicalPlan - .collect { case StreamingExecutionRelation(s, _) => s } - .zipWithIndex - .map(_.swap) - .toMap - - // Block until all data added has been processed for all the sources - awaiting.foreach { case (sourceIndex, offset) => - failAfter(streamingTimeout) { - currentStream.awaitOffset(indexToSource(sourceIndex), offset) - } - } - - val sparkAnswer = try if (lastOnly) sink.latestBatchData else sink.allData catch { - case e: Exception => - failTest("Exception while getting data from sink", e) - } - - QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach { - error => failTest(error) - } - } - pos += 1 - } - } catch { - case _: InterruptedException if streamDeathCause != null => - failTest("Stream Thread Died") - case _: org.scalatest.exceptions.TestFailedDueToTimeoutException => - failTest("Timed out waiting for stream") - } finally { - if (currentStream != null && currentStream.microBatchThread.isAlive) { - currentStream.stop() - } - } - } - - /** - * Creates a stress test that randomly starts/stops/adds data/checks the result. - * - * @param ds a dataframe that executes + 1 on a stream of integers, returning the result.
- * @param addData an add data action that adds the given numbers to the stream, encoding them - * as needed - */ - def runStressTest( - ds: Dataset[Int], - addData: Seq[Int] => StreamAction, - iterations: Int = 100): Unit = { - implicit val intEncoder = ExpressionEncoder[Int]() - var dataPos = 0 - var running = true - val actions = new ArrayBuffer[StreamAction]() - - def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) } - - def addRandomData() = { - val numItems = Random.nextInt(10) - val data = dataPos until (dataPos + numItems) - dataPos += numItems - actions += addData(data) - } - - (1 to iterations).foreach { i => - val rand = Random.nextDouble() - if(!running) { - rand match { - case r if r < 0.7 => // AddData - addRandomData() - - case _ => // StartStream - actions += StartStream() - running = true - } - } else { - rand match { - case r if r < 0.1 => - addCheck() - - case r if r < 0.7 => // AddData - addRandomData() - - case _ => // StopStream - addCheck() - actions += StopStream - running = false - } - } - } - if(!running) { actions += StartStream() } - addCheck() - testStream(ds)(actions: _*) - } - - - object AwaitTerminationTester { - - trait ExpectedBehavior - - /** Expect awaitTermination to not be blocked */ - case object ExpectNotBlocked extends ExpectedBehavior - - /** Expect awaitTermination to get blocked */ - case object ExpectBlocked extends ExpectedBehavior - - /** Expect awaitTermination to throw an exception */ - case class ExpectException[E <: Exception]()(implicit val t: ClassTag[E]) - extends ExpectedBehavior - - private val DEFAULT_TEST_TIMEOUT = 1 second - - def test( - expectedBehavior: ExpectedBehavior, - awaitTermFunc: () => Unit, - testTimeout: Span = DEFAULT_TEST_TIMEOUT - ): Unit = { - - expectedBehavior match { - case ExpectNotBlocked => - withClue("Got blocked when expected non-blocking.") { - failAfter(testTimeout) { - awaitTermFunc() - } - } - - case ExpectBlocked => - withClue("Was not blocked when expected.") { - intercept[TestFailedDueToTimeoutException] { - failAfter(testTimeout) { - awaitTermFunc() - } - } - } - - case e: ExpectException[_] => - val thrownException = - withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") { - intercept[ContinuousQueryException] { - failAfter(testTimeout) { - awaitTermFunc() - } - } - } - assert(thrownException.cause.getClass === e.t.runtimeClass, - "exception of incorrect type was thrown") - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index 7f99d30..00d5e05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.{CountDownLatch, TimeUnit} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.ProcessingTime +import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.util.{Clock, ManualClock, SystemClock} class ProcessingTimeExecutorSuite extends SparkFunSuite {
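The two ProcessingTime suites above only have their import repointed at the new package. For context, a minimal trigger-construction sketch against that class, assuming the string and Duration factories exercised by ProcessingTimeSuite:

    import scala.concurrent.duration._
    import org.apache.spark.sql.streaming.ProcessingTime

    // Both factories should yield the same trigger; intervalMs holds the period in milliseconds.
    val fromString = ProcessingTime("10 seconds")
    val fromDuration = ProcessingTime(10.seconds)
    assert(fromString.intervalMs == fromDuration.intervalMs)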
http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala new file mode 100644 index 0000000..cdd97da --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.util.concurrent.ConcurrentLinkedQueue + +import org.scalatest.BeforeAndAfter +import org.scalatest.PrivateMethodTester._ +import org.scalatest.concurrent.AsyncAssertions.Waiter +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.sql.execution.streaming._ + + +class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { + + import testImplicits._ + import ContinuousQueryListener._ + + after { + spark.streams.active.foreach(_.stop()) + assert(spark.streams.active.isEmpty) + assert(addedListeners.isEmpty) + // Make sure we don't leak any events to the next test + spark.sparkContext.listenerBus.waitUntilEmpty(10000) + } + + test("single listener") { + val listener = new QueryStatusCollector + val input = MemoryStream[Int] + withListenerAdded(listener) { + testStream(input.toDS)( + StartStream(), + Assert("Incorrect query status in onQueryStarted") { + val status = listener.startStatus + assert(status != null) + assert(status.active == true) + assert(status.sourceStatuses.size === 1) + assert(status.sourceStatuses(0).description.contains("Memory")) + + // The source and sink offsets must be None as this must be called before the + // batches have started + assert(status.sourceStatuses(0).offset === None) + assert(status.sinkStatus.offset === CompositeOffset(None :: Nil)) + + // No progress events or termination events + assert(listener.progressStatuses.isEmpty) + assert(listener.terminationStatus === null) + }, + AddDataMemory(input, Seq(1, 2, 3)), + CheckAnswer(1, 2, 3), + Assert("Incorrect query status in onQueryProgress") { + eventually(Timeout(streamingTimeout)) { + + // There should be only one progress event as the batch has been processed + assert(listener.progressStatuses.size === 1) + val status = listener.progressStatuses.peek() + assert(status != null) + assert(status.active == true) + assert(status.sourceStatuses(0).offset === Some(LongOffset(0))) + assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))) + +
// No termination events + assert(listener.terminationStatus === null) + } + }, + StopStream, + Assert("Incorrect query status in onQueryTerminated") { + eventually(Timeout(streamingTimeout)) { + val status = listener.terminationStatus + assert(status != null) + + assert(status.active === false) // must be inactive by the time onQueryTerm is called + assert(status.sourceStatuses(0).offset === Some(LongOffset(0))) + assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))) + } + listener.checkAsyncErrors() + } + ) + } + } + + test("adding and removing listener") { + def isListenerActive(listener: QueryStatusCollector): Boolean = { + listener.reset() + testStream(MemoryStream[Int].toDS)( + StartStream(), + StopStream + ) + listener.startStatus != null + } + + try { + val listener1 = new QueryStatusCollector + val listener2 = new QueryStatusCollector + + spark.streams.addListener(listener1) + assert(isListenerActive(listener1) === true) + assert(isListenerActive(listener2) === false) + spark.streams.addListener(listener2) + assert(isListenerActive(listener1) === true) + assert(isListenerActive(listener2) === true) + spark.streams.removeListener(listener1) + assert(isListenerActive(listener1) === false) + assert(isListenerActive(listener2) === true) + } finally { + addedListeners.foreach(spark.streams.removeListener) + } + } + + test("event ordering") { + val listener = new QueryStatusCollector + withListenerAdded(listener) { + for (i <- 1 to 100) { + listener.reset() + require(listener.startStatus === null) + testStream(MemoryStream[Int].toDS)( + StartStream(), + Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"), + StopStream, + Assert { listener.checkAsyncErrors() } + ) + } + } + } + + + private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = { + try { + failAfter(1 minute) { + spark.streams.addListener(listener) + body + } + } finally { + spark.streams.removeListener(listener) + } + } + + private def addedListeners(): Array[ContinuousQueryListener] = { + val listenerBusMethod = + PrivateMethod[ContinuousQueryListenerBus]('listenerBus) + val listenerBus = spark.streams invokePrivate listenerBusMethod() + listenerBus.listeners.toArray.map(_.asInstanceOf[ContinuousQueryListener]) + } + + class QueryStatusCollector extends ContinuousQueryListener { + // to catch errors in the async listener events + @volatile private var asyncTestWaiter = new Waiter + + @volatile var startStatus: QueryStatus = null + @volatile var terminationStatus: QueryStatus = null + val progressStatuses = new ConcurrentLinkedQueue[QueryStatus] + + def reset(): Unit = { + startStatus = null + terminationStatus = null + progressStatuses.clear() + asyncTestWaiter = new Waiter + } + + def checkAsyncErrors(): Unit = { + asyncTestWaiter.await(timeout(streamingTimeout)) + } + + + override def onQueryStarted(queryStarted: QueryStarted): Unit = { + asyncTestWaiter { + startStatus = QueryStatus(queryStarted.query) + } + } + + override def onQueryProgress(queryProgress: QueryProgress): Unit = { + asyncTestWaiter { + assert(startStatus != null, "onQueryProgress called before onQueryStarted") + progressStatuses.add(QueryStatus(queryProgress.query)) + } + } + + override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { + asyncTestWaiter { + assert(startStatus != null, "onQueryTerminated called before onQueryStarted") + terminationStatus = QueryStatus(queryTerminated.query) + } + asyncTestWaiter.dismiss() + } + } + + case class 
QueryStatus( + active: Boolean, + exception: Option[Exception], + sourceStatuses: Array[SourceStatus], + sinkStatus: SinkStatus) + + object QueryStatus { + def apply(query: ContinuousQuery): QueryStatus = { + QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index b75c3ea..c1e4970 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -28,12 +28,11 @@ import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException -import org.apache.spark.sql.{ContinuousQuery, Dataset, OutputMode, StreamTest} +import org.apache.spark.sql.Dataset import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils -class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { +class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter { import AwaitTerminationTester._ import testImplicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala index f469cde..e4ca86d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala @@ -18,11 +18,10 @@ package org.apache.spark.sql.streaming import org.apache.spark.SparkException -import org.apache.spark.sql.StreamTest import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution} -import org.apache.spark.sql.test.SharedSQLContext -class ContinuousQuerySuite extends StreamTest with SharedSQLContext { + +class ContinuousQuerySuite extends StreamTest { import AwaitTerminationTester._ import testImplicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 3d8dcaf..1c73208 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils -class FileStreamSinkSuite extends StreamTest with SharedSQLContext { +class FileStreamSinkSuite extends StreamTest { import testImplicits._ 
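The ContinuousQueryListenerSuite above drives all three listener callbacks through its QueryStatusCollector. For reference, a minimal user-side sketch against the same API, assuming the listener now lives in org.apache.spark.sql.streaming as the suite's package implies (the LoggingListener name and println bodies are illustrative only, not part of this change):

    import org.apache.spark.sql.streaming.ContinuousQueryListener
    import org.apache.spark.sql.streaming.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

    class LoggingListener extends ContinuousQueryListener {
      // Called synchronously from startStream(); keep this cheap or it blocks the query.
      override def onQueryStarted(queryStarted: QueryStarted): Unit =
        println(s"started: active=${queryStarted.query.isActive}")

      // Delivered asynchronously; the query may already have terminated by the time this runs.
      override def onQueryProgress(queryProgress: QueryProgress): Unit =
        println(s"progress: ${queryProgress.query.sourceStatuses.map(_.offset).mkString(",")}")

      override def onQueryTerminated(queryTerminated: QueryTerminated): Unit =
        println(s"terminated: exception=${queryTerminated.query.exception}")
    }

    // Registered and removed the same way the suite does:
    //   spark.streams.addListener(new LoggingListener)
    //   spark.streams.removeListener(listener)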
http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 1d784f1..f681b88 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -137,7 +137,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { val valueSchema = new StructType().add("value", StringType) } -class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { +class FileStreamSourceSuite extends FileStreamSourceTest { import testImplicits._ @@ -594,7 +594,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } } -class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext { +class FileStreamSourceStressTestSuite extends FileStreamSourceTest { import testImplicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala index 4efb7cf..1c0fb34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala @@ -23,9 +23,7 @@ import java.util.UUID import scala.util.Random import scala.util.control.NonFatal -import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest} import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils /** @@ -38,7 +36,7 @@ import org.apache.spark.util.Utils * * At the end, the resulting files are loaded and the answer is checked. 
*/ -class FileStressSuite extends StreamTest with SharedSQLContext { +class FileStressSuite extends StreamTest { import testImplicits._ testQuietly("fault tolerance stress test - unpartitioned output") { http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala index e5bd0b4..df76499 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils -class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { +class MemorySinkSuite extends StreamTest with BeforeAndAfter { import testImplicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala index 81760d2..7f2972e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala @@ -17,11 +17,9 @@ package org.apache.spark.sql.streaming -import org.apache.spark.sql.StreamTest import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.test.SharedSQLContext -class MemorySourceStressSuite extends StreamTest with SharedSQLContext { +class MemorySourceStressSuite extends StreamTest { import testImplicits._ test("memory stress test") { http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index c17cb1d..9414b1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.ManualClock -class StreamSuite extends StreamTest with SharedSQLContext { +class StreamSuite extends StreamTest { import testImplicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala new file mode 100644 index 0000000..dd8672a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -0,0 +1,567 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.lang.Thread.UncaughtExceptionHandler + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.language.experimental.macros +import scala.reflect.ClassTag +import scala.util.Random +import scala.util.control.NonFatal + +import org.scalatest.Assertions +import org.scalatest.concurrent.{Eventually, Timeouts} +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.exceptions.TestFailedDueToTimeoutException +import org.scalatest.time.Span +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.sql.{Dataset, Encoder, QueryTest, Row} +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} + +/** + * A framework for implementing tests for streaming queries and sources. + * + * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order, + * blocking as necessary to let the stream catch up. For example, the following adds some data to + * a stream, blocking until it can verify that the correct values are eventually produced. + * + * {{{ + * val inputData = MemoryStream[Int] + val mapped = inputData.toDS().map(_ + 1) + + testStream(mapped)( + AddData(inputData, 1, 2, 3), + CheckAnswer(2, 3, 4)) + * }}} + * + * Note that while we do sleep to allow the other thread to progress without spinning, + * `StreamAction` checks should not depend on the amount of time spent sleeping. Instead they + * should check the actual progress of the stream before verifying the required test condition. + * + * Currently it is assumed that all streaming queries will eventually complete in 10 seconds to + * avoid hanging forever in the case of failures. However, individual suites can change this + * by overriding `streamingTimeout`. + */ +trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { + + /** How long to wait for an active stream to catch up when checking a result. */ + val streamingTimeout = 10.seconds + + /** A trait for actions that can be performed while testing a streaming DataFrame. */ + trait StreamAction + + /** A trait to mark actions that require the stream to be actively running. */ + trait StreamMustBeRunning + + /** + * Adds the given data to the stream. Subsequent check answers will block until this data has + * been processed. 
+ */ + object AddData { + def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] = + AddDataMemory(source, data) + } + + /** A trait that can be extended when testing a source. */ + trait AddData extends StreamAction { + /** + * Called to add data to a source. It should find the source to add data to from + * the active query, and then return the source object the data was added to, as well as the + * offset of the added data. + */ + def addData(query: Option[StreamExecution]): (Source, Offset) + } + + case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData { + override def toString: String = s"AddData to $source: ${data.mkString(",")}" + + override def addData(query: Option[StreamExecution]): (Source, Offset) = { + (source, source.addData(data)) + } + } + + /** + * Checks to make sure that the current data stored in the sink matches the `expectedAnswer`. + * This operation automatically blocks until all added data has been processed. + */ + object CheckAnswer { + def apply[A : Encoder](data: A*): CheckAnswerRows = { + val encoder = encoderFor[A] + val toExternalRow = RowEncoder(encoder.schema) + CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), false) + } + + def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false) + } + + /** + * Checks to make sure that the latest batch of data stored in the sink matches the `expectedAnswer`. + * This operation automatically blocks until all added data has been processed. + */ + object CheckLastBatch { + def apply[A : Encoder](data: A*): CheckAnswerRows = { + val encoder = encoderFor[A] + val toExternalRow = RowEncoder(encoder.schema) + CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), true) + } + + def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true) + } + + case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean) + extends StreamAction with StreamMustBeRunning { + override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}" + private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer" + } + + /** Stops the stream. It must currently be running. */ + case object StopStream extends StreamAction with StreamMustBeRunning + + /** Starts the stream, resuming if data has already been processed. It must not be running. */ + case class StartStream( + trigger: Trigger = ProcessingTime(0), + triggerClock: Clock = new SystemClock) + extends StreamAction + + /** Advance the trigger clock's time manually. */ + case class AdvanceManualClock(timeToAdd: Long) extends StreamAction + + /** Signals that a failure is expected and should not kill the test.
*/ + case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction { + val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] + override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]" + } + + /** Assert that a body is true */ + class Assert(condition: => Boolean, val message: String = "") extends StreamAction { + def run(): Unit = { Assertions.assert(condition) } + override def toString: String = s"Assert(<condition>, $message)" + } + + object Assert { + def apply(condition: => Boolean, message: String = ""): Assert = new Assert(condition, message) + def apply(message: String)(body: => Unit): Assert = new Assert( { body; true }, message) + def apply(body: => Unit): Assert = new Assert( { body; true }, "") + } + + /** Assert that a condition on the active query is true */ + class AssertOnQuery(val condition: StreamExecution => Boolean, val message: String) + extends StreamAction { + override def toString: String = s"AssertOnQuery(<condition>, $message)" + } + + object AssertOnQuery { + def apply(condition: StreamExecution => Boolean, message: String = ""): AssertOnQuery = { + new AssertOnQuery(condition, message) + } + + def apply(message: String)(condition: StreamExecution => Boolean): AssertOnQuery = { + new AssertOnQuery(condition, message) + } + } + + /** + * Executes the specified actions on the given streaming DataFrame and provides helpful + * error messages in the case of failures or incorrect answers. + * + * Note that if the stream is not explicitly started before an action that requires it to be + * running then it will be automatically started before performing any other actions. + */ + def testStream( + _stream: Dataset[_], + outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = { + + val stream = _stream.toDF() + var pos = 0 + var currentPlan: LogicalPlan = stream.logicalPlan + var currentStream: StreamExecution = null + var lastStream: StreamExecution = null + val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for + val sink = new MemorySink(stream.schema, outputMode) + + @volatile + var streamDeathCause: Throwable = null + + // If the test doesn't manually start the stream, we do it automatically at the beginning. 
+ val startedManually = + actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream]) + val startedTest = if (startedManually) actions else StartStream() +: actions + + def testActions = actions.zipWithIndex.map { + case (a, i) => + if ((pos == i && startedManually) || (pos == (i + 1) && !startedManually)) { + "=> " + a.toString + } else { + " " + a.toString + } + }.mkString("\n") + + def currentOffsets = + if (currentStream != null) currentStream.committedOffsets.toString else "not started" + + def threadState = + if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead" + + def testState = + s""" + |== Progress == + |$testActions + | + |== Stream == + |Output Mode: $outputMode + |Stream state: $currentOffsets + |Thread state: $threadState + |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""} + | + |== Sink == + |${sink.toDebugString} + | + | + |== Plan == + |${if (currentStream != null) currentStream.lastExecution else ""} + """.stripMargin + + def verify(condition: => Boolean, message: String): Unit = { + if (!condition) { + failTest(message) + } + } + + def eventually[T](message: String)(func: => T): T = { + try { + Eventually.eventually(Timeout(streamingTimeout)) { + func + } + } catch { + case NonFatal(e) => + failTest(message, e) + } + } + + def failTest(message: String, cause: Throwable = null) = { + + // Recursively pretty print an exception with truncated stacktrace and internal cause + def exceptionToString(e: Throwable, prefix: String = ""): String = { + val base = s"$prefix${e.getMessage}" + + e.getStackTrace.take(10).mkString(s"\n$prefix", s"\n$prefix\t", "\n") + if (e.getCause != null) { + base + s"\n$prefix\tCaused by: " + exceptionToString(e.getCause, s"$prefix\t") + } else { + base + } + } + val c = Option(cause).map(exceptionToString(_)) + val m = if (message != null && message.size > 0) Some(message) else None + fail( + s""" + |${(m ++ c).mkString(": ")} + |$testState + """.stripMargin) + } + + val testThread = Thread.currentThread() + val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + + try { + startedTest.foreach { action => + action match { + case StartStream(trigger, triggerClock) => + verify(currentStream == null, "stream already running") + lastStream = currentStream + currentStream = + spark + .streams + .startQuery( + StreamExecution.nextName, + metadataRoot, + stream, + sink, + outputMode, + trigger, + triggerClock) + .asInstanceOf[StreamExecution] + currentStream.microBatchThread.setUncaughtExceptionHandler( + new UncaughtExceptionHandler { + override def uncaughtException(t: Thread, e: Throwable): Unit = { + streamDeathCause = e + testThread.interrupt() + } + }) + + case AdvanceManualClock(timeToAdd) => + verify(currentStream != null, + "can not advance manual clock when a stream is not running") + verify(currentStream.triggerClock.isInstanceOf[ManualClock], + s"can not advance clock of type ${currentStream.triggerClock.getClass}") + currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd) + + case StopStream => + verify(currentStream != null, "can not stop a stream that is not running") + try failAfter(streamingTimeout) { + currentStream.stop() + verify(!currentStream.microBatchThread.isAlive, + s"microbatch thread not stopped") + verify(!currentStream.isActive, + "query.isActive() is false even after stopping") + verify(currentStream.exception.isEmpty, + s"query.exception() is not empty after clean stop: " + +
currentStream.exception.map(_.toString()).getOrElse("")) + } catch { + case _: InterruptedException => + case _: org.scalatest.exceptions.TestFailedDueToTimeoutException => + failTest("Timed out while stopping and waiting for microbatch thread to terminate.") + case t: Throwable => + failTest("Error while stopping stream", t) + } finally { + lastStream = currentStream + currentStream = null + } + + case ef: ExpectFailure[_] => + verify(currentStream != null, "can not expect failure when stream is not running") + try failAfter(streamingTimeout) { + val thrownException = intercept[ContinuousQueryException] { + currentStream.awaitTermination() + } + eventually("microbatch thread not stopped after termination with failure") { + assert(!currentStream.microBatchThread.isAlive) + } + verify(thrownException.query.eq(currentStream), + s"incorrect query reference in exception") + verify(currentStream.exception === Some(thrownException), + s"incorrect exception returned by query.exception()") + + val exception = currentStream.exception.get + verify(exception.cause.getClass === ef.causeClass, + "incorrect cause in exception returned by query.exception()\n" + + s"\tExpected: ${ef.causeClass}\n\tReturned: ${exception.cause.getClass}") + } catch { + case _: InterruptedException => + case _: org.scalatest.exceptions.TestFailedDueToTimeoutException => + failTest("Timed out while waiting for failure") + case t: Throwable => + failTest("Error while checking stream failure", t) + } finally { + lastStream = currentStream + currentStream = null + streamDeathCause = null + } + + case a: AssertOnQuery => + verify(currentStream != null || lastStream != null, + "cannot assert when no stream has been started") + val streamToAssert = Option(currentStream).getOrElse(lastStream) + verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}") + + case a: Assert => + val streamToAssert = Option(currentStream).getOrElse(lastStream) + verify({ a.run(); true }, s"Assert failed: ${a.message}") + + case a: AddData => + try { + // Add data and get the source where it was added, and the expected offset of the + // added data. + val queryToUse = Option(currentStream).orElse(Option(lastStream)) + val (source, offset) = a.addData(queryToUse) + + def findSourceIndex(plan: LogicalPlan): Option[Int] = { + plan + .collect { case StreamingExecutionRelation(s, _) => s } + .zipWithIndex + .find(_._1 == source) + .map(_._2) + } + + // Try to find the index of the source to which data was added. Either get the index + // from the current active query or the original input logical plan.
+ val sourceIndex = + queryToUse.flatMap { query => + findSourceIndex(query.logicalPlan) + }.orElse { + findSourceIndex(stream.logicalPlan) + }.getOrElse { + throw new IllegalArgumentException( + "Could not find index of the source to which data was added") + } + + // Store the expected offset of added data to wait for it later + awaiting.put(sourceIndex, offset) + } catch { + case NonFatal(e) => + failTest("Error adding data", e) + } + + case CheckAnswerRows(expectedAnswer, lastOnly) => + verify(currentStream != null, "stream not running") + // Get the map of source index to the current source objects + val indexToSource = currentStream + .logicalPlan + .collect { case StreamingExecutionRelation(s, _) => s } + .zipWithIndex + .map(_.swap) + .toMap + + // Block until all data added has been processed for all the sources + awaiting.foreach { case (sourceIndex, offset) => + failAfter(streamingTimeout) { + currentStream.awaitOffset(indexToSource(sourceIndex), offset) + } + } + + val sparkAnswer = try if (lastOnly) sink.latestBatchData else sink.allData catch { + case e: Exception => + failTest("Exception while getting data from sink", e) + } + + QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach { + error => failTest(error) + } + } + pos += 1 + } + } catch { + case _: InterruptedException if streamDeathCause != null => + failTest("Stream Thread Died") + case _: org.scalatest.exceptions.TestFailedDueToTimeoutException => + failTest("Timed out waiting for stream") + } finally { + if (currentStream != null && currentStream.microBatchThread.isAlive) { + currentStream.stop() + } + } + } + + /** + * Creates a stress test that randomly starts/stops/adds data/checks the result. + * + * @param ds a dataframe that executes + 1 on a stream of integers, returning the result.
+   * @param addData an add data action that adds the given numbers to the stream, encoding them
+   *                as needed
+   */
+  def runStressTest(
+      ds: Dataset[Int],
+      addData: Seq[Int] => StreamAction,
+      iterations: Int = 100): Unit = {
+    implicit val intEncoder = ExpressionEncoder[Int]()
+    var dataPos = 0
+    var running = true
+    val actions = new ArrayBuffer[StreamAction]()
+
+    def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) }
+
+    def addRandomData() = {
+      val numItems = Random.nextInt(10)
+      val data = dataPos until (dataPos + numItems)
+      dataPos += numItems
+      actions += addData(data)
+    }
+
+    (1 to iterations).foreach { i =>
+      val rand = Random.nextDouble()
+      if (!running) {
+        rand match {
+          case r if r < 0.7 => // AddData
+            addRandomData()
+
+          case _ => // StartStream
+            actions += StartStream()
+            running = true
+        }
+      } else {
+        rand match {
+          case r if r < 0.1 =>
+            addCheck()
+
+          case r if r < 0.7 => // AddData
+            addRandomData()
+
+          case _ => // StopStream
+            addCheck()
+            actions += StopStream
+            running = false
+        }
+      }
+    }
+    if (!running) { actions += StartStream() }
+    addCheck()
+    testStream(ds)(actions: _*)
+  }
+
+
+  object AwaitTerminationTester {
+
+    trait ExpectedBehavior
+
+    /** Expect awaitTermination to not be blocked */
+    case object ExpectNotBlocked extends ExpectedBehavior
+
+    /** Expect awaitTermination to get blocked */
+    case object ExpectBlocked extends ExpectedBehavior
+
+    /** Expect awaitTermination to throw an exception */
+    case class ExpectException[E <: Exception]()(implicit val t: ClassTag[E])
+      extends ExpectedBehavior
+
+    private val DEFAULT_TEST_TIMEOUT = 1.second
+
+    def test(
+        expectedBehavior: ExpectedBehavior,
+        awaitTermFunc: () => Unit,
+        testTimeout: Span = DEFAULT_TEST_TIMEOUT
+      ): Unit = {
+
+      expectedBehavior match {
+        case ExpectNotBlocked =>
+          withClue("Got blocked when expected non-blocking.") {
+            failAfter(testTimeout) {
+              awaitTermFunc()
+            }
+          }
+
+        case ExpectBlocked =>
+          withClue("Was not blocked when expected.") {
+            intercept[TestFailedDueToTimeoutException] {
+              failAfter(testTimeout) {
+                awaitTermFunc()
+              }
+            }
+          }
+
+        case e: ExpectException[_] =>
+          val thrownException =
+            withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") {
+              intercept[ContinuousQueryException] {
+                failAfter(testTimeout) {
+                  awaitTermFunc()
+                }
+              }
+            }
+          assert(thrownException.cause.getClass === e.t.runtimeClass,
+            "exception of incorrect type was thrown")
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 322bbb9..1f174ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -20,19 +20,18 @@ package org.apache.spark.sql.streaming
 
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, StreamTest}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.InternalOutputModes._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.test.SharedSQLContext
 
 object FailureSinglton {
   var firstTime = true
 }
 
-class StreamingAggregationSuite extends StreamTest with SharedSQLContext with BeforeAndAfterAll {
+class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
 
   override def afterAll(): Unit = {
     super.afterAll()

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 38a0534..a2aac69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
@@ -101,7 +101,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
   }
 }
 
-class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
   private def newMetadataDir =
     Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

http://git-wip-us.apache.org/repos/asf/spark/blob/a780848a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
deleted file mode 100644
index 8788898..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester._
-import org.scalatest.concurrent.AsyncAssertions.Waiter
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.util.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}
-
-class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
-
-  import testImplicits._
-
-  after {
-    spark.streams.active.foreach(_.stop())
-    assert(spark.streams.active.isEmpty)
-    assert(addedListeners.isEmpty)
-    // Make sure we don't leak any events to the next test
-    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-  }
-
-  test("single listener") {
-    val listener = new QueryStatusCollector
-    val input = MemoryStream[Int]
-    withListenerAdded(listener) {
-      testStream(input.toDS)(
-        StartStream(),
-        Assert("Incorrect query status in onQueryStarted") {
-          val status = listener.startStatus
-          assert(status != null)
-          assert(status.active == true)
-          assert(status.sourceStatuses.size === 1)
-          assert(status.sourceStatuses(0).description.contains("Memory"))
-
-          // The source and sink offsets must be None as this must be called before the
-          // batches have started
-          assert(status.sourceStatuses(0).offset === None)
-          assert(status.sinkStatus.offset === CompositeOffset(None :: Nil))
-
-          // No progress events or termination events
-          assert(listener.progressStatuses.isEmpty)
-          assert(listener.terminationStatus === null)
-        },
-        AddDataMemory(input, Seq(1, 2, 3)),
-        CheckAnswer(1, 2, 3),
-        Assert("Incorrect query status in onQueryProgress") {
-          eventually(Timeout(streamingTimeout)) {
-
-            // There should be only one progress event as the batch has been processed
-            assert(listener.progressStatuses.size === 1)
-            val status = listener.progressStatuses.peek()
-            assert(status != null)
-            assert(status.active == true)
-            assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
-
-            // No termination events
-            assert(listener.terminationStatus === null)
-          }
-        },
-        StopStream,
-        Assert("Incorrect query status in onQueryTerminated") {
-          eventually(Timeout(streamingTimeout)) {
-            val status = listener.terminationStatus
-            assert(status != null)
-
-            assert(status.active === false) // must be inactive by the time onQueryTerm is called
-            assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
-          }
-          listener.checkAsyncErrors()
-        }
-      )
-    }
-  }
-
-  test("adding and removing listener") {
-    def isListenerActive(listener: QueryStatusCollector): Boolean = {
-      listener.reset()
-      testStream(MemoryStream[Int].toDS)(
-        StartStream(),
-        StopStream
-      )
-      listener.startStatus != null
-    }
-
-    try {
-      val listener1 = new QueryStatusCollector
-      val listener2 = new QueryStatusCollector
-
-      spark.streams.addListener(listener1)
-      assert(isListenerActive(listener1) === true)
-      assert(isListenerActive(listener2) === false)
-      spark.streams.addListener(listener2)
-      assert(isListenerActive(listener1) === true)
-      assert(isListenerActive(listener2) === true)
-      spark.streams.removeListener(listener1)
-      assert(isListenerActive(listener1) === false)
-      assert(isListenerActive(listener2) === true)
-    } finally {
-      addedListeners.foreach(spark.streams.removeListener)
-    }
-  }
-
-  test("event ordering") {
-    val listener = new QueryStatusCollector
-    withListenerAdded(listener) {
-      for (i <- 1 to 100) {
-        listener.reset()
-        require(listener.startStatus === null)
-        testStream(MemoryStream[Int].toDS)(
-          StartStream(),
-          Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
-          StopStream,
-          Assert { listener.checkAsyncErrors() }
-        )
-      }
-    }
-  }
-
-
-  private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
-    try {
-      failAfter(1 minute) {
-        spark.streams.addListener(listener)
-        body
-      }
-    } finally {
-      spark.streams.removeListener(listener)
-    }
-  }
-
-  private def addedListeners(): Array[ContinuousQueryListener] = {
-    val listenerBusMethod =
-      PrivateMethod[ContinuousQueryListenerBus]('listenerBus)
-    val listenerBus = spark.streams invokePrivate listenerBusMethod()
-    listenerBus.listeners.toArray.map(_.asInstanceOf[ContinuousQueryListener])
-  }
-
-  class QueryStatusCollector extends ContinuousQueryListener {
-    // to catch errors in the async listener events
-    @volatile private var asyncTestWaiter = new Waiter
-
-    @volatile var startStatus: QueryStatus = null
-    @volatile var terminationStatus: QueryStatus = null
-    val progressStatuses = new ConcurrentLinkedQueue[QueryStatus]
-
-    def reset(): Unit = {
-      startStatus = null
-      terminationStatus = null
-      progressStatuses.clear()
-      asyncTestWaiter = new Waiter
-    }
-
-    def checkAsyncErrors(): Unit = {
-      asyncTestWaiter.await(timeout(streamingTimeout))
-    }
-
-
-    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
-      asyncTestWaiter {
-        startStatus = QueryStatus(queryStarted.query)
-      }
-    }
-
-    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryProgress called before onQueryStarted")
-        progressStatuses.add(QueryStatus(queryProgress.query))
-      }
-    }
-
-    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
-        terminationStatus = QueryStatus(queryTerminated.query)
-      }
-      asyncTestWaiter.dismiss()
-    }
-  }
-
-  case class QueryStatus(
-      active: Boolean,
-      exception: Option[Exception],
-      sourceStatuses: Array[SourceStatus],
-      sinkStatus: SinkStatus)
-
-  object QueryStatus {
-    def apply(query: ContinuousQuery): QueryStatus = {
-      QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus)
-    }
-  }
-}
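
For readers following the move: suites now reach the relocated harness through the new
org.apache.spark.sql.streaming.StreamTest import shown in the DataFrameReaderWriterSuite hunk,
and drive it with the action DSL interpreted by the loop reconstructed above. A minimal sketch of
such a suite follows; SimpleIncrementSuite is an illustrative name, not part of this commit, but
MemoryStream, AddData, CheckAnswer, StopStream and StartStream are the same helpers the diffs
above define and use.

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest

// Illustrative sketch, not part of this commit.
class SimpleIncrementSuite extends StreamTest {
  import testImplicits._

  test("map with recovery") {
    val input = MemoryStream[Int]
    val mapped = input.toDS().map(_ + 1)

    testStream(mapped)(
      AddData(input, 1, 2, 3),       // enqueues rows; the harness records the expected offset
      CheckAnswer(2, 3, 4),          // blocks on awaitOffset, then compares sink contents
      StopStream,                    // stop, then restart to exercise the recovery path
      StartStream(),
      AddData(input, 4, 5, 6),
      CheckAnswer(2, 3, 4, 5, 6, 7))
  }
}

The same dataset/action pair can feed runStressTest, e.g.
runStressTest(mapped, AddData(input, _: _*)), which assembles a random sequence of exactly these
actions as shown in the stress-test builder above.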
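Likewise, the AwaitTerminationTester helper above is handed a thunk over the blocking call under
test. A sketch of how a suite might invoke it; `query` and `failedQuery` are assumed handles to an
active and an already-failed ContinuousQuery, respectively, and are not from this commit.

import AwaitTerminationTester._

// Illustrative only: expect awaitTermination() to block while the query is running.
AwaitTerminationTester.test(
  ExpectBlocked,
  () => query.awaitTermination())

// Illustrative only: expect a ContinuousQueryException whose cause is the given type
// once the query has terminated with an error.
AwaitTerminationTester.test(
  ExpectException[SparkException](),
  () => failedQuery.awaitTermination())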