Repository: spark
Updated Branches:
  refs/heads/master 8198ea501 -> 592e3a42c


[SPARK-25218][CORE] Fix potential resource leaks in TransportServer and 
SocketAuthHelper

## What changes were proposed in this pull request?

Make sure TransportServer and SocketAuthHelper close the resources for all 
types of errors.

## How was this patch tested?

Jenkins

Closes #22210 from zsxwing/SPARK-25218.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/592e3a42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/592e3a42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/592e3a42

Branch: refs/heads/master
Commit: 592e3a42c20b72edd6e8b9dd07da367596f43da5
Parents: 8198ea5
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Tue Aug 28 08:36:06 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Tue Aug 28 08:36:06 2018 -0700

----------------------------------------------------------------------
 .../buffer/FileSegmentManagedBuffer.java        | 32 ++++++-------
 .../spark/network/server/TransportServer.java   |  9 ++--
 .../spark/security/SocketAuthHelper.scala       | 50 +++++++++++++-------
 3 files changed, 54 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/592e3a42/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 8b8f989..45fee54 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -77,16 +77,16 @@ public final class FileSegmentManagedBuffer extends 
ManagedBuffer {
         return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
       }
     } catch (IOException e) {
+      String errorMessage = "Error in reading " + this;
       try {
         if (channel != null) {
           long size = channel.size();
-          throw new IOException("Error in reading " + this + " (actual file 
length " + size + ")",
-            e);
+          errorMessage = "Error in reading " + this + " (actual file length " 
+ size + ")";
         }
       } catch (IOException ignored) {
         // ignore
       }
-      throw new IOException("Error in opening " + this, e);
+      throw new IOException(errorMessage, e);
     } finally {
       JavaUtils.closeQuietly(channel);
     }
@@ -95,26 +95,24 @@ public final class FileSegmentManagedBuffer extends 
ManagedBuffer {
   @Override
   public InputStream createInputStream() throws IOException {
     FileInputStream is = null;
+    boolean shouldClose = true;
     try {
       is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
-      return new LimitedInputStream(is, length);
+      InputStream r = new LimitedInputStream(is, length);
+      shouldClose = false;
+      return r;
     } catch (IOException e) {
-      try {
-        if (is != null) {
-          long size = file.length();
-          throw new IOException("Error in reading " + this + " (actual file 
length " + size + ")",
-              e);
-        }
-      } catch (IOException ignored) {
-        // ignore
-      } finally {
+      String errorMessage = "Error in reading " + this;
+      if (is != null) {
+        long size = file.length();
+        errorMessage = "Error in reading " + this + " (actual file length " + 
size + ")";
+      }
+      throw new IOException(errorMessage, e);
+    } finally {
+      if (shouldClose) {
         JavaUtils.closeQuietly(is);
       }
-      throw new IOException("Error in opening " + this, e);
-    } catch (RuntimeException e) {
-      JavaUtils.closeQuietly(is);
-      throw e;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/592e3a42/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index d95ed22..9c85ab2 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -70,11 +70,14 @@ public class TransportServer implements Closeable {
     this.appRpcHandler = appRpcHandler;
     this.bootstraps = 
Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
 
+    boolean shouldClose = true;
     try {
       init(hostToBind, portToBind);
-    } catch (RuntimeException e) {
-      JavaUtils.closeQuietly(this);
-      throw e;
+      shouldClose = false;
+    } finally {
+      if (shouldClose) {
+        JavaUtils.closeQuietly(this);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/592e3a42/core/src/main/scala/org/apache/spark/security/SocketAuthHelper.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/security/SocketAuthHelper.scala 
b/core/src/main/scala/org/apache/spark/security/SocketAuthHelper.scala
index d15e793..ea38ccb 100644
--- a/core/src/main/scala/org/apache/spark/security/SocketAuthHelper.scala
+++ b/core/src/main/scala/org/apache/spark/security/SocketAuthHelper.scala
@@ -42,43 +42,59 @@ private[spark] class SocketAuthHelper(conf: SparkConf) {
    * Read the auth secret from the socket and compare to the expected value. 
Write the reply back
    * to the socket.
    *
-   * If authentication fails, this method will close the socket.
+   * If authentication fails or error is thrown, this method will close the 
socket.
    *
    * @param s The client socket.
    * @throws IllegalArgumentException If authentication fails.
    */
   def authClient(s: Socket): Unit = {
-    // Set the socket timeout while checking the auth secret. Reset it before 
returning.
-    val currentTimeout = s.getSoTimeout()
+    var shouldClose = true
     try {
-      s.setSoTimeout(10000)
-      val clientSecret = readUtf8(s)
-      if (secret == clientSecret) {
-        writeUtf8("ok", s)
-      } else {
-        writeUtf8("err", s)
-        JavaUtils.closeQuietly(s)
+      // Set the socket timeout while checking the auth secret. Reset it 
before returning.
+      val currentTimeout = s.getSoTimeout()
+      try {
+        s.setSoTimeout(10000)
+        val clientSecret = readUtf8(s)
+        if (secret == clientSecret) {
+          writeUtf8("ok", s)
+          shouldClose = false
+        } else {
+          writeUtf8("err", s)
+          throw new IllegalArgumentException("Authentication failed.")
+        }
+      } finally {
+        s.setSoTimeout(currentTimeout)
       }
     } finally {
-      s.setSoTimeout(currentTimeout)
+      if (shouldClose) {
+        JavaUtils.closeQuietly(s)
+      }
     }
   }
 
   /**
    * Authenticate with a server by writing the auth secret and checking the 
server's reply.
    *
-   * If authentication fails, this method will close the socket.
+   * If authentication fails or error is thrown, this method will close the 
socket.
    *
    * @param s The socket connected to the server.
    * @throws IllegalArgumentException If authentication fails.
    */
   def authToServer(s: Socket): Unit = {
-    writeUtf8(secret, s)
+    var shouldClose = true
+    try {
+      writeUtf8(secret, s)
 
-    val reply = readUtf8(s)
-    if (reply != "ok") {
-      JavaUtils.closeQuietly(s)
-      throw new IllegalArgumentException("Authentication failed.")
+      val reply = readUtf8(s)
+      if (reply != "ok") {
+        throw new IllegalArgumentException("Authentication failed.")
+      } else {
+        shouldClose = false
+      }
+    } finally {
+      if (shouldClose) {
+        JavaUtils.closeQuietly(s)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to