[ 
https://issues.apache.org/jira/browse/HDFS-15963?focusedWorklogId=580744&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-580744
 ]

ASF GitHub Bot logged work on HDFS-15963:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Apr/21 00:25
            Start Date: 12/Apr/21 00:25
    Worklog Time Spent: 10m 
      Work Description: jojochuang commented on a change in pull request #2889:
URL: https://github.com/apache/hadoop/pull/2889#discussion_r611265911



##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
##########
@@ -153,16 +154,24 @@ synchronized boolean queryVolume(FsVolumeImpl volume) {
    * Execute the task sometime in the future, using ThreadPools.
    */
   synchronized void execute(String storageId, Runnable task) {
-    if (executors == null) {
-      throw new RuntimeException(
-          "AsyncLazyPersistService is already shutdown");
-    }
-    ThreadPoolExecutor executor = executors.get(storageId);
-    if (executor == null) {
-      throw new RuntimeException("Cannot find root storage volume with id " +
-          storageId + " for execution of task " + task);
-    } else {
-      executor.execute(task);
+    try {

Review comment:
       If the task is executed, the reference is closed because 
ReplicaLazyPersistTask#run() encloses it in a try ... with block.
   It is only when the task is not executed that we have to close it explicitly.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
##########
@@ -432,6 +432,7 @@
       ris = new ReplicaInputStreams(
           blockIn, checksumIn, volumeRef, fileIoProvider);
     } catch (IOException ioe) {
+      IOUtils.cleanupWithLogger(null, volumeRef);

Review comment:
       this is a good catch.
   
   But even without exceptions, shouldn't we close the reference when the 
BlockSender is closed too?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
##########
@@ -1805,4 +1806,38 @@ public void testNotifyNamenodeMissingOrNewBlock() throws 
Exception {
       cluster.shutdown();
     }
   }
+
+  @Test

Review comment:
       We should add a timeout (a class-wide timeout is preferred) here.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
##########
@@ -167,18 +167,26 @@ synchronized long countPendingDeletions() {
    * Execute the task sometime in the future, using ThreadPools.
    */
   synchronized void execute(FsVolumeImpl volume, Runnable task) {
-    if (executors == null) {
-      throw new RuntimeException("AsyncDiskService is already shutdown");
-    }
-    if (volume == null) {
-      throw new RuntimeException("A null volume does not have a executor");
-    }
-    ThreadPoolExecutor executor = executors.get(volume.getStorageID());
-    if (executor == null) {
-      throw new RuntimeException("Cannot find volume " + volume
-          + " for execution of task " + task);
-    } else {
-      executor.execute(task);
+    try {

Review comment:
       If the task is executed, the reference is closed at the end of 
ReplicaFileDeleteTask#run().
   This part of the code handles the case when the task is not executed that we 
have to close it explicitly.
   
   It might be a good idea to turn the cleanup code ReplicaFileDeleteTask#run()
   `      IOUtils.cleanupWithLogger(null, volumeRef);`
   to the same style as ReplicaLazyPersistTask#run()
   `try (FsVolumeReference ref = volumeRef) {`
   in case that ReplicaFileDeleteTask#run() throws an exception in the middle 
and doesn't close the reference.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
##########
@@ -562,4 +565,57 @@ void writeBlock(ExtendedBlock block, 
BlockConstructionStage stage,
         checksum, CachingStrategy.newDefaultStrategy(), false, false,
         null, null, new String[0]);
   }
+
+  @Test
+  public void testReleaseVolumeRefIfExceptionThrown() throws IOException {

Review comment:
       It would be great if we could rewritten this test as a true unit test: A 
BlockSender with mocks. However, given the dependency on the DataNode class, I 
recognize this is not trivial.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
##########
@@ -562,4 +565,57 @@ void writeBlock(ExtendedBlock block, 
BlockConstructionStage stage,
         checksum, CachingStrategy.newDefaultStrategy(), false, false,
         null, null, new String[0]);
   }
+
+  @Test
+  public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
+    Path file = new Path("dataprotocol.dat");
+    int numDataNodes = 1;
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        numDataNodes).build();
+    try {
+      cluster.waitActive();
+      datanode = cluster.getFileSystem().getDataNodeStats(
+          DatanodeReportType.LIVE)[0];
+      dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
+      FileSystem fileSys = cluster.getFileSystem();
+
+      int fileLen = Math.min(
+          conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
+
+      DFSTestUtil.createFile(fileSys, file, fileLen, fileLen,
+          fileSys.getDefaultBlockSize(file),
+          fileSys.getDefaultReplication(file), 0L);
+
+      // get the first blockid for the file
+      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, 
file);
+
+      String bpid = cluster.getNamesystem().getBlockPoolId();
+      ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
+      sendBuf.reset();
+      recvBuf.reset();
+
+      // delete the meta file to create a exception in BlockSender constructor
+      DataNode dn = cluster.getDataNodes().get(0);
+      cluster.getMaterializedReplica(0, blk).deleteMeta();
+
+      FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset(
+          dn).getVolume(blk);
+      int beforeCnt = volume.getReferenceCount();
+
+      sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN);
+      sendRecvData("Copy a block.", false);
+      Thread.sleep(1000);
+
+      int afterCnt = volume.getReferenceCount();
+      assertEquals(beforeCnt, afterCnt);

Review comment:
       may I ask you to try and see if the reference count is the same for the 
normal case? (no missing metadata file) I suspect we don't handle that case 
well either, but would like you to confirm :)

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
##########
@@ -562,4 +565,57 @@ void writeBlock(ExtendedBlock block, 
BlockConstructionStage stage,
         checksum, CachingStrategy.newDefaultStrategy(), false, false,
         null, null, new String[0]);
   }
+
+  @Test
+  public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
+    Path file = new Path("dataprotocol.dat");
+    int numDataNodes = 1;
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        numDataNodes).build();
+    try {
+      cluster.waitActive();
+      datanode = cluster.getFileSystem().getDataNodeStats(
+          DatanodeReportType.LIVE)[0];
+      dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
+      FileSystem fileSys = cluster.getFileSystem();
+
+      int fileLen = Math.min(
+          conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
+
+      DFSTestUtil.createFile(fileSys, file, fileLen, fileLen,
+          fileSys.getDefaultBlockSize(file),
+          fileSys.getDefaultReplication(file), 0L);
+
+      // get the first blockid for the file
+      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, 
file);
+
+      String bpid = cluster.getNamesystem().getBlockPoolId();
+      ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
+      sendBuf.reset();
+      recvBuf.reset();
+
+      // delete the meta file to create a exception in BlockSender constructor
+      DataNode dn = cluster.getDataNodes().get(0);
+      cluster.getMaterializedReplica(0, blk).deleteMeta();
+
+      FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset(
+          dn).getVolume(blk);
+      int beforeCnt = volume.getReferenceCount();
+
+      sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN);
+      sendRecvData("Copy a block.", false);
+      Thread.sleep(1000);
+
+      int afterCnt = volume.getReferenceCount();
+      assertEquals(beforeCnt, afterCnt);
+
+    } catch (InterruptedException e) {

Review comment:
       If it interrupts, let it throw. No need to handle the exception.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
##########
@@ -280,4 +283,40 @@ public void run() {
       }
     }
   }
