This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 1843c16  [SPARK-26629][SS] Fixed error with multiple file streams in a query + restart on a batch that has no data for one file stream
1843c16 is described below

commit 1843c16fda09a3e9373e8f7b3ff5f73455c50442
Author: Tathagata Das <tathagata.das1...@gmail.com>
AuthorDate: Wed Jan 16 09:42:14 2019 -0800

    [SPARK-26629][SS] Fixed error with multiple file streams in a query + restart on a batch that has no data for one file stream
    
    ## What changes were proposed in this pull request?
    When a streaming query has multiple file streams, and there is a batch in which one of the file streams has no data, then restarting the query from that batch throws the following error.
    ```
    java.lang.IllegalStateException: batch 1 doesn't exist
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$.verifyBatchIds(HDFSMetadataLog.scala:300)
        at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.get(FileStreamSourceLog.scala:120)
        at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:181)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:291)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:291)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:178)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:205)
    ```
    
    The existing `HDFSMetadataLog.verifyBatchIds` threw an error whenever the `batchIds` list was empty. In the context of `FileStreamSource.getBatch` (where verify is called) and `FileStreamSourceLog` (a subclass of `HDFSMetadataLog`), this is usually okay because, in a streaming query with one file stream, the `batchIds` can never be empty:
    - A batch is planned only when the `FileStreamSourceLog` has seen a new offset (that is, there are new data files).
    - So `FileStreamSource.getBatch(X, Y)` is only ever called with Y > X. Internally, this calls `HDFSMetadataLog.verifyBatchIds` on the ids X+1 to Y, that is, Y-X ids.
    
    For example, `FileStreamSource.getBatch(4, 5)` will call `verify(batchIds = Seq(5), start = 5, end = 5)`. However, the invariant Y > X does not hold when there are two file stream sources, as a batch may be planned even when only one of the file streams has data. The file stream with no data in that batch can then see `FileStreamSource.getBatch(X, X)` -> `verify(batchIds = Seq.empty, start = X+1, end = X)` -> failure (the contract is sketched below).
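
    To make the failing contract concrete, here is a minimal sketch of the checks `verifyBatchIds` performs, reconstructed from the error above and the test expectations in this patch; it is not Spark's actual implementation, only an illustration of why an empty `batchIds` with bounds defined must fail:
    ```scala
    // Minimal sketch (not Spark's code) of the contract enforced by
    // org.apache.spark.sql.execution.streaming.HDFSMetadataLog.verifyBatchIds.
    def verifyBatchIds(batchIds: Seq[Long], startId: Option[Long], endId: Option[Long]): Unit = {
      // An empty id list with a bound defined always fails -- even when
      // startId > endId, which is exactly the degenerate range a restart produces.
      if (batchIds.isEmpty && (startId.isDefined || endId.isDefined)) {
        throw new IllegalStateException(s"batch ${startId.orElse(endId).get} doesn't exist")
      }
      if (batchIds.isEmpty) return
      // Every id must lie within [startId, endId] when both bounds are given.
      for (s <- startId; e <- endId; id <- batchIds) {
        assert(s <= id && id <= e, s"batch id $id out of range [$s, $e]")
      }
      // The ids must cover the expected range continuously, with no gaps.
      val expected = startId.getOrElse(batchIds.head) to endId.getOrElse(batchIds.last)
      expected.find(id => !batchIds.contains(id)).foreach { missing =>
        throw new IllegalStateException(s"batch $missing doesn't exist")
      }
    }
    ```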
    
    Note that `FileStreamSource.getBatch(X, X)` gets called **only when restarting a query in a batch where a file source did not have data**. This is because, in the normal planning of batches, `MicroBatchExecution` avoids calling `FileStreamSource.getBatch(X, X)` when offset X has not changed. However, when restarting a stream at such a batch, `MicroBatchExecution.populateStartOffsets()` calls `FileStreamSource.getBatch(X, X)` (a DataSource V1 hack to initialize the source with the last known offsets [...]
    
    The minimal solution here is to skip verification when `FileStreamSource.getBatch(X, X)` is called, as sketched below.
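
    Tying this to the toy `verifyBatchIds` above (the real guard is in the `FileStreamSourceLog` diff below, using its `startBatchId`/`endBatchId` values), a hypothetical driver shows how the guard short-circuits the degenerate range:
    ```scala
    // Hypothetical driver around the sketch above; getBatch(X, Y) verifies the
    // ids X+1..Y, and the patch's guard skips the check when X + 1 > Y.
    def verifyForGetBatch(x: Long, y: Long, ids: Seq[Long]): Unit = {
      val (startBatchId, endBatchId) = (x + 1, y)
      if (startBatchId <= endBatchId) { // the guard added by this patch
        verifyBatchIds(ids, Some(startBatchId), Some(endBatchId))
      }
    }

    verifyForGetBatch(4, 5, Seq(5L))   // normal batch: verifies Seq(5) against 5..5
    verifyForGetBatch(5, 5, Seq.empty) // restart replay: guard skips; previously threw
    ```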
    
    ## How was this patch tested?
    
    New unit tests: `HDFSMetadataLogSuite` now pins down the behavior of `verifyBatchIds` when `startId > endId`, and `FileStreamSourceSuite` adds a test that restarts a two-source query on a batch where one source has no data (see the diff below).
    
    Closes #23557 from tdas/SPARK-26629.
    
    Authored-by: Tathagata Das <tathagata.das1...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
    (cherry picked from commit 06d5b173b687c23aa53e293ed6e12ec746393876)
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../execution/streaming/FileStreamSourceLog.scala  |  4 +-
 .../sql/execution/streaming/HDFSMetadataLog.scala  |  3 +-
 .../execution/streaming/HDFSMetadataLogSuite.scala |  6 ++
 .../sql/streaming/FileStreamSourceSuite.scala      | 75 ++++++++++++++++++++--
 4 files changed, 80 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 8628471..7b2ea96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -117,7 +117,9 @@ class FileStreamSourceLog(
 
     val batches =
       (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
-    HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId)
+    if (startBatchId <= endBatchId) {
+      HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId)
+    }
     batches
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index bd0a461..62d524f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -262,7 +262,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
 object HDFSMetadataLog {
 
   /**
-   * Verify if batchIds are continuous and between `startId` and `endId`.
+   * Verify if batchIds are continuous and between `startId` and `endId` (both inclusive and
+   * startId assumed to be <= endId).
    *
    * @param batchIds the sorted ids to verify.
   * @param startId the start id. If it's set, batchIds should start with this id.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9268306..0e36e7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -178,5 +178,11 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), None, Some(5L)))
     intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), Some(5L)))
     intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), Some(5L)))
+
+    // Related to SPARK-26629, this captures the behavior of verifyBatchIds when startId > endId
+    intercept[IllegalStateException](verifyBatchIds(Seq(), Some(2L), Some(1L)))
+    intercept[AssertionError](verifyBatchIds(Seq(2), Some(2L), Some(1L)))
+    intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L)))
+    intercept[AssertionError](verifyBatchIds(Seq(0), Some(2L), Some(1L)))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index d4bd9c7..fb0b365 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -48,21 +48,33 @@ abstract class FileStreamSourceTest
    * `FileStreamSource` actually being used in the execution.
    */
   abstract class AddFileData extends AddData {
+    private val _qualifiedBasePath = PrivateMethod[Path]('qualifiedBasePath)
+
+    private def isSamePath(fileSource: FileStreamSource, srcPath: File): Boolean = {
+      val path = (fileSource invokePrivate _qualifiedBasePath()).toString.stripPrefix("file:")
+      path == srcPath.getCanonicalPath
+    }
+
     override def addData(query: Option[StreamExecution]): (Source, Offset) = {
       require(
         query.nonEmpty,
         "Cannot add data when there is no query for finding the active file 
stream source")
 
       val sources = getSourcesFromStreamingQuery(query.get)
-      if (sources.isEmpty) {
+      val source = if (sources.isEmpty) {
         throw new Exception(
           "Could not find file source in the StreamExecution logical plan to 
add data to")
-      } else if (sources.size > 1) {
-        throw new Exception(
-          "Could not select the file source in the StreamExecution logical 
plan as there" +
-            "are multiple file sources:\n\t" + sources.mkString("\n\t"))
+      } else if (sources.size == 1) {
+        sources.head
+      } else {
+        val matchedSources = sources.filter(isSamePath(_, src))
+        if (matchedSources.size != 1) {
+          throw new Exception(
+            "Could not select the file source in StreamExecution as there are 
multiple" +
+              s" file sources and none / more than one matches $src:\n" + 
sources.mkString("\n"))
+        }
+        matchedSources.head
       }
-      val source = sources.head
       val newOffset = source.withBatchingLocked {
         addData(source)
         new FileStreamSourceOffset(source.currentLogOffset + 1)
@@ -71,6 +83,9 @@ abstract class FileStreamSourceTest
       (source, newOffset)
     }
 
+    /** Source directory to add file data to */
+    protected def src: File
+
     protected def addData(source: FileStreamSource): Unit
   }
 
@@ -1494,6 +1509,54 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
+
+  test("SPARK-26629: multiple file sources work with restarts when a source 
does not have data") {
+    withTempDirs { case (dir, tmp) =>
+      val sourceDir1 = new File(dir, "source1")
+      val sourceDir2 = new File(dir, "source2")
+      sourceDir1.mkdirs()
+      sourceDir2.mkdirs()
+
+      val source1 = createFileStream("text", s"${sourceDir1.getCanonicalPath}")
+      val source2 = createFileStream("text", s"${sourceDir2.getCanonicalPath}")
+      val unioned = source1.union(source2)
+
+      def addMultiTextFileData(
+          source1Content: String,
+          source2Content: String): StreamAction = {
+        val actions = Seq(
+          AddTextFileData(source1Content, sourceDir1, tmp),
+          AddTextFileData(source2Content, sourceDir2, tmp)
+        ).filter(_.content != null)  // don't write to a source dir if no content specified
+        StreamProgressLockedActions(actions, desc = actions.mkString("[ ", " | ", " ]"))
+      }
+
+      testStream(unioned)(
+        StartStream(),
+        addMultiTextFileData(source1Content = "source1_0", source2Content = 
"source2_0"),
+        CheckNewAnswer("source1_0", "source2_0"),
+        StopStream,
+
+        StartStream(),
+        addMultiTextFileData(source1Content = "source1_1", source2Content = 
null),
+        CheckNewAnswer("source1_1"),
+        StopStream,
+
+        // Restart after a batch with one file source having no new data.
+        // This restart is needed to hit the issue in SPARK-26629.
+
+        StartStream(),
+        addMultiTextFileData(source1Content = null, source2Content = "source2_2"),
+        CheckNewAnswer("source2_2"),
+        StopStream,
+
+        StartStream(),
+        addMultiTextFileData(source1Content = "source1_3", source2Content = 
"source2_3"),
+        CheckNewAnswer("source1_3", "source2_3"),
+        StopStream
+      )
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

