jojochuang commented on a change in pull request #1885:
URL: https://github.com/apache/hadoop/pull/1885#discussion_r425904832



##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
##########
@@ -909,4 +910,90 @@ public void testRequestFileDescriptorsWhenULimit() throws 
Exception {
       }
     }
   }
+
+  @Test
+  public void testDomainSocketClosedByDN() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf =
+        createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot1.getSlotId(), false);
+
+    Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot2.getSlotId(), false);
+
+    cache.scheduleSlotReleaser(slot1);
+
+    // make the DataXceiver timedout
+    Thread.sleep(5000);
+    cache.scheduleSlotReleaser(slot2);
+    Thread.sleep(10000);
+    Assert.assertTrue(cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .getShmNum() == 0);
+    Assert.assertTrue(cache.getDfsClientShmManager().getShmNum() == 0);
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testDNRestart() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
+    MiniDFSCluster cluster =

Review comment:
       Is the mini cluster required? Starting a mini cluster takes time and 
is prone to flaky failures. It would be nice to avoid using it.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
##########
@@ -909,4 +910,90 @@ public void testRequestFileDescriptorsWhenULimit() throws 
Exception {
       }
     }
   }
+
+  @Test
+  public void testDomainSocketClosedByDN() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf =
+        createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot1.getSlotId(), false);
+
+    Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot2.getSlotId(), false);
+
+    cache.scheduleSlotReleaser(slot1);
+
+    // make the DataXceiver timedout
+    Thread.sleep(5000);
+    cache.scheduleSlotReleaser(slot2);
+    Thread.sleep(10000);
+    Assert.assertTrue(cluster.getDataNodes().get(0).getShortCircuitRegistry()

Review comment:
       Suggest using assertEquals() here.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
##########
@@ -909,4 +910,90 @@ public void testRequestFileDescriptorsWhenULimit() throws 
Exception {
       }
     }
   }
+
+  @Test

Review comment:
       Please add a test timeout.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
##########
@@ -909,4 +910,90 @@ public void testRequestFileDescriptorsWhenULimit() throws 
Exception {
       }
     }
   }
+
+  @Test
+  public void testDomainSocketClosedByDN() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf =
+        createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot1.getSlotId(), false);
+
+    Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot2.getSlotId(), false);
+
+    cache.scheduleSlotReleaser(slot1);
+
+    // make the DataXceiver timedout
+    Thread.sleep(5000);
+    cache.scheduleSlotReleaser(slot2);
+    Thread.sleep(10000);

Review comment:
       Can we make this sleep time shorter? Waiting for 15 seconds seems 
excessive. You may have to change the timeout configuration.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
##########
@@ -909,4 +910,90 @@ public void testRequestFileDescriptorsWhenULimit() throws 
Exception {
       }
     }
   }
+
+  @Test
+  public void testDomainSocketClosedByDN() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf =
+        createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot1.getSlotId(), false);
+
+    Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot2.getSlotId(), false);
+
+    cache.scheduleSlotReleaser(slot1);
+
+    // make the DataXceiver timedout
+    Thread.sleep(5000);
+    cache.scheduleSlotReleaser(slot2);
+    Thread.sleep(10000);
+    Assert.assertTrue(cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .getShmNum() == 0);
+    Assert.assertTrue(cache.getDfsClientShmManager().getShmNum() == 0);

Review comment:
       Ditto: use assertEquals() here as well.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
##########
@@ -909,4 +910,90 @@ public void testRequestFileDescriptorsWhenULimit() throws 
Exception {
       }
     }
   }
+
+  @Test
+  public void testDomainSocketClosedByDN() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf =
+        createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot1.getSlotId(), false);
+
+    Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot2.getSlotId(), false);
+
+    cache.scheduleSlotReleaser(slot1);
+
+    // make the DataXceiver timedout
+    Thread.sleep(5000);
+    cache.scheduleSlotReleaser(slot2);
+    Thread.sleep(10000);
+    Assert.assertTrue(cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .getShmNum() == 0);
+    Assert.assertTrue(cache.getDfsClientShmManager().getShmNum() == 0);
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testDNRestart() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot1.getSlotId(), false);
+
+    // restart the datanode to invalidate the cache
+    cluster.restartDataNode(0);
+    Thread.sleep(1000);
+    // after the restart, new allocation and release should not be affect
+    cache.scheduleSlotReleaser(slot1);
+
+    Slot slot2 = null;
+    try {
+      slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+          "testReleaseSlotReuseDomainSocket_client");
+    } catch (ClosedChannelException ce) {
+
+    }
+    cache.scheduleSlotReleaser(slot2);
+    Thread.sleep(2000);
+    Assert.assertTrue(cluster.getDataNodes().get(0).getShortCircuitRegistry()

Review comment:
       assertEquals()

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
##########
@@ -909,4 +910,90 @@ public void testRequestFileDescriptorsWhenULimit() throws 
Exception {
       }
     }
   }
+
+  @Test
+  public void testDomainSocketClosedByDN() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf =
+        createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot1.getSlotId(), false);
+
+    Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot2.getSlotId(), false);
+
+    cache.scheduleSlotReleaser(slot1);
+
+    // make the DataXceiver timedout
+    Thread.sleep(5000);
+    cache.scheduleSlotReleaser(slot2);
+    Thread.sleep(10000);
+    Assert.assertTrue(cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .getShmNum() == 0);
+    Assert.assertTrue(cache.getDfsClientShmManager().getShmNum() == 0);
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testDNRestart() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
+        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+        "testReleaseSlotReuseDomainSocket_client");
+
+    cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .registerSlot(blockId, slot1.getSlotId(), false);
+
+    // restart the datanode to invalidate the cache
+    cluster.restartDataNode(0);
+    Thread.sleep(1000);
+    // after the restart, new allocation and release should not be affect
+    cache.scheduleSlotReleaser(slot1);
+
+    Slot slot2 = null;
+    try {
+      slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
+          "testReleaseSlotReuseDomainSocket_client");
+    } catch (ClosedChannelException ce) {
+
+    }
+    cache.scheduleSlotReleaser(slot2);
+    Thread.sleep(2000);
+    Assert.assertTrue(cluster.getDataNodes().get(0).getShortCircuitRegistry()
+        .getShmNum() == 0);
+    Assert.assertTrue(cache.getDfsClientShmManager().getShmNum() == 0);

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to