Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d3aaed219 -> e8ca1aea5


[SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing

Revise HDFSMetadataLog API such that metadata object serialization and final 
batch file write are separated. This will allow serialization checks without 
worrying about batch file name formats. marmbrus zsxwing

Existing tests already ensure this API faithfully supports core functionality, 
i.e., creation of batch files.

Author: Tyson Condie <[email protected]>

Closes #15924 from tcondie/SPARK-18498.

Signed-off-by: Michael Armbrust <[email protected]>
(cherry picked from commit f643fe47f4889faf68da3da8d7850ee48df7c22f)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8ca1aea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8ca1aea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8ca1aea

Branch: refs/heads/branch-2.1
Commit: e8ca1aea56956755e6335c0b7d2cbaa43e1f1e18
Parents: d3aaed2
Author: Tyson Condie <[email protected]>
Authored: Tue Nov 29 12:36:41 2016 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Tue Nov 29 12:38:04 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala   | 100 ++++++++++++-------
 1 file changed, 66 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e8ca1aea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d95ec7f..1b41352 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
     }
   }
 
-  /**
-   * Write a batch to a temp file then rename it to the batch file.
-   *
-   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. 
Although it is not a
-   * valid behavior, we still need to prevent it from destroying the files.
-   */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) 
=> Unit): Unit = {
-    // Use nextId to create a temp file
+  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = 
serialize): Option[Path] = {
     var nextId = 0
     while (true) {
       val tempPath = new Path(metadataPath, 
s".${UUID.randomUUID.toString}.tmp")
@@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
         val output = fileManager.create(tempPath)
         try {
           writer(metadata, output)
+          return Some(tempPath)
         } finally {
           IOUtils.closeQuietly(output)
         }
-        try {
-          // Try to commit the batch
-          // It will fail if there is an existing file (someone has committed 
the batch)
-          logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
-          fileManager.rename(tempPath, batchIdToPath(batchId))
-
-          // SPARK-17475: HDFSMetadataLog should not leak CRC files
-          // If the underlying filesystem didn't rename the CRC file, delete 
it.
-          val crcPath = new Path(tempPath.getParent(), 
s".${tempPath.getName()}.crc")
-          if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
-          return
-        } catch {
-          case e: IOException if isFileAlreadyExistsException(e) =>
-            // If "rename" fails, it means some other "HDFSMetadataLog" has 
committed the batch.
-            // So throw an exception to tell the user this is not a valid 
behavior.
-            throw new ConcurrentModificationException(
-              s"Multiple HDFSMetadataLog are using $path", e)
-          case e: FileNotFoundException =>
-            // Sometimes, "create" will succeed when multiple writers are 
calling it at the same
-            // time. However, only one writer can call "rename" successfully, 
others will get
-            // FileNotFoundException because the first writer has removed it.
-            throw new ConcurrentModificationException(
-              s"Multiple HDFSMetadataLog are using $path", e)
-        }
       } catch {
         case e: IOException if isFileAlreadyExistsException(e) =>
           // Failed to create "tempPath". There are two cases:
@@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
           // metadata path. In addition, the old Streaming also have this 
issue, people can create
           // malicious checkpoint files to crash a Streaming application too.
           nextId += 1
-      } finally {
-        fileManager.delete(tempPath)
       }
     }
+    None
+  }
+
+  /**
+   * Write a batch to a temp file then rename it to the batch file.
+   *
+   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. 
Although it is not a
+   * valid behavior, we still need to prevent it from destroying the files.
+   */
+  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) 
=> Unit): Unit = {
+    val tempPath = writeTempBatch(metadata, writer).getOrElse(
+      throw new IllegalStateException(s"Unable to create temp batch file 
$batchId"))
+    try {
+      // Try to commit the batch
+      // It will fail if there is an existing file (someone has committed the 
batch)
+      logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
+      fileManager.rename(tempPath, batchIdToPath(batchId))
+
+      // SPARK-17475: HDFSMetadataLog should not leak CRC files
+      // If the underlying filesystem didn't rename the CRC file, delete it.
+      val crcPath = new Path(tempPath.getParent(), 
s".${tempPath.getName()}.crc")
+      if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
+    } catch {
+      case e: IOException if isFileAlreadyExistsException(e) =>
+        // If "rename" fails, it means some other "HDFSMetadataLog" has 
committed the batch.
+        // So throw an exception to tell the user this is not a valid behavior.
+        throw new ConcurrentModificationException(
+          s"Multiple HDFSMetadataLog are using $path", e)
+      case e: FileNotFoundException =>
+        // Sometimes, "create" will succeed when multiple writers are calling 
it at the same
+        // time. However, only one writer can call "rename" successfully, 
others will get
+        // FileNotFoundException because the first writer has removed it.
+        throw new ConcurrentModificationException(
+          s"Multiple HDFSMetadataLog are using $path", e)
+    } finally {
+      fileManager.delete(tempPath)
+    }
   }
 
   private def isFileAlreadyExistsException(e: IOException): Boolean = {
@@ -208,6 +213,22 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
       (e.getMessage != null && e.getMessage.startsWith("File already exists: 
"))
   }
 
+  /**
+   * @return the deserialized metadata in a batch file, or None if file not 
exist.
+   * @throws IllegalArgumentException when path does not point to a batch file.
+   */
+  def get(batchFile: Path): Option[T] = {
+    if (fileManager.exists(batchFile)) {
+      if (isBatchFile(batchFile)) {
+        get(pathToBatchId(batchFile))
+      } else {
+        throw new IllegalArgumentException(s"File ${batchFile} is not a batch 
file!")
+      }
+    } else {
+      None
+    }
+  }
+
   override def get(batchId: Long): Option[T] = {
     val batchMetadataFile = batchIdToPath(batchId)
     if (fileManager.exists(batchMetadataFile)) {
@@ -251,6 +272,17 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
   }
 
   /**
+   * Get an array of [FileStatus] referencing batch files.
+   * The array is sorted by most recent batch file first to
+   * oldest batch file.
+   */
+  def getOrderedBatchFiles(): Array[FileStatus] = {
+    fileManager.list(metadataPath, batchFilesFilter)
+      .sortBy(f => pathToBatchId(f.getPath))
+      .reverse
+  }
+
+  /**
    * Removes all the log entry earlier than thresholdBatchId (exclusive).
    */
   override def purge(thresholdBatchId: Long): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to