Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b716e104b -> b37177c22


[SPARK-16430][SQL][STREAMING] Fixed bug in the maxFilesPerTrigger in FileStreamSource

## What changes were proposed in this pull request?

An incorrect list of files was being allocated to a batch. This caused a file to be 
read multiple times across multiple batches.
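
To make the change concrete, here is a minimal, self-contained sketch (not the actual `FileStreamSource` code; the file listing and the metadata log are stand-ins) of the batching logic this patch corrects:

```scala
// Simplified sketch: names mirror the diff below, everything else is a stand-in.
object MaxFilesPerTriggerSketch {
  def main(args: Array[String]): Unit = {
    val maxFilesPerTrigger: Option[Int] = Some(2)       // value of the source option, if set
    val newFiles: Seq[String] = Seq("f1", "f2", "f3")   // all files discovered since the last batch
    val metadataLog = scala.collection.mutable.Map[Long, Seq[String]]()
    var maxBatchId = 0L

    // Only the truncated slice belongs to the current batch when maxFilesPerTrigger is set.
    val batchFiles = maxFilesPerTrigger.map(n => newFiles.take(n)).getOrElse(newFiles)
    if (batchFiles.nonEmpty) {
      maxBatchId += 1
      // The fix: record `batchFiles`, not `newFiles`, so the leftover file ("f3" here)
      // is assigned to a later batch instead of being read again.
      metadataLog(maxBatchId) = batchFiles
    }
    println(s"Batch $maxBatchId -> ${metadataLog(maxBatchId).mkString(", ")}")
  }
}
```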

## How was this patch tested?

Added unit tests

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #14143 from tdas/SPARK-16430-1.

(cherry picked from commit e50efd53f073890d789a8448f850cc219cca7708)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b37177c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b37177c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b37177c2

Branch: refs/heads/branch-2.0
Commit: b37177c22f5c0f927b8d9f3a38dba9617d36c944
Parents: b716e10
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon Jul 11 18:41:36 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Jul 11 18:41:45 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/FileStreamSource.scala  |  6 ++--
 .../sql/streaming/FileStreamSourceSuite.scala   | 35 ++++++++++++++++++--
 2 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b37177c2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 72b335a..0cfad65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -73,8 +73,8 @@ class FileStreamSource(
     logTrace(s"Number of seen files = ${seenFiles.size}")
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
-      metadataLog.add(maxBatchId, newFiles)
-      logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files")
+      metadataLog.add(maxBatchId, batchFiles)
+      logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
     }
 
     new LongOffset(maxBatchId)
@@ -138,7 +138,7 @@ class FileStreamSource(
       .map { str =>
         Try(str.toInt).toOption.filter(_ > 0).getOrElse {
           throw new IllegalArgumentException(
-            s"Invalid value '$str' for option 'maxFilesPerBatch', must be a positive integer")
+            s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
         }
       }
   }
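
For context, a standalone sketch of the option validation this hunk touches (the `parseMaxFilesPerTrigger` helper and its enclosing object are hypothetical; in Spark the logic lives inside FileStreamSource):

```scala
import scala.util.Try

object MaxFilesPerTriggerOption {
  // Reject anything that is not a strictly positive integer, and name the
  // option correctly ('maxFilesPerTrigger') in the error message.
  def parseMaxFilesPerTrigger(options: Map[String, String]): Option[Int] =
    options.get("maxFilesPerTrigger").map { str =>
      Try(str.toInt).toOption.filter(_ > 0).getOrElse {
        throw new IllegalArgumentException(
          s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
      }
    }

  def main(args: Array[String]): Unit = {
    println(parseMaxFilesPerTrigger(Map("maxFilesPerTrigger" -> "5")))  // Some(5)
    println(parseMaxFilesPerTrigger(Map.empty))                         // None
    // parseMaxFilesPerTrigger(Map("maxFilesPerTrigger" -> "0"))        // throws IllegalArgumentException
  }
}
```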

http://git-wip-us.apache.org/repos/asf/spark/blob/b37177c2/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3d28d4f..47260a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -627,6 +627,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         checkAnswer(df, data.map(_.toString).toDF("value"))
       }
 
+      def checkAllData(data: Seq[Int]): Unit = {
+        val schema = StructType(Seq(StructField("value", StringType)))
+        val df = spark.createDataFrame(
+          spark.sparkContext.makeRDD(memorySink.allData), schema)
+        checkAnswer(df, data.map(_.toString).toDF("value"))
+      }
+
       /** Check how many batches have executed since the last time this check was made */
       var lastBatchId = -1L
       def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
@@ -636,6 +643,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
 
       checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
+      checkAllData(1 to 3)
       lastBatchId = memorySink.latestBatchId.get
 
       fileSource.withBatchingLocked {
@@ -645,8 +653,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(7)   // 6 and 7 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(6, 7)
       checkNumBatchesSinceLastCheck(2)
+      checkLastBatchData(6, 7)
+      checkAllData(1 to 7)
 
       fileSource.withBatchingLocked {
         createFile(8)
@@ -656,8 +665,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(12)   // 12 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(12)
       checkNumBatchesSinceLastCheck(3)
+      checkLastBatchData(12)
+      checkAllData(1 to 12)
+
+      q.stop()
+    }
+  }
+
+  test("max files per trigger - incorrect values") {
+    withTempDir { case src =>
+      def testMaxFilePerTriggerValue(value: String): Unit = {
+        val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+        val e = intercept[IllegalArgumentException] {
+          testStream(df)()
+        }
+        Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+          assert(e.getMessage.contains(s))
+        }
+      }
+
+      testMaxFilePerTriggerValue("not-a-integer")
+      testMaxFilePerTriggerValue("-1")
+      testMaxFilePerTriggerValue("0")
+      testMaxFilePerTriggerValue("10.1")
     }
   }
 

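Beyond the new unit tests, here is a hypothetical end-to-end usage sketch of the option (the input directory, app name, query name, and limit are illustrative; assumes a local Spark 2.0 build):

```scala
import org.apache.spark.sql.SparkSession

// Read a directory as a text stream, capped at 2 new files per micro-batch,
// and write each batch to the in-memory sink so it can be queried with SQL.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("maxFilesPerTrigger-demo")
  .getOrCreate()

val fileStreamDf = spark.readStream
  .option("maxFilesPerTrigger", "2")        // at most 2 new files per trigger
  .text("/tmp/streaming-input")             // hypothetical input directory

val query = fileStreamDf.writeStream
  .format("memory")
  .queryName("capped_files")
  .start()

query.processAllAvailable()
spark.sql("SELECT * FROM capped_files").show()
query.stop()
```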
