[ 
https://issues.apache.org/jira/browse/HDFS-17564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862523#comment-17862523
 ] 

ASF GitHub Bot commented on HDFS-17564:
---------------------------------------

Hexiaoqiao commented on code in PR #6911:
URL: https://github.com/apache/hadoop/pull/6911#discussion_r1661073434


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java:
##########
@@ -462,6 +462,59 @@ public void testFileChecksumAfterDecommission() throws Exception {
         fileChecksum1.equals(fileChecksum2));
   }
 
+  /**
+   * Test decommission when the DN is marked as busy.
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testBusyAfterDecommissionNode() throws Exception {
+    byte busyDNIndex = 0;
+    //1. create EC file
+    final Path ecFile = new Path(ecDir, "testBusyAfterDecommissionNode");
+    int writeBytes = cellSize * dataBlocks;
+    writeStripedFile(dfs, ecFile, writeBytes);
+    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    //2. make one DN busy
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(ecFile.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
+    DatanodeDescriptor busyNode =
+        dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
+    for (int j = 0; j < replicationStreamsHardLimit; j++) {
+      busyNode.incrementPendingReplicationWithoutTargets();
+    }
+
+    //3. decommission one node
+    List<DatanodeInfo> decommisionNodes = new ArrayList<>();
+    decommisionNodes.add(busyNode);
+    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSION_INPROGRESS);
+
+    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    bm.getDatanodeManager().fetchDatanodes(live, null, false);
+    int liveDecommissioning = 0;
+    for (DatanodeDescriptor node : live) {
+      liveDecommissioning += node.isDecommissionInProgress() ? 1 : 0;
+    }
+    assertEquals(decommisionNodes.size(), liveDecommissioning);
+
+    //4. wait for decommission block to replicate
+    Thread.sleep(3000);

Review Comment:
   What about using `GenericTestUtils.waitFor` rather than `Thread.sleep`?
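   
   For reference, a minimal sketch of the suggestion (the polling predicate below is a placeholder assumption; the test would wait on whatever condition step 4 actually needs):
   
   ```java
   // Poll for the condition instead of sleeping a fixed 3 seconds;
   // waitFor fails the test with a TimeoutException if it never holds.
   GenericTestUtils.waitFor(
       () -> busyNode.isDecommissionInProgress(),  // placeholder condition
       500,                                        // re-check every 500 ms
       30000);                                     // give up after 30 s
   ```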



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java:
##########
@@ -462,6 +462,59 @@ public void testFileChecksumAfterDecommission() throws Exception {
         fileChecksum1.equals(fileChecksum2));
   }
 
+  /**
+   * Test decommission when the DN is marked as busy.
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testBusyAfterDecommissionNode() throws Exception {
+    byte busyDNIndex = 0;

Review Comment:
   Any particular consideration in defining a byte type for the index here? Not a blocker, just out of interest.





> Erasure Coding: Fix the issue of inaccurate metrics when decommission mark 
> busy DN
> ----------------------------------------------------------------------------------
>
>                 Key: HDFS-17564
>                 URL: https://issues.apache.org/jira/browse/HDFS-17564
>             Project: Hadoop HDFS
>          Issue Type: Bug
>            Reporter: Haiyang Hu
>            Assignee: Haiyang Hu
>            Priority: Major
>
> If a DataNode is marked as busy and contains many EC blocks, then when that
> DataNode is decommissioned and ErasureCodingWork#addTaskToDatanode executes,
> no replication work is generated for ecBlocksToBeReplicated, but the related
> metrics (such as DatanodeDescriptor#currApproxBlocksScheduled,
> pendingReconstruction and needReconstruction) are still updated.
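> As a self-contained illustration of the mismatch (a toy sketch, not HDFS
> code; all names below are made up):
> {code:java}
> import java.util.concurrent.atomic.AtomicLong;
>
> /** Toy model of the leak: the counter is bumped even when zero tasks are created. */
> public class MetricLeakDemo {
>   static final AtomicLong pendingReconstruction = new AtomicLong();
>
>   /** Stands in for ErasureCodingWork#addTaskToDatanode; returns tasks created. */
>   static int addTaskToDatanode(int leavingServiceSources) {
>     return Math.min(leavingServiceSources, 1);  // 0 sources -> 0 tasks
>   }
>
>   public static void main(String[] args) {
>     int tasks = addTaskToDatanode(0);         // busy DN: no usable source
>     pendingReconstruction.incrementAndGet();  // metric updated regardless
>     // Prints tasks=0, pendingReconstruction=1: the counter no longer
>     // reflects any real work, which is the inaccuracy described above.
>     System.out.println("tasks=" + tasks
>         + ", pendingReconstruction=" + pendingReconstruction.get());
>   }
> }
> {code}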
> *Specific code:*
> BlockManager#scheduleReconstruction -> BlockManager#chooseSourceDatanodes
> [2628~2650]
> If a DataNode is marked as busy and contains many EC blocks, it is not added
> to srcNodes here:
> {code:java}
> @VisibleForTesting
> DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
>     List<DatanodeDescriptor> containingNodes,
>     List<DatanodeStorageInfo> nodesContainingLiveReplicas,
>     NumberReplicas numReplicas, List<Byte> liveBlockIndices,
>     List<Byte> liveBusyBlockIndices, List<Byte> excludeReconstructed,
>     int priority) {
>   containingNodes.clear();
>   nodesContainingLiveReplicas.clear();
>   List<DatanodeDescriptor> srcNodes = new ArrayList<>();
>  ...
>   for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
>     final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);
>     final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block,
>         storage, corruptReplicas.getNodes(block), false);
>     ...
>     // for EC here need to make sure the numReplicas replicates state correct
>     // because in the scheduleReconstruction it need the numReplicas to check
>     // whether need to reconstruct the ec internal block
>     byte blockIndex = -1;
>     if (isStriped) {
>       blockIndex = ((BlockInfoStriped) block)
>           .getStorageBlockIndex(storage);
>       countLiveAndDecommissioningReplicas(numReplicas, state,
>           liveBitSet, decommissioningBitSet, blockIndex);
>     }
>     if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
>         && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
>         && node.getNumberOfBlocksToBeReplicated() +
>         node.getNumberOfBlocksToBeErasureCoded() >= maxReplicationStreams) {
>       if (isStriped && (state == StoredReplicaState.LIVE
>           || state == StoredReplicaState.DECOMMISSIONING)) {
>         liveBusyBlockIndices.add(blockIndex);
>         //HDFS-16566 ExcludeReconstructed won't be reconstructed.
>         excludeReconstructed.add(blockIndex);
>       }
>       continue; // already reached replication limit
>     }
>     if (node.getNumberOfBlocksToBeReplicated() +
>         node.getNumberOfBlocksToBeErasureCoded() >= replicationStreamsHardLimit) {
>       if (isStriped && (state == StoredReplicaState.LIVE
>           || state == StoredReplicaState.DECOMMISSIONING)) {
>         liveBusyBlockIndices.add(blockIndex);
>         //HDFS-16566 ExcludeReconstructed won't be reconstructed.
>         excludeReconstructed.add(blockIndex);
>       }
>       continue;
>     }
>     if(isStriped || srcNodes.isEmpty()) {
>       srcNodes.add(node);
>       if (isStriped) {
>         liveBlockIndices.add(blockIndex);
>       }
>       continue;
>     }
>    ...
> {code}
> ErasureCodingWork#addTaskToDatanode [149~157]:
> {code:java}
> @Override
> void addTaskToDatanode(NumberReplicas numberReplicas) {
>   final DatanodeStorageInfo[] targets = getTargets();
>   assert targets.length > 0;
>   BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
>   ...
>   } else if ((numberReplicas.decommissioning() > 0 ||
>       numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
>       hasAllInternalBlocks()) {
>     List<Integer> leavingServiceSources = findLeavingServiceSources();
>     // decommissioningSources.size() should be >= targets.length
>     // if the leavingServiceSources size is 0, createReplicationWork
>     // will not be called here
>     final int num = Math.min(leavingServiceSources.size(), targets.length);
>     for (int i = 0; i < num; i++) {
>       createReplicationWork(leavingServiceSources.get(i), targets[i]);
>     }
>   ...
> }
> // Since the busy decommissioning datanode is not in srcNodes, the
> // srcIndices returned here has size 0.
> private List<Integer> findLeavingServiceSources() {
>   // Mark the block in normal node.
>   BlockInfoStriped block = (BlockInfoStriped)getBlock();
>   BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
>   for (int i = 0; i < getSrcNodes().length; i++) {
>     if (getSrcNodes()[i].isInService()) {
>       bitSet.set(liveBlockIndices[i]);
>     }
>   }
>   // If the block is on the node which is decommissioning or
>   // entering_maintenance, and it doesn't exist on other normal nodes,
>   // we just add the node into source list.
>   List<Integer> srcIndices = new ArrayList<>();
>   for (int i = 0; i < getSrcNodes().length; i++) {
>     if ((getSrcNodes()[i].isDecommissionInProgress() ||
>         (getSrcNodes()[i].isEnteringMaintenance() &&
>         getSrcNodes()[i].isAlive())) &&
>         !bitSet.get(liveBlockIndices[i])) {
>       srcIndices.add(i);
>     }
>   }
>   return srcIndices;
> }
> {code}
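> To see why srcIndices comes back empty, here is a self-contained toy run of
> the same BitSet logic (illustrative only; plain arrays stand in for the real
> node and index types):
> {code:java}
> import java.util.ArrayList;
> import java.util.BitSet;
> import java.util.List;
>
> /** Toy replay of findLeavingServiceSources when the busy DN is absent. */
> public class LeavingSourcesDemo {
>   public static void main(String[] args) {
>     // One entry per srcNode: true = in service, false = decommissioning.
>     // The busy decommissioning DN never made it into srcNodes at all.
>     boolean[] inService = {true, true, true};
>     int[] liveBlockIndices = {1, 2, 3};
>
>     BitSet onNormalNodes = new BitSet();
>     for (int i = 0; i < inService.length; i++) {
>       if (inService[i]) {
>         onNormalNodes.set(liveBlockIndices[i]);
>       }
>     }
>
>     List<Integer> srcIndices = new ArrayList<>();
>     for (int i = 0; i < inService.length; i++) {
>       if (!inService[i] && !onNormalNodes.get(liveBlockIndices[i])) {
>         srcIndices.add(i);
>       }
>     }
>     // Prints "sources: []": with no decommissioning entry in srcNodes,
>     // no source is found and no replication work is created.
>     System.out.println("sources: " + srcIndices);
>   }
> }
> {code}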
> So we need to fix this logic to avoid these inaccurate metrics.


