jojochuang commented on a change in pull request #1885:
URL: https://github.com/apache/hadoop/pull/1885#discussion_r427541701



##########
File path: 
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
##########
@@ -181,25 +182,49 @@ public long getRateInMs() {
 
     @Override
     public void run() {
+      if (slot == null) {
+        return;
+      }
       LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot);
       final DfsClientShm shm = (DfsClientShm)slot.getShm();
       final DomainSocket shmSock = shm.getPeer().getDomainSocket();
       final String path = shmSock.getPath();
+      DataOutputStream out = null;
       boolean success = false;
-      try (DomainSocket sock = DomainSocket.connect(path);
-           DataOutputStream out = new DataOutputStream(
-               new BufferedOutputStream(sock.getOutputStream()))) {
-        new Sender(out).releaseShortCircuitFds(slot.getSlotId());
-        DataInputStream in = new DataInputStream(sock.getInputStream());
-        ReleaseShortCircuitAccessResponseProto resp =
-            ReleaseShortCircuitAccessResponseProto.parseFrom(
-                PBHelperClient.vintPrefixed(in));
-        if (resp.getStatus() != Status.SUCCESS) {
-          String error = resp.hasError() ? resp.getError() : "(unknown)";
-          throw new IOException(resp.getStatus().toString() + ": " + error);
+      int retries = 2;
+      try {
+        while (retries > 0) {
+          try {
+            if (domainSocket == null || !domainSocket.isOpen()) {
+              // we are running in single thread mode, no protection needed for

Review comment:
       Got it. The release scheduler has only one thread.
   /**
      * The executor service that runs the cacheCleaner.
      */
     private final ScheduledThreadPoolExecutor releaserExecutor
         = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
         setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser").
         build());

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
##########
@@ -909,4 +910,90 @@ public void testRequestFileDescriptorsWhenULimit() throws 
Exception {
       }
     }
   }
+
+  @Test
+  public void testDomainSocketClosedByDN() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf =
+        createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot1.getSlotId(), false);
+
+    Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot2.getSlotId(), false);
+
+    cache.scheduleSlotReleaser(slot1);
+
+    // make the DataXceiver timedout
+    Thread.sleep(5000);
+    cache.scheduleSlotReleaser(slot2);
+    Thread.sleep(10000);
+    Assert.assertTrue(cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .getShmNum() == 0);
+    Assert.assertTrue(cache.getDfsClientShmManager().getShmNum() == 0);
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testDNRestart() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
+    MiniDFSCluster cluster =

Review comment:
       Ok. That's fine.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
##########
@@ -910,4 +912,94 @@ public void testRequestFileDescriptorsWhenULimit() throws 
Exception {
       }
     }
   }
+
+  @Test(timeout = 60000)
+  public void testDomainSocketClosedByDN() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf =
+        createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      final ShortCircuitCache cache =
+          fs.getClient().getClientContext().getShortCircuitCache();
+      DomainPeer peer = getDomainPeerToDn(conf);
+      MutableBoolean usedPeer = new MutableBoolean(false);
+      ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+      final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+          .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+      // Allocating the first shm slot requires using up a peer.
+      Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+          "testReleaseSlotReuseDomainSocket_client");
+
+      cluster.getDataNodes().get(0).getShortCircuitRegistry()
+          .registerSlot(blockId, slot1.getSlotId(), false);
+
+      Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+          "testReleaseSlotReuseDomainSocket_client");
+
+      cluster.getDataNodes().get(0).getShortCircuitRegistry()
+          .registerSlot(blockId, slot2.getSlotId(), false);
+
+      cache.scheduleSlotReleaser(slot1);
+
+      Thread.sleep(2000);
+      cache.scheduleSlotReleaser(slot2);
+      Thread.sleep(2000);
+      Assert.assertEquals(0,
+          cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
+      Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testDNRestart() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      final ShortCircuitCache cache =
+          fs.getClient().getClientContext().getShortCircuitCache();
+      DomainPeer peer = getDomainPeerToDn(conf);
+      MutableBoolean usedPeer = new MutableBoolean(false);
+      ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+      final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+          .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+      // Allocating the first shm slot requires using up a peer.
+      Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+          "testReleaseSlotReuseDomainSocket_client");
+
+      cluster.getDataNodes().get(0).getShortCircuitRegistry()
+          .registerSlot(blockId, slot1.getSlotId(), false);
+
+      // restart the datanode to invalidate the cache
+      cluster.restartDataNode(0);
+      Thread.sleep(1000);
+      // after the restart, new allocation and release should not be affect
+      cache.scheduleSlotReleaser(slot1);
+
+      Slot slot2 = null;
+      try {
+        slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+            "testReleaseSlotReuseDomainSocket_client");
+      } catch (ClosedChannelException ce) {
+
+      }
+      cache.scheduleSlotReleaser(slot2);
+      Thread.sleep(2000);
+      Assert.assertEquals(0, cluster.getDataNodes().get(0)

Review comment:
       This one doesn't look right. I think you want to remove "== 0"?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to