This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new be374fa  HDFS-13639. SlotReleaser is not fast enough (#1885)
be374fa is described below

commit be374faf429d28561dd9c582f5c55451213d89a4
Author: leosunli <lisheng.su...@gmail.com>
AuthorDate: Fri May 22 04:21:17 2020 +0800

    HDFS-13639. SlotReleaser is not fast enough (#1885)
---
 .../hdfs/shortcircuit/DfsClientShmManager.java     | 10 +++
 .../hdfs/shortcircuit/ShortCircuitCache.java       | 61 +++++++++++---
 .../hdfs/server/datanode/ShortCircuitRegistry.java |  5 ++
 .../hdfs/shortcircuit/TestShortCircuitCache.java   | 92 ++++++++++++++++++++++
 4 files changed, 155 insertions(+), 13 deletions(-)
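
For context: before this change, the SlotReleaser opened a fresh UNIX domain socket to the DataNode for every slot release. The patch below instead caches the socket on the ShortCircuitCache (the releaser runs on a single thread, so no locking is needed) and retries once when the cached connection has gone stale, e.g. after a DataNode restart. The following is a minimal sketch of that reuse-with-retry pattern; "Conn" and "ConnFactory" are hypothetical stand-ins, not Hadoop's DomainSocket/Sender classes.

    import java.io.IOException;
    import java.net.SocketException;

    // Sketch only: models the connection reuse done by the real SlotReleaser.
    public class ReusableReleaser {
      interface Conn {
        boolean isOpen();
        void send(String message) throws IOException;
        void close();
      }
      interface ConnFactory {
        Conn connect() throws IOException;
      }

      private final ConnFactory factory;
      private Conn conn; // cached across release() calls, like the new domainSocket field

      public ReusableReleaser(ConnFactory factory) {
        this.factory = factory;
      }

      // Runs on a single releaser thread, so no synchronization around 'conn'.
      public void release(String slotId) throws IOException {
        int retries = 2;
        while (retries > 0) {
          try {
            if (conn == null || !conn.isOpen()) {
              conn = factory.connect(); // connect lazily, then keep the socket open
            }
            conn.send("RELEASE " + slotId); // stands in for releaseShortCircuitFds()
            return;
          } catch (SocketException se) {
            // The cached connection may have gone stale (e.g. DataNode restart):
            // drop it and retry once with a fresh connection.
            retries--;
            if (conn != null) {
              conn.close();
            }
            conn = null;
            if (retries == 0) {
              throw se;
            }
          }
        }
      }
    }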

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
index 6b3d8e0..dfce230 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
@@ -497,4 +497,14 @@ public class DfsClientShmManager implements Closeable {
   public DomainSocketWatcher getDomainSocketWatcher() {
     return domainSocketWatcher;
   }
+
+  @VisibleForTesting
+  public int getShmNum() {
+    int segments = 0;
+    for (EndpointShmManager endpointShmManager : datanodes.values()) {
+      segments +=
+          endpointShmManager.notFull.size() + endpointShmManager.full.size();
+    }
+    return segments;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
index d4d898c..a9970c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.SocketException;
 import java.nio.MappedByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
@@ -181,25 +182,52 @@ public class ShortCircuitCache implements Closeable {
 
     @Override
     public void run() {
+      if (slot == null) {
+        return;
+      }
       LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot);
       final DfsClientShm shm = (DfsClientShm)slot.getShm();
       final DomainSocket shmSock = shm.getPeer().getDomainSocket();
       final String path = shmSock.getPath();
+      DataOutputStream out = null;
       boolean success = false;
-      try (DomainSocket sock = DomainSocket.connect(path);
-           DataOutputStream out = new DataOutputStream(
-               new BufferedOutputStream(sock.getOutputStream()))) {
-        new Sender(out).releaseShortCircuitFds(slot.getSlotId());
-        DataInputStream in = new DataInputStream(sock.getInputStream());
-        ReleaseShortCircuitAccessResponseProto resp =
-            ReleaseShortCircuitAccessResponseProto.parseFrom(
-                PBHelperClient.vintPrefixed(in));
-        if (resp.getStatus() != Status.SUCCESS) {
-          String error = resp.hasError() ? resp.getError() : "(unknown)";
-          throw new IOException(resp.getStatus().toString() + ": " + error);
+      int retries = 2;
+      try {
+        while (retries > 0) {
+          try {
+            if (domainSocket == null || !domainSocket.isOpen()) {
+              // we are running in single thread mode, no protection needed for
+              // domainSocket
+              domainSocket = DomainSocket.connect(path);
+            }
+
+            out = new DataOutputStream(
+                new BufferedOutputStream(domainSocket.getOutputStream()));
+            new Sender(out).releaseShortCircuitFds(slot.getSlotId());
+            DataInputStream in =
+                new DataInputStream(domainSocket.getInputStream());
+            ReleaseShortCircuitAccessResponseProto resp =
+                ReleaseShortCircuitAccessResponseProto
+                    .parseFrom(PBHelperClient.vintPrefixed(in));
+            if (resp.getStatus() != Status.SUCCESS) {
+              String error = resp.hasError() ? resp.getError() : "(unknown)";
+              throw new IOException(resp.getStatus().toString() + ": " + error);
+            }
+
+            LOG.trace("{}: released {}", this, slot);
+            success = true;
+            break;
+
+          } catch (SocketException se) {
+            // the domain socket on the datanode side may have timed out; retry once
+            retries--;
+            domainSocket.close();
+            domainSocket = null;
+            if (retries == 0) {
+              throw new SocketException("Create domain socket failed");
+            }
+          }
         }
-        LOG.trace("{}: released {}", this, slot);
-        success = true;
       } catch (IOException e) {
         LOG.warn(ShortCircuitCache.this + ": failed to release "
             + "short-circuit shared memory slot " + slot + " by sending "
@@ -211,6 +239,8 @@ public class ShortCircuitCache implements Closeable {
           shmManager.freeSlot(slot);
         } else {
           shm.getEndpointShmManager().shutdown(shm);
+          IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
+          domainSocket = null;
         }
       }
     }
@@ -324,6 +354,8 @@ public class ShortCircuitCache implements Closeable {
    */
   private final DfsClientShmManager shmManager;
 
+  private DomainSocket domainSocket = null;
+
   public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
     return new ShortCircuitCache(
         conf.getShortCircuitStreamsCacheSize(),
@@ -997,6 +1029,9 @@ public class ShortCircuitCache implements Closeable {
    * @param slot           The slot to release.
    */
   public void scheduleSlotReleaser(Slot slot) {
+    if (slot == null) {
+      return;
+    }
     Preconditions.checkState(shmManager != null);
     releaserExecutor.execute(new SlotReleaser(slot));
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
index 3df83cf..4b35109 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
@@ -404,4 +404,9 @@ public class ShortCircuitRegistry {
   public synchronized boolean visit(Visitor visitor) {
     return visitor.accept(segments, slots);
   }
+
+  @VisibleForTesting
+  public int getShmNum() {
+    return segments.size();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 53cac2a..e637f4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.hamcrest.CoreMatchers.equalTo;
 
 import java.io.DataOutputStream;
@@ -28,6 +29,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -910,4 +912,94 @@ public class TestShortCircuitCache {
       }
     }
   }
+
+  @Test(timeout = 60000)
+  public void testDomainSocketClosedByDN() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf =
+        createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      final ShortCircuitCache cache =
+          fs.getClient().getClientContext().getShortCircuitCache();
+      DomainPeer peer = getDomainPeerToDn(conf);
+      MutableBoolean usedPeer = new MutableBoolean(false);
+      ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+      final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+          .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+      // Allocating the first shm slot requires using up a peer.
+      Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+          "testReleaseSlotReuseDomainSocket_client");
+
+      cluster.getDataNodes().get(0).getShortCircuitRegistry()
+          .registerSlot(blockId, slot1.getSlotId(), false);
+
+      Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+          "testReleaseSlotReuseDomainSocket_client");
+
+      cluster.getDataNodes().get(0).getShortCircuitRegistry()
+          .registerSlot(blockId, slot2.getSlotId(), false);
+
+      cache.scheduleSlotReleaser(slot1);
+
+      Thread.sleep(2000);
+      cache.scheduleSlotReleaser(slot2);
+      Thread.sleep(2000);
+      Assert.assertEquals(0,
+          cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
+      Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testDNRestart() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      final ShortCircuitCache cache =
+          fs.getClient().getClientContext().getShortCircuitCache();
+      DomainPeer peer = getDomainPeerToDn(conf);
+      MutableBoolean usedPeer = new MutableBoolean(false);
+      ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+      final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+          .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+      // Allocating the first shm slot requires using up a peer.
+      Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+          "testReleaseSlotReuseDomainSocket_client");
+
+      cluster.getDataNodes().get(0).getShortCircuitRegistry()
+          .registerSlot(blockId, slot1.getSlotId(), false);
+
+      // restart the datanode to invalidate the cache
+      cluster.restartDataNode(0);
+      Thread.sleep(1000);
+      // after the restart, new allocation and release should not be affected
+      cache.scheduleSlotReleaser(slot1);
+
+      Slot slot2 = null;
+      try {
+        slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+            "testReleaseSlotReuseDomainSocket_client");
+      } catch (ClosedChannelException ce) {
+        // ignored: allocation may fail after the DN restart, leaving slot2 null
+      }
+      cache.scheduleSlotReleaser(slot2);
+      Thread.sleep(2000);
+      Assert.assertEquals(0,
+          cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
+      Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
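
As an aside, the two @VisibleForTesting counters added here (DfsClientShmManager#getShmNum on the client side and ShortCircuitRegistry#getShmNum on the DataNode side) are what the new tests assert after a fixed Thread.sleep(2000). A hedged variant of that check, polling until release completes instead of sleeping for a fixed interval (waitForShmRelease is an ad-hoc helper written for this sketch, assumed to live inside TestShortCircuitCache, not a Hadoop API):

    // Sketch only: poll until both shm counters reach zero or a deadline passes.
    private static void waitForShmRelease(ShortCircuitCache cache,
        ShortCircuitRegistry registry, long timeoutMs) throws Exception {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (System.currentTimeMillis() < deadline) {
        if (registry.getShmNum() == 0
            && cache.getDfsClientShmManager().getShmNum() == 0) {
          return; // both the client and the DataNode have released their segments
        }
        Thread.sleep(100);
      }
      Assert.fail("short-circuit shared memory segments were not released in time");
    }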

