Repository: spark
Updated Branches:
  refs/heads/master fb54a564d -> 6edfff055


[SPARK-21596][SS] Ensure places calling HDFSMetadataLog.get check the return value

## What changes were proposed in this pull request?

While investigating a flaky test, I realized that many places don't check the
return value of `HDFSMetadataLog.get(batchId: Long): Option[T]`. When a batch
is supposed to be there, the caller just ignores `None` rather than throwing
an error. If a bug causes a query not to generate a batch metadata file, this
behavior hides the bug, lets the query keep running, and eventually deletes
the metadata logs, making the issue hard to debug.

This PR ensures that places calling `HDFSMetadataLog.get` always check the return value.
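
As a condensed sketch of the pattern applied throughout (adapted from the `StreamExecution` change in this diff; the surrounding context is simplified and not the exact call site):

```scala
// Before: a missing batch metadata file is silently ignored.
offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
  committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// After: a missing batch fails fast with a descriptive error instead of
// letting the query keep running on incomplete state.
val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
  throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
}
committedOffsets = secondLatestBatchId.toStreamProgress(sources)
```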

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #18799 from zsxwing/SPARK-21596.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6edfff05
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6edfff05
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6edfff05

Branch: refs/heads/master
Commit: 6edfff055caea81dc3a98a6b4081313a0c0b0729
Parents: fb54a56
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Aug 8 20:20:26 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Aug 8 20:20:26 2017 -0700

----------------------------------------------------------------------
 .../streaming/CompactibleFileStreamLog.scala    | 16 +++++-
 .../streaming/FileStreamSourceLog.scala         |  5 +-
 .../execution/streaming/HDFSMetadataLog.scala   | 57 ++++++++++++++++++--
 .../execution/streaming/StreamExecution.scala   | 17 ++++--
 .../streaming/HDFSMetadataLogSuite.scala        | 17 ++++++
 .../sql/streaming/FileStreamSourceSuite.scala   |  1 +
 6 files changed, 102 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6edfff05/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index e37033b..77bc0ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -169,7 +169,13 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    */
   private def compact(batchId: Long, logs: Array[T]): Boolean = {
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
-    val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
+    val allLogs = validBatches.map { id =>
+      super.get(id).getOrElse {
+        throw new IllegalStateException(
+          s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
+            s"(compactInterval: $compactInterval)")
+      }
+    }.flatten ++ logs
     // Return false as there is another writer.
     super.add(batchId, compactLogs(allLogs).toArray)
   }
@@ -186,7 +192,13 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
       if (latestId >= 0) {
         try {
           val logs =
-            getAllValidBatches(latestId, compactInterval).flatMap(id => super.get(id)).flatten
+            getAllValidBatches(latestId, compactInterval).map { id =>
+              super.get(id).getOrElse {
+                throw new IllegalStateException(
+                  s"${batchIdToPath(id)} doesn't exist " +
+                    s"(latestId: $latestId, compactInterval: $compactInterval)")
+              }
+            }.flatten
           return compactLogs(logs).toArray
         } catch {
           case e: IOException =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6edfff05/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 33e6a1d..8628471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -115,7 +115,10 @@ class FileStreamSourceLog(
       Map.empty[Long, Option[Array[FileEntry]]]
     }
 
-    (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
+    val batches =
+      (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
+    HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId)
+    batches
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6edfff05/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 46bfc29..5f8973f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -123,7 +123,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           serialize(metadata, output)
           return Some(tempPath)
         } finally {
-          IOUtils.closeQuietly(output)
+          output.close()
         }
       } catch {
         case e: FileAlreadyExistsException =>
@@ -211,13 +211,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   }
 
   override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
+    assert(startId.isEmpty || endId.isEmpty || startId.get <= endId.get)
     val files = fileManager.list(metadataPath, batchFilesFilter)
     val batchIds = files
       .map(f => pathToBatchId(f.getPath))
       .filter { batchId =>
         (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
-    }
-    batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
+    }.sorted
+
+    verifyBatchIds(batchIds, startId, endId)
+
+    batchIds.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
       case (batchId, metadataOption) =>
         (batchId, metadataOption.get)
     }
@@ -437,4 +441,51 @@ object HDFSMetadataLog {
       }
     }
   }
+
+  /**
+   * Verify if batchIds are continuous and between `startId` and `endId`.
+   *
+   * @param batchIds the sorted ids to verify.
+   * @param startId the start id. If it's set, batchIds should start with this id.
+   * @param endId the end id. If it's set, batchIds should end with this id.
+   */
+  def verifyBatchIds(batchIds: Seq[Long], startId: Option[Long], endId: Option[Long]): Unit = {
+    // Verify that we can get all batches between `startId` and `endId`.
+    if (startId.isDefined || endId.isDefined) {
+      if (batchIds.isEmpty) {
+        throw new IllegalStateException(s"batch ${startId.orElse(endId).get} doesn't exist")
+      }
+      if (startId.isDefined) {
+        val minBatchId = batchIds.head
+        assert(minBatchId >= startId.get)
+        if (minBatchId != startId.get) {
+          val missingBatchIds = startId.get to minBatchId
+          throw new IllegalStateException(
+            s"batches (${missingBatchIds.mkString(", ")}) don't exist " +
+              s"(startId: $startId, endId: $endId)")
+        }
+      }
+
+      if (endId.isDefined) {
+        val maxBatchId = batchIds.last
+        assert(maxBatchId <= endId.get)
+        if (maxBatchId != endId.get) {
+          val missingBatchIds = maxBatchId to endId.get
+          throw new IllegalStateException(
+            s"batches (${missingBatchIds.mkString(", ")}) don't exist " +
+              s"(startId: $startId, endId: $endId)")
+        }
+      }
+    }
+
+    if (batchIds.nonEmpty) {
+      val minBatchId = batchIds.head
+      val maxBatchId = batchIds.last
+      val missingBatchIds = (minBatchId to maxBatchId).toSet -- batchIds
+      if (missingBatchIds.nonEmpty) {
+        throw new IllegalStateException(s"batches (${missingBatchIds.mkString(", ")}) " +
+          s"don't exist (startId: $startId, endId: $endId)")
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6edfff05/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 1528e7f..9bc114f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -438,7 +438,10 @@ class StreamExecution(
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this
          * point is the second latest batch id in the offset log. */
-        offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
+        if (latestBatchId != 0) {
+          val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
+            throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
+          }
           committedOffsets = secondLatestBatchId.toStreamProgress(sources)
         }
 
@@ -565,10 +568,14 @@ class StreamExecution(
 
         // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
         // sources to discard data from the previous batch.
-        val prevBatchOff = offsetLog.get(currentBatchId - 1)
-        if (prevBatchOff.isDefined) {
-          prevBatchOff.get.toStreamProgress(sources).foreach {
-            case (src, off) => src.commit(off)
+        if (currentBatchId != 0) {
+          val prevBatchOff = offsetLog.get(currentBatchId - 1)
+          if (prevBatchOff.isDefined) {
+            prevBatchOff.get.toStreamProgress(sources).foreach {
+              case (src, off) => src.commit(off)
+            }
+          } else {
+            throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
           }
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6edfff05/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 7689bc0..48e70e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -259,6 +259,23 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       fm.rename(path2, path3)
     }
   }
+
+  test("verifyBatchIds") {
+    import HDFSMetadataLog.verifyBatchIds
+    verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), Some(3L))
+    verifyBatchIds(Seq(1L), Some(1L), Some(1L))
+    verifyBatchIds(Seq(1L, 2L, 3L), None, Some(3L))
+    verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), None)
+    verifyBatchIds(Seq(1L, 2L, 3L), None, None)
+
+    intercept[IllegalStateException](verifyBatchIds(Seq(), Some(1L), None))
+    intercept[IllegalStateException](verifyBatchIds(Seq(), None, Some(1L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(), Some(1L), Some(1L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), None))
+    intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), None, Some(5L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), Some(5L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), Some(5L)))
+  }
 }
 
 /** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */

http://git-wip-us.apache.org/repos/asf/spark/blob/6edfff05/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 2108b11..e2ec690 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1314,6 +1314,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val metadataLog =
        new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
       assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
+      assert(metadataLog.add(1, Array(FileEntry(s"$scheme:///file2", 200L, 0))))
 
       val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), Nil,
         dir.getAbsolutePath, Map.empty)