+  @Test
+  public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
+    getClusterBuilder().setRamDiskReplicaCapacity(2).build();
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final int SEED = 0xFADED;
+    Path path = new Path("/" + METHOD_NAME + ".Writer.File.dat");
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetSpi.FsVolumeReferences volumes =
+        DataNodeTestUtils.getFSDataset(dn).getFsVolumeReferences();
+    int[] beforeCnts = new int[volumes.size()];
+    try {
+      FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+
+      // Create a runtime exception
+      ds.asyncLazyPersistService.shutdown();
+      for (int i = 0; i < volumes.size(); ++i) {
+        beforeCnts[i] = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+      }
+
+      makeRandomTestFile(path, BLOCK_SIZE, true, SEED);
+      Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+      for (int i = 0; i < volumes.size(); ++i) {
+        int afterCnt = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+        // LazyWriter keeps trying to save copies even if
+        // asyncLazyPersistService is already shutdown.
+        // If we do not release references, the number of
+        // references will increase infinitely.
+        Assert.assertTrue(
+            beforeCnts[i] == afterCnt || beforeCnts[i] == (afterCnt - 1));
+      }
+    } catch (InterruptedException e) {

Review comment:
       no need to handle the interruption

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
##########
@@ -1805,4 +1806,38 @@ public void testNotifyNamenodeMissingOrNewBlock() throws 
Exception {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(
+        new HdfsConfiguration()).build();
+    cluster.waitActive();
+    FsVolumeImpl vol = (FsVolumeImpl) dataset.getFsVolumeReferences().get(0);
+    ExtendedBlock eb;
+    ReplicaInfo info;
+    int beforeCnt = 0;
+    try {
+      List<Block> blockList = new ArrayList<Block>();
+      eb = new ExtendedBlock(BLOCKPOOL, 1, 1, 1001);
+      info = new FinalizedReplica(
+          eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+      dataset.volumeMap.add(BLOCKPOOL, info);
+      ((LocalReplica) info).getBlockFile().createNewFile();
+      ((LocalReplica) info).getMetaFile().createNewFile();
+      blockList.add(info);
+
+      // Create a runtime exception
+      dataset.asyncDiskService.shutdown();
+
+      beforeCnt = vol.getReferenceCount();
+      dataset.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+
+    } catch (RuntimeException re) {
+      int afterCnt = vol.getReferenceCount();
+      assertEquals(beforeCnt, afterCnt);
+      re.printStackTrace();

Review comment:
       printing the stack trace is redundant here. let's remove it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 580744)
    Time Spent: 1h 10m  (was: 1h)

> Unreleased volume references cause an infinite loop
> ---------------------------------------------------
>
>                 Key: HDFS-15963
>                 URL: https://issues.apache.org/jira/browse/HDFS-15963
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: datanode
>            Reporter: Shuyan Zhang
>            Assignee: Shuyan Zhang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HDFS-15963.001.patch, HDFS-15963.002.patch, 
> HDFS-15963.003.patch
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> When BlockSender throws an exception because the meta-data cannot be found, 
> the volume reference obtained by the thread is not released, which causes the 
> thread trying to remove the volume to wait and fall into an infinite loop.
> {code:java}
> boolean checkVolumesRemoved() {
>   Iterator<FsVolumeImpl> it = volumesBeingRemoved.iterator();
>   while (it.hasNext()) {
>     FsVolumeImpl volume = it.next();
>     if (!volume.checkClosed()) {
>       return false;
>     }
>     it.remove();
>   }
>   return true;
> }
> boolean checkClosed() {
>   // always be true.
>   if (this.reference.getReferenceCount() > 0) {
>     FsDatasetImpl.LOG.debug("The reference count for {} is {}, wait to be 0.",
>         this, reference.getReferenceCount());
>     return false;
>   }
>   return true;
> }
> {code}
> At the same time, because the thread has been holding checkDirsLock when 
> removing the volume, other threads trying to acquire the same lock will be 
> permanently blocked.
> Similar problems also occur in RamDiskAsyncLazyPersistService and 
> FsDatasetAsyncDiskService.
> This patch releases the three previously unreleased volume references.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to