Repository: spark Updated Branches: refs/heads/branch-2.0 b716e104b -> b37177c22
[SPARK-16430][SQL][STREAMING] Fixed bug in the maxFilesPerTrigger in FileStreamSource ## What changes were proposed in this pull request? An incorrect list of files was being allocated to a batch. This caused a file to be read multiple times across multiple batches. ## How was this patch tested? Added unit tests Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #14143 from tdas/SPARK-16430-1. (cherry picked from commit e50efd53f073890d789a8448f850cc219cca7708) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b37177c2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b37177c2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b37177c2 Branch: refs/heads/branch-2.0 Commit: b37177c22f5c0f927b8d9f3a38dba9617d36c944 Parents: b716e10 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Mon Jul 11 18:41:36 2016 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Mon Jul 11 18:41:45 2016 -0700 ---------------------------------------------------------------------- .../execution/streaming/FileStreamSource.scala | 6 ++-- .../sql/streaming/FileStreamSourceSuite.scala | 35 ++++++++++++++++++-- 2 files changed, 36 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b37177c2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 72b335a..0cfad65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -73,8 +73,8 @@ class FileStreamSource( logTrace(s"Number of seen files = ${seenFiles.size}") if (batchFiles.nonEmpty) { maxBatchId += 1 - metadataLog.add(maxBatchId, newFiles) - logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files") + metadataLog.add(maxBatchId, batchFiles) + logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") } new LongOffset(maxBatchId) @@ -138,7 +138,7 @@ class FileStreamSource( .map { str => Try(str.toInt).toOption.filter(_ > 0).getOrElse { throw new IllegalArgumentException( - s"Invalid value '$str' for option 'maxFilesPerBatch', must be a positive integer") + s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/b37177c2/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 3d28d4f..47260a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -627,6 +627,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest { checkAnswer(df, data.map(_.toString).toDF("value")) } + def checkAllData(data: Seq[Int]): Unit = { + val schema = StructType(Seq(StructField("value", StringType))) + val df = spark.createDataFrame( + spark.sparkContext.makeRDD(memorySink.allData), schema) + checkAnswer(df, data.map(_.toString).toDF("value")) + } + /** Check how many batches have executed since the last time this check was made */ var lastBatchId = -1L def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = { @@ 
-636,6 +643,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } checkLastBatchData(3) // (1 and 2) should be in batch 1, (3) should be in batch 2 (last) + checkAllData(1 to 3) lastBatchId = memorySink.latestBatchId.get fileSource.withBatchingLocked { @@ -645,8 +653,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest { createFile(7) // 6 and 7 should be in the last batch } q.processAllAvailable() - checkLastBatchData(6, 7) checkNumBatchesSinceLastCheck(2) + checkLastBatchData(6, 7) + checkAllData(1 to 7) fileSource.withBatchingLocked { createFile(8) @@ -656,8 +665,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest { createFile(12) // 12 should be in the last batch } q.processAllAvailable() - checkLastBatchData(12) checkNumBatchesSinceLastCheck(3) + checkLastBatchData(12) + checkAllData(1 to 12) + + q.stop() + } + } + + test("max files per trigger - incorrect values") { + withTempDir { case src => + def testMaxFilePerTriggerValue(value: String): Unit = { + val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath) + val e = intercept[IllegalArgumentException] { + testStream(df)() + } + Seq("maxFilesPerTrigger", value, "positive integer").foreach { s => + assert(e.getMessage.contains(s)) + } + } + + testMaxFilePerTriggerValue("not-a-integer") + testMaxFilePerTriggerValue("-1") + testMaxFilePerTriggerValue("0") + testMaxFilePerTriggerValue("10.1") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org