Repository: spark Updated Branches: refs/heads/branch-1.2 f9d8c5e3f -> 7f19c7c1b
[SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage (branch-1.2 backport) (This PR backports #3801 into `branch-1.2` (1.2.2)) This patch refactors Spark Streaming's FileInputStream tests to remove uses of Thread.sleep() and SystemClock, which should hopefully resolve some longstanding flakiness in these tests (see SPARK-1600). Key changes: - Modify FileInputDStream to use the scheduler's Clock instead of System.currentTimeMillis(); this allows it to be tested using ManualClock. - Fix a synchronization issue in ManualClock's `currentTime` method. - Add a StreamingTestWaiter class which allows callers to block until a certain number of batches have finished. - Change the FileInputStream tests so that files' modification times are manually set based off of ManualClock; this eliminates many Thread.sleep calls. - Update these tests to use the withStreamingContext fixture. Author: Josh Rosen <joshro...@databricks.com> Closes #4633 from JoshRosen/spark-1600-b12-backport and squashes the following commits: e5d3dc4 [Josh Rosen] [SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f19c7c1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f19c7c1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f19c7c1 Branch: refs/heads/branch-1.2 Commit: 7f19c7c1b4d1942df68c08266c640e9604a50bde Parents: f9d8c5e Author: Josh Rosen <joshro...@databricks.com> Authored: Mon Feb 16 15:41:38 2015 -0800 Committer: Josh Rosen <joshro...@databricks.com> Committed: Mon Feb 16 15:41:38 2015 -0800 ---------------------------------------------------------------------- .../streaming/dstream/FileInputDStream.scala | 16 +- .../org/apache/spark/streaming/util/Clock.scala | 6 +- .../spark/streaming/BasicOperationsSuite.scala | 2 +- .../spark/streaming/CheckpointSuite.scala | 248 +++++++++++-------- .../spark/streaming/InputStreamsSuite.scala | 69 +++--- .../apache/spark/streaming/TestSuiteBase.scala | 46 +++- 6 files changed, 251 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 5f13fdc..e7c5639 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.dstream import java.io.{IOException, ObjectInputStream} +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import scala.reflect.ClassTag @@ -74,12 +75,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { + // This is a def so that it works during checkpoint recovery: + private def clock = ssc.scheduler.clock + // Data to be saved as part of the streaming checkpoints protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData // Initial ignore threshold based on which old, existing files in the directory (at the time of // starting the streaming application) will be ignored or considered - private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L + private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L /* * Make sure that the information of files selected in the last few batches are remembered. @@ -91,8 +95,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas remember(durationToRemember) // Map of batch-time to selected file info for the remembered batches + // This is a concurrent map because it's also accessed in unit tests @transient private[streaming] var batchTimeToSelectedFiles = - new mutable.HashMap[Time, Array[String]] + new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]] // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() @@ -151,7 +156,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas */ private def findNewFiles(currentTime: Long): Array[String] = { try { - lastNewFileFindingTime = System.currentTimeMillis + lastNewFileFindingTime = clock.currentTime() // Calculate ignore threshold val modTimeIgnoreThreshold = math.max( @@ -164,7 +169,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) } val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) - val timeTaken = System.currentTimeMillis - lastNewFileFindingTime + val timeTaken = clock.currentTime() - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) if (timeTaken > slideDuration.milliseconds) { @@ -267,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] () - batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]() + batchTimeToSelectedFiles = + new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]] recentlySelectedFiles = new mutable.HashSet[String]() fileToModTime = new TimeStampedHashMap[String, Long](true) } http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala index 7cd867c..d6d96d7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala @@ -59,9 +59,11 @@ class SystemClock() extends Clock { private[streaming] class ManualClock() extends Clock { - var time = 0L + private var time = 0L - def currentTime() = time + def currentTime() = this.synchronized { + time + } def setTime(timeToSet: Long) = { this.synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index f4a269c..9e08f1e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -565,7 +565,7 @@ class BasicOperationsSuite extends TestSuiteBase { if (rememberDuration != null) ssc.remember(rememberDuration) val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput) val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - assert(clock.time === Seconds(10).milliseconds) + assert(clock.currentTime() === Seconds(10).milliseconds) assert(output.size === numExpectedOutput) operatedStream } http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index c9f6ddc..e685a17 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -18,17 +18,18 @@ package org.apache.spark.streaming import java.io.File -import java.nio.charset.Charset -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import scala.reflect.ClassTag +import com.google.common.base.Charsets import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} +import org.scalatest.concurrent.Eventually._ import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext._ @@ -47,8 +48,6 @@ class CheckpointSuite extends TestSuiteBase { override def batchDuration = Milliseconds(500) - override def actuallyWait = true // to allow checkpoints to be written - override def beforeFunction() { super.beforeFunction() Utils.deleteRecursively(new File(checkpointDir)) @@ -145,7 +144,6 @@ class CheckpointSuite extends TestSuiteBase { ssc.start() advanceTimeWithRealDelay(ssc, 4) ssc.stop() - System.clearProperty("spark.streaming.manualClock.jump") ssc = null } @@ -314,109 +312,161 @@ class CheckpointSuite extends TestSuiteBase { testCheckpointedOperation(input, operation, output, 7) } - // This tests whether file input stream remembers what files were seen before // the master failure and uses them again to process a large window operation. // It also tests whether batches, whose processing was incomplete due to the // failure, are re-processed or not. test("recovery with file input stream") { // Set up the streaming context and input streams + val batchDuration = Seconds(2) // Due to 1-second resolution of setLastModified() on some OS's. val testDir = Utils.createTempDir() - var ssc = new StreamingContext(master, framework, Seconds(1)) - ssc.checkpoint(checkpointDir) - val fileStream = ssc.textFileStream(testDir.toString) - // Making value 3 take large time to process, to ensure that the master - // shuts down in the middle of processing the 3rd batch - val mappedStream = fileStream.map(s => { - val i = s.toInt - if (i == 3) Thread.sleep(2000) - i - }) - - // Reducing over a large window to ensure that recovery from master failure - // requires reprocessing of all the files seen before the failure - val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1)) - val outputBuffer = new ArrayBuffer[Seq[Int]] - var outputStream = new TestOutputStream(reducedStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files and advance manual clock to process them - // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - Thread.sleep(1000) - for (i <- Seq(1, 2, 3)) { - Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) - // wait to make sure that the file is written such that it gets shown in the file listings - Thread.sleep(1000) + val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]] + + /** + * Writes a file named `i` (which contains the number `i`) to the test directory and sets its + * modification time to `clock`'s current time. + */ + def writeFile(i: Int, clock: ManualClock): Unit = { + val file = new File(testDir, i.toString) + Files.write(i + "\n", file, Charsets.UTF_8) + assert(file.setLastModified(clock.currentTime())) + // Check that the file's modification date is actually the value we wrote, since rounding or + // truncation will break the test: + assert(file.lastModified() === clock.currentTime()) } - logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0, "No files processed before restart") - ssc.stop() - // Verify whether files created have been recorded correctly or not - var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] - def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten - assert(!recordedFiles.filter(_.endsWith("1")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("2")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("3")).isEmpty) - - // Create files while the master is down - for (i <- Seq(4, 5, 6)) { - Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) - Thread.sleep(1000) + /** + * Returns ids that identify which files which have been recorded by the file input stream. + */ + def recordedFiles(ssc: StreamingContext): Seq[Int] = { + val fileInputDStream = + ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] + val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten + filenames.map(_.split(File.separator).last.toInt).toSeq.sorted } - // Recover context from checkpoint file and verify whether the files that were - // recorded before failure were saved and successfully recovered - logInfo("*********** RESTARTING ************") - ssc = new StreamingContext(checkpointDir) - fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] - assert(!recordedFiles.filter(_.endsWith("1")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("2")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("3")).isEmpty) + try { + // This is a var because it's re-assigned when we restart from a checkpoint + var clock: ManualClock = null + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + ssc.checkpoint(checkpointDir) + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val batchCounter = new BatchCounter(ssc) + val fileStream = ssc.textFileStream(testDir.toString) + // Make value 3 take a large time to process, to ensure that the driver + // shuts down in the middle of processing the 3rd batch + CheckpointSuite.batchThreeShouldBlockIndefinitely = true + val mappedStream = fileStream.map(s => { + val i = s.toInt + if (i == 3) { + while (CheckpointSuite.batchThreeShouldBlockIndefinitely) { + Thread.sleep(Long.MaxValue) + } + } + i + }) + + // Reducing over a large window to ensure that recovery from driver failure + // requires reprocessing of all the files seen before the failure + val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val outputStream = new TestOutputStream(reducedStream, outputBuffer) + outputStream.register() + ssc.start() + + // Advance half a batch so that the first file is created after the StreamingContext starts + clock.addToTime(batchDuration.milliseconds / 2) + // Create files and advance manual clock to process them + for (i <- Seq(1, 2, 3)) { + writeFile(i, clock) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.addToTime(batchDuration.milliseconds) + if (i != 3) { + // Since we want to shut down while the 3rd batch is processing + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === i) + } + } + } + clock.addToTime(batchDuration.milliseconds) + eventually(eventuallyTimeout) { + // Wait until all files have been recorded and all batches have started + assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3) + } + // Wait for a checkpoint to be written + val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration) + eventually(eventuallyTimeout) { + assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6) + } + ssc.stop() + // Check that we shut down while the third batch was being processed + assert(batchCounter.getNumCompletedBatches === 2) + assert(outputStream.output.flatten === Seq(1, 3)) + } - // Restart stream computation - ssc.start() - for (i <- Seq(7, 8, 9)) { - Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) - Thread.sleep(1000) - } - Thread.sleep(1000) - logInfo("Output = " + outputStream.output.mkString("[", ", ", "]")) - assert(outputStream.output.size > 0, "No files processed after restart") - ssc.stop() + // The original StreamingContext has now been stopped. + CheckpointSuite.batchThreeShouldBlockIndefinitely = false - // Verify whether files created while the driver was down have been recorded or not - assert(!recordedFiles.filter(_.endsWith("4")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("5")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("6")).isEmpty) - - // Verify whether new files created after recover have been recorded or not - assert(!recordedFiles.filter(_.endsWith("7")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("8")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("9")).isEmpty) - - // Append the new output to the old buffer - outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] - outputBuffer ++= outputStream.output - - val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45) - logInfo("--------------------------------") - logInfo("output, size = " + outputBuffer.size) - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output, size = " + expectedOutput.size) - expectedOutput.foreach(x => logInfo("[" + x + "]")) - logInfo("--------------------------------") - - // Verify whether all the elements received are as expected - val output = outputBuffer.flatMap(x => x) - assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed - output.foreach(o => // To ensure all the inputs are correctly added cumulatively - assert(expectedOutput.contains(o), "Expected value " + o + " not found") - ) - // To ensure that all the inputs were received correctly - assert(expectedOutput.last === output.last) - Utils.deleteRecursively(testDir) + // Create files while the streaming driver is down + for (i <- Seq(4, 5, 6)) { + writeFile(i, clock) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.addToTime(batchDuration.milliseconds) + } + + // Recover context from checkpoint file and verify whether the files that were + // recorded before failure were saved and successfully recovered + logInfo("*********** RESTARTING ************") + withStreamingContext(new StreamingContext(checkpointDir)) { ssc => + // So that the restarted StreamingContext's clock has gone forward in time since failure + ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString) + val oldClockTime = clock.currentTime() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val batchCounter = new BatchCounter(ssc) + val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] + // Check that we remember files that were recorded before the restart + assert(recordedFiles(ssc) === Seq(1, 2, 3)) + + // Restart stream computation + ssc.start() + // Verify that the clock has traveled forward to the expected time + eventually(eventuallyTimeout) { + clock.currentTime() === oldClockTime + } + // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch) + val numBatchesAfterRestart = 4 + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart) + } + for ((i, index) <- Seq(7, 8, 9).zipWithIndex) { + writeFile(i, clock) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.addToTime(batchDuration.milliseconds) + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1) + } + } + clock.addToTime(batchDuration.milliseconds) + logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]")) + assert(outputStream.output.size > 0, "No files processed after restart") + ssc.stop() + + // Verify whether files created while the driver was down (4, 5, 6) and files created after + // recovery (7, 8, 9) have been recorded + assert(recordedFiles(ssc) === (1 to 9)) + + // Append the new output to the old buffer + outputBuffer ++= outputStream.output + + // Verify whether all the elements received are as expected + val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45) + assert(outputBuffer.flatten.toSet === expectedOutput.toSet) + } + } finally { + Utils.deleteRecursively(testDir) + } } @@ -473,12 +523,12 @@ class CheckpointSuite extends TestSuiteBase { */ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - logInfo("Manual clock before advancing = " + clock.time) + logInfo("Manual clock before advancing = " + clock.currentTime()) for (i <- 1 to numBatches.toInt) { clock.addToTime(batchDuration.milliseconds) Thread.sleep(batchDuration.milliseconds) } - logInfo("Manual clock after advancing = " + clock.time) + logInfo("Manual clock after advancing = " + clock.currentTime()) Thread.sleep(batchDuration.milliseconds) val outputStream = ssc.graph.getOutputStreams.filter { dstream => @@ -487,3 +537,7 @@ class CheckpointSuite extends TestSuiteBase { outputStream.output.map(_.flatten) } } + +private object CheckpointSuite extends Serializable { + var batchThreeShouldBlockIndefinitely: Boolean = true +} http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 307052a..bddf51e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -28,7 +28,6 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue} -import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.Files @@ -234,45 +233,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } def testFileStream(newFilesOnly: Boolean) { - var ssc: StreamingContext = null val testDir: File = null try { + val batchDuration = Seconds(2) val testDir = Utils.createTempDir() + // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") Files.write("0\n", existingFile, Charset.forName("UTF-8")) + assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) - Thread.sleep(1000) // Set up the streaming context and input streams - val newConf = conf.clone.set( - "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - ssc = new StreamingContext(newConf, batchDuration) - val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( - testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - val outputStream = new TestOutputStream(fileStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files in the directory - val input = Seq(1, 2, 3, 4, 5) - input.foreach { i => - Thread.sleep(batchDuration.milliseconds) - val file = new File(testDir, i.toString) - Files.write(i + "\n", file, Charset.forName("UTF-8")) - logInfo("Created file " + file) - } + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + // This `setTime` call ensures that the clock is past the creation time of `existingFile` + clock.setTime(existingFile.lastModified + batchDuration.milliseconds) + val batchCounter = new BatchCounter(ssc) + val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( + testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() + ssc.start() + + // Advance the clock so that the files are created after StreamingContext starts, but + // not enough to trigger a batch + clock.addToTime(batchDuration.milliseconds / 2) + + // Over time, create files in the directory + val input = Seq(1, 2, 3, 4, 5) + input.foreach { i => + val file = new File(testDir, i.toString) + Files.write(i + "\n", file, Charset.forName("UTF-8")) + assert(file.setLastModified(clock.currentTime())) + assert(file.lastModified === clock.currentTime) + logInfo("Created file " + file) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.addToTime(batchDuration.milliseconds) + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === i) + } + } - // Verify that all the files have been read - val expectedOutput = if (newFilesOnly) { - input.map(_.toString).toSet - } else { - (Seq(0) ++ input).map(_.toString).toSet - } - eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) { + // Verify that all the files have been read + val expectedOutput = if (newFilesOnly) { + input.map(_.toString).toSet + } else { + (Seq(0) ++ input).map(_.toString).toSet + } assert(outputBuffer.flatten.toSet === expectedOutput) } } finally { - if (ssc != null) ssc.stop() if (testDir != null) Utils.deleteRecursively(testDir) } } http://git-wip-us.apache.org/repos/asf/spark/blob/7f19c7c1/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 52972f6..7d82c3e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -21,11 +21,16 @@ import java.io.{ObjectInputStream, IOException} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.SynchronizedBuffer +import scala.language.implicitConversions import scala.reflect.ClassTag import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.time.{Span, Seconds => ScalaTestSeconds} +import org.scalatest.concurrent.Eventually.timeout +import org.scalatest.concurrent.PatienceConfiguration import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} +import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener} import org.apache.spark.streaming.util.ManualClock import org.apache.spark.{SparkConf, Logging} import org.apache.spark.rdd.RDD @@ -104,6 +109,40 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], } /** + * An object that counts the number of started / completed batches. This is implemented using a + * StreamingListener. Constructing a new instance automatically registers a StreamingListener on + * the given StreamingContext. + */ +class BatchCounter(ssc: StreamingContext) { + + // All access to this state should be guarded by `BatchCounter.this.synchronized` + private var numCompletedBatches = 0 + private var numStartedBatches = 0 + + private val listener = new StreamingListener { + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = + BatchCounter.this.synchronized { + numStartedBatches += 1 + BatchCounter.this.notifyAll() + } + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = + BatchCounter.this.synchronized { + numCompletedBatches += 1 + BatchCounter.this.notifyAll() + } + } + ssc.addStreamingListener(listener) + + def getNumCompletedBatches: Int = this.synchronized { + numCompletedBatches + } + + def getNumStartedBatches: Int = this.synchronized { + numStartedBatches + } +} + +/** * This is the base trait for Spark Streaming testsuites. This provides basic functionality * to run user-defined set of input on user-defined stream operations, and verify the output. */ @@ -142,6 +181,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { .setMaster(master) .setAppName(framework) + // Timeout for use in ScalaTest `eventually` blocks + val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds)) + // Default before function for any streaming test suite. Override this // if you want to add your stuff to "before" (i.e., don't call before { } ) def beforeFunction() { @@ -291,7 +333,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Advance manual clock val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - logInfo("Manual clock before advancing = " + clock.time) + logInfo("Manual clock before advancing = " + clock.currentTime()) if (actuallyWait) { for (i <- 1 to numBatches) { logInfo("Actually waiting for " + batchDuration) @@ -301,7 +343,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { } else { clock.addToTime(numBatches * batchDuration.milliseconds) } - logInfo("Manual clock after advancing = " + clock.time) + logInfo("Manual clock after advancing = " + clock.currentTime()) // Wait until expected number of output items have been generated val startTime = System.currentTimeMillis() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org