This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 35e88705f70 HADOOP-19261. Support force close a DomainSocket for server service (#7057)
35e88705f70 is described below

commit 35e88705f70c55dda9f258a475eab3988a5addb0
Author: Sammi Chen <sammic...@apache.org>
AuthorDate: Tue Oct 1 01:06:07 2024 +0800

    HADOOP-19261. Support force close a DomainSocket for server service (#7057)
    
    (cherry picked from commit 6fd4fea748e0516b40b0a79456e3caaf1f6ab547)
---
 .../org/apache/hadoop/net/unix/DomainSocket.java   | 71 ++++++++++++++--------
 .../hadoop/net/unix/TemporarySocketDirectory.java  |  4 +-
 .../apache/hadoop/net/unix/TestDomainSocket.java   | 61 +++++++++++--------
 3 files changed, 84 insertions(+), 52 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
index 73fff0313a5..3edd349efba 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
@@ -339,10 +339,13 @@ public class DomainSocket implements Closeable {
   private static native void shutdown0(int fd) throws IOException;
 
   /**
-   * Close the Socket.
+   * Close the socket, optionally without waiting for refCount to reach zero.
+   * When a Server Socket is blocked on accept(), its refCount is 1, so a
+   * non-forced close() on it gets stuck in the while loop that polls refCount.
+   * @param force         if true, do not check refCount before closing the socket.
+   * @throws IOException  raised on errors performing I/O.
    */
-  @Override
-  public void close() throws IOException {
+  public void close(boolean force) throws IOException {
     // Set the closed bit on this DomainSocket
     int count;
     try {
@@ -351,41 +354,61 @@ public class DomainSocket implements Closeable {
       // Someone else already closed the DomainSocket.
       return;
     }
-    // Wait for all references to go away
-    boolean didShutdown = false;
+
     boolean interrupted = false;
-    while (count > 0) {
-      if (!didShutdown) {
+    if (force) {
+      try {
+        // Calling shutdown on the socket will interrupt blocking system
+        // calls like accept, write, and read that are going on in a
+        // different thread.
+        shutdown0(fd);
+      } catch (IOException e) {
+        LOG.error("shutdown error: ", e);
+      }
+    } else {
+      // Wait for all references to go away
+      boolean didShutdown = false;
+      while (count > 0) {
+        if (!didShutdown) {
+          try {
+            // Calling shutdown on the socket will interrupt blocking system
+            // calls like accept, write, and read that are going on in a
+            // different thread.
+            shutdown0(fd);
+          } catch (IOException e) {
+            LOG.error("shutdown error: ", e);
+          }
+          didShutdown = true;
+        }
         try {
-          // Calling shutdown on the socket will interrupt blocking system
-          // calls like accept, write, and read that are going on in a
-          // different thread.
-          shutdown0(fd);
-        } catch (IOException e) {
-          LOG.error("shutdown error: ", e);
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          interrupted = true;
         }
-        didShutdown = true;
+        count = refCount.getReferenceCount();
       }
-      try {
-        Thread.sleep(10);
-      } catch (InterruptedException e) {
-        interrupted = true;
-      }
-      count = refCount.getReferenceCount();
     }
 
-    // At this point, nobody has a reference to the file descriptor, 
+    // At this point, nobody has a reference to the file descriptor,
     // and nobody will be able to get one in the future either.
     // We now call close(2) on the file descriptor.
-    // After this point, the file descriptor number will be reused by 
-    // something else.  Although this DomainSocket object continues to hold 
-    // the old file descriptor number (it's a final field), we never use it 
+    // After this point, the file descriptor number will be reused by
+    // something else.  Although this DomainSocket object continues to hold
+    // the old file descriptor number (it's a final field), we never use it
     // again because this DomainSocket is closed.
     close0(fd);
     if (interrupted) {
       Thread.currentThread().interrupt();
     }
   }
+
+  /**
+   * Close the Socket.
+   */
+  @Override
+  public void close() throws IOException {
+    close(false);
+  }
   
   /**
    * Call shutdown(SHUT_RDWR) on the UNIX domain socket.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java
index c00b4b259aa..40399f07a29 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.net.unix;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.util.Random;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileUtil;
@@ -35,8 +34,7 @@ public class TemporarySocketDirectory implements Closeable {
 
   public TemporarySocketDirectory() {
     String tmp = System.getProperty("java.io.tmpdir", "/tmp");
-    dir = new File(tmp, "socks." + (System.currentTimeMillis() +
-        "." + (new Random().nextInt())));
+    dir = new File(tmp, "socks." + System.nanoTime());
     dir.mkdirs();
     FileUtil.setWritable(dir, true);
   }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
index 61cbd85f8d6..952f2b35e43 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
@@ -130,7 +130,7 @@ public class TestDomainSocket {
     DomainSocket conn = DomainSocket.connect(serv.getPath());
     Thread.sleep(50);
     conn.close();
-    serv.close();
+    serv.close(true);
     future.get(2, TimeUnit.MINUTES);
   }
 
@@ -161,7 +161,7 @@ public class TestDomainSocket {
     };
     Future<Void> future = exeServ.submit(callable);
     Thread.sleep(500);
-    serv.close();
+    serv.close(true);
     future.get(2, TimeUnit.MINUTES);
   }
 
@@ -240,7 +240,7 @@ public class TestDomainSocket {
     Future<Void> clientFuture = exeServ.submit(clientCallable);
     Thread.sleep(500);
     clientConn.close();
-    serv.close();
+    serv.close(true);
     clientFuture.get(2, TimeUnit.MINUTES);
     serverFuture.get(2, TimeUnit.MINUTES);
   }
@@ -281,28 +281,39 @@ public class TestDomainSocket {
     final String TEST_PATH = new File(sockDir.getDir(),
         "test_sock_server_options").getAbsolutePath();
     DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
-    try {
-      // Let's set a new receive buffer size
-      int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
-      int newBufSize = bufSize / 2;
-      serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize);
-      int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
-      Assert.assertEquals(newBufSize, nextBufSize);
-      // Let's set a server timeout
-      int newTimeout = 1000;
-      serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout);
-      int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT);
-      Assert.assertEquals(newTimeout, nextTimeout);
-      try {
-        serv.accept();
-        Assert.fail("expected the accept() to time out and fail");
-      } catch (SocketTimeoutException e) {
-        GenericTestUtils.assertExceptionContains("accept(2) error: ", e);
+    // Let's set a new receive buffer size
+    int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+    int newBufSize = bufSize / 2;
+    serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize);
+    int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+    Assert.assertEquals(newBufSize, nextBufSize);
+    // Let's set a server timeout
+    int newTimeout = 1000;
+    serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout);
+    int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT);
+    Assert.assertEquals(newTimeout, nextTimeout);
+
+    ExecutorService exeServ = Executors.newSingleThreadExecutor();
+    Callable<Void> callable = new Callable<Void>() {
+      public Void call() {
+        try {
+          serv.accept();
+          Assert.fail("expected the accept() to time out and fail");
+        } catch (SocketTimeoutException e) {
+          GenericTestUtils.assertExceptionContains("accept(2) error: ", e);
+        } catch (AsynchronousCloseException e) {
+          return null;
+        } catch (IOException e) {
+          throw new RuntimeException("unexpected IOException", e);
+        }
+        return null;
       }
-    } finally {
-      serv.close();
-      Assert.assertFalse(serv.isOpen());
-    }
+    };
+    Future<Void> future = exeServ.submit(callable);
+    Thread.sleep(500);
+    serv.close(true);
+    future.get();
+    Assert.assertFalse(serv.isOpen());
   }
   
   /**
@@ -656,7 +667,7 @@ public class TestDomainSocket {
     }
     serverThread.join(120000);
     clientThread.join(120000);
-    serv.close();
+    serv.close(true);
     for (PassedFile pf : passedFiles) {
       pf.cleanup();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to