Repository: spark
Updated Branches:
  refs/heads/master f6c3bba22 -> 21b4ba2d6


[SPARK-19599][SS] Clean up HDFSMetadataLog

## What changes were proposed in this pull request?

SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup in `HDFSMetadataLog`.

This PR includes the following changes:
- ~~Remove the workaround code for HADOOP-10622.~~ Unfortunately, there is another issue, [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084), that prevents us from removing the workaround code.
- Remove the unnecessary `writer: (T, OutputStream) => Unit` parameter and call `serialize` directly (a sketch of the simplified signatures follows this list).
- Remove the `FileNotFoundException` catch in `writeBatch`.
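For illustration, here is the signature change in isolation (shapes copied from the diff below, bodies elided):

```scala
// Before: a writer function had to be threaded through every call site,
// even though it always defaulted to `serialize`.
def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path]
private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit

// After: the log always uses its own `serialize`, so the parameter is gone.
def writeTempBatch(metadata: T): Option[Path]
private def writeBatch(batchId: Long, metadata: T): Unit
```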

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #16932 from zsxwing/metadata-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21b4ba2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21b4ba2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21b4ba2d

Branch: refs/heads/master
Commit: 21b4ba2d6f21a9759af879471715c123073bd67a
Parents: f6c3bba
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Feb 15 16:21:43 2017 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Feb 15 16:21:43 2017 -0800

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala   | 39 +++++++++-----------
 .../execution/streaming/StreamExecution.scala   |  4 +-
 2 files changed, 19 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/21b4ba2d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index bfdc2cb..3155ce0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
          case ut: UninterruptibleThread =>
            // When using a local file system, "writeBatch" must be called on a
            // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-            // while writing the batch file. This is because there is a potential dead-lock in
-            // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-            // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-            // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-            // the file permission if using the local file system, and can get deadlocked if the
-            // stream execution thread is stopped by interrupt. Hence, we make sure that
-            // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-            // interrupts here. Also see SPARK-14131.
-            ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+            // while writing the batch file.
+            //
+            // This is because Hadoop "Shell.runCommand" swallows InterruptedException (HADOOP-14084).
+            // If the user tries to stop a query, and the thread running "Shell.runCommand" is
+            // interrupted, then the InterruptedException will be dropped and the query will still be
+            // running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+            // "Shell.runCommand" to set the file permission if using the local file system.)
+            //
+            // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]], which
+            // allows us to disable interrupts here, in order to propagate the interrupt state
+            // correctly. Also see SPARK-19599.
+            ut.runUninterruptibly { writeBatch(batchId, metadata) }
          case _ =>
            throw new IllegalStateException(
              "HDFSMetadataLog.add() on a local file system must be executed on " +
@@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
        // For a distributed file system, such as HDFS or S3, if the network is broken, write
        // operations may just hang until timeout. We should enable interrupts to allow stopping
         // the query fast.
-        writeBatch(batchId, metadata, serialize)
+        writeBatch(batchId, metadata)
       }
       true
     }
   }
 
-  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
-    var nextId = 0
+  def writeTempBatch(metadata: T): Option[Path] = {
     while (true) {
      val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
       try {
         val output = fileManager.create(tempPath)
         try {
-          writer(metadata, output)
+          serialize(metadata, output)
           return Some(tempPath)
         } finally {
           IOUtils.closeQuietly(output)
@@ -164,7 +166,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
          // big problem because it requires the attacker must have the permission to write the
          // metadata path. In addition, the old Streaming also have this issue, people can create
           // malicious checkpoint files to crash a Streaming application too.
-          nextId += 1
       }
     }
     None
@@ -176,8 +177,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
    * valid behavior, we still need to prevent it from destroying the files.
    */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
-    val tempPath = writeTempBatch(metadata, writer).getOrElse(
+  private def writeBatch(batchId: Long, metadata: T): Unit = {
+    val tempPath = writeTempBatch(metadata).getOrElse(
      throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
     try {
       // Try to commit the batch
@@ -195,12 +196,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         // So throw an exception to tell the user this is not a valid behavior.
         throw new ConcurrentModificationException(
           s"Multiple HDFSMetadataLog are using $path", e)
-      case e: FileNotFoundException =>
-        // Sometimes, "create" will succeed when multiple writers are calling it at the same
-        // time. However, only one writer can call "rename" successfully, others will get
-        // FileNotFoundException because the first writer has removed it.
-        throw new ConcurrentModificationException(
-          s"Multiple HDFSMetadataLog are using $path", e)
     } finally {
       fileManager.delete(tempPath)
     }
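The commit protocol above (serialize into a randomly named `.tmp` file, then rename it to the final batch file, treating a failed rename as a concurrent writer) can be hard to see through the diff noise. Below is a hypothetical, self-contained sketch of the same idea, with plain `java.nio.file` standing in for Spark's `fileManager`; `BatchCommitSketch` and `commitBatch` are illustrative names, not Spark API:

```scala
import java.io.OutputStream
import java.nio.file.{Files, Path, Paths}
import java.util.UUID

// Hypothetical sketch of the temp-file-then-rename commit used by HDFSMetadataLog,
// with java.nio.file standing in for Spark's FileManager abstraction.
object BatchCommitSketch {
  def commitBatch(dir: Path, batchId: Long)(serialize: OutputStream => Unit): Unit = {
    // 1. Serialize into a uniquely named temp file so concurrent writers never collide.
    val tempPath = dir.resolve(s".${UUID.randomUUID()}.tmp")
    val out = Files.newOutputStream(tempPath)
    try serialize(out) finally out.close()
    try {
      // 2. Rename to the final batch file. Files.move without REPLACE_EXISTING
      //    throws FileAlreadyExistsException if another writer committed first,
      //    which plays the role of the ConcurrentModificationException above.
      Files.move(tempPath, dir.resolve(batchId.toString))
    } finally {
      // 3. Whether or not the rename won, remove any leftover temp file.
      Files.deleteIfExists(tempPath)
    }
  }
}
```

A call like `BatchCommitSketch.commitBatch(Paths.get("/tmp/meta"), 0L)(_.write("v1".getBytes))` then either creates batch file `0` or fails fast because a concurrent log already committed it.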

http://git-wip-us.apache.org/repos/asf/spark/blob/21b4ba2d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3149ef0..239d49b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -179,8 +179,8 @@ class StreamExecution(
 
   /**
   * The thread that runs the micro-batches of this stream. Note that this thread must be
-   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
-   * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
+   * [[org.apache.spark.util.UninterruptibleThread]] to avoid swallowing `InterruptedException` when
+   * using [[HDFSMetadataLog]]. See SPARK-19599 for more details.
    */
   val microBatchThread =
     new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
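
The `UninterruptibleThread` requirement can be puzzling without seeing what the class does. Roughly: while `runUninterruptibly` is executing, `interrupt()` is deferred rather than delivered, so `Shell.runCommand` never gets a chance to swallow the `InterruptedException`, and the interrupt is re-delivered once the block finishes. A much-simplified, hypothetical sketch of that idea follows (the real `org.apache.spark.util.UninterruptibleThread` synchronizes these state transitions to close race windows, which this sketch omits):

```scala
// Hypothetical simplification of Spark's UninterruptibleThread, for intuition only.
class UninterruptibleThreadSketch(name: String) extends Thread(name) {
  @volatile private var uninterruptible = false
  @volatile private var pendingInterrupt = false

  // Must be called from this thread itself. Interrupts arriving while `body`
  // runs are recorded instead of delivered, then re-delivered afterwards.
  def runUninterruptibly[T](body: => T): T = {
    uninterruptible = true
    try body
    finally {
      uninterruptible = false
      if (pendingInterrupt) { pendingInterrupt = false; super.interrupt() }
    }
  }

  override def interrupt(): Unit = {
    if (uninterruptible) pendingInterrupt = true // defer: don't break the metadata write
    else super.interrupt()
  }
}
```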

