[ 
https://issues.apache.org/jira/browse/HDFS-17564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haiyang Hu updated HDFS-17564:
------------------------------
    Description: 
If a DataNode is marked as busy and contains many EC blocks, then while that 
DataNode is being decommissioned, ErasureCodingWork#addTaskToDatanode 
generates no replication work for ecBlocksToBeReplicated, but the related 
metrics (such as DatanodeDescriptor#currApproxBlocksScheduled, 
pendingReconstruction and needReconstruction) are still updated.

*Specific code:*
BlockManager#scheduleReconstruction -> BlockManager#chooseSourceDatanodes 
[2628~2650] 
If a DataNode is marked as busy and contains many EC blocks, it is not added 
to srcNodes here.
{code:java}
@VisibleForTesting
DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
    List<DatanodeDescriptor> containingNodes,
    List<DatanodeStorageInfo> nodesContainingLiveReplicas,
    NumberReplicas numReplicas, List<Byte> liveBlockIndices,
    List<Byte> liveBusyBlockIndices, List<Byte> excludeReconstructed,
    int priority) {
  containingNodes.clear();
  nodesContainingLiveReplicas.clear();
  List<DatanodeDescriptor> srcNodes = new ArrayList<>();
 ...

  for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
    final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);
    final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block,
        storage, corruptReplicas.getNodes(block), false);
    ...
    // for EC here need to make sure the numReplicas replicates state correct
    // because in the scheduleReconstruction it need the numReplicas to check
    // whether need to reconstruct the ec internal block
    byte blockIndex = -1;
    if (isStriped) {
      blockIndex = ((BlockInfoStriped) block)
          .getStorageBlockIndex(storage);
      countLiveAndDecommissioningReplicas(numReplicas, state,
          liveBitSet, decommissioningBitSet, blockIndex);
    }

    if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
        && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
        && node.getNumberOfBlocksToBeReplicated() +
        node.getNumberOfBlocksToBeErasureCoded() >= maxReplicationStreams) {
      if (isStriped && (state == StoredReplicaState.LIVE
          || state == StoredReplicaState.DECOMMISSIONING)) {
        liveBusyBlockIndices.add(blockIndex);
        //HDFS-16566 ExcludeReconstructed won't be reconstructed.
        excludeReconstructed.add(blockIndex);
      }
      continue; // already reached replication limit
    }

    if (node.getNumberOfBlocksToBeReplicated() +
        node.getNumberOfBlocksToBeErasureCoded() >=
        replicationStreamsHardLimit) {
      if (isStriped && (state == StoredReplicaState.LIVE
          || state == StoredReplicaState.DECOMMISSIONING)) {
        liveBusyBlockIndices.add(blockIndex);
        //HDFS-16566 ExcludeReconstructed won't be reconstructed.
        excludeReconstructed.add(blockIndex);
      }
      continue;
    }

    if(isStriped || srcNodes.isEmpty()) {
      srcNodes.add(node);
      if (isStriped) {
        liveBlockIndices.add(blockIndex);
      }
      continue;
    }
   ...
{code}
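
To make the two busy checks above easier to follow, here is a minimal, 
self-contained sketch of the same filtering idea. It is not the Hadoop code: 
the Node class, the chooseSources method and the limit values 2 and 4 are 
hypothetical stand-ins chosen for illustration. It only shows that a 
leaving-service node is exempt from the soft limit but is still dropped once 
it reaches the hard limit.

```java
import java.util.ArrayList;
import java.util.List;

public class SourceSelectionSketch {
  /** Hypothetical stand-in for DatanodeDescriptor; not the Hadoop class. */
  static final class Node {
    final String name;
    final boolean leavingService; // decommissioning or entering maintenance
    final int pendingWork;        // blocks to be replicated + erasure coded

    Node(String name, boolean leavingService, int pendingWork) {
      this.name = name;
      this.leavingService = leavingService;
      this.pendingWork = pendingWork;
    }
  }

  /** Returns the names of the nodes that pass both busy checks. */
  static List<String> chooseSources(List<Node> holders, boolean highestPriority,
      int softLimit, int hardLimit) {
    List<String> sources = new ArrayList<>();
    for (Node n : holders) {
      // Soft limit: only applies to in-service nodes at non-highest priority.
      if (!highestPriority && !n.leavingService && n.pendingWork >= softLimit) {
        continue;
      }
      // Hard limit: applies to every node, including leaving-service ones.
      if (n.pendingWork >= hardLimit) {
        continue;
      }
      sources.add(n.name);
    }
    return sources;
  }

  public static void main(String[] args) {
    List<Node> holders = new ArrayList<>();
    holders.add(new Node("dn-decommissioning", true, 4));
    holders.add(new Node("dn-in-service", false, 0));
    // Limits of 2 and 4 are illustrative values chosen for this sketch.
    System.out.println(chooseSources(holders, false, 2, 4)); // [dn-in-service]
  }
}
```

In this sketch the busy decommissioning node never reaches the source list, 
which mirrors the situation described for srcNodes.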

ErasureCodingWork#addTaskToDatanode[149~157]

{code:java}
@Override
void addTaskToDatanode(NumberReplicas numberReplicas) {
  final DatanodeStorageInfo[] targets = getTargets();
  assert targets.length > 0;
  BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();

  ...
  } else if ((numberReplicas.decommissioning() > 0 ||
      numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
      hasAllInternalBlocks()) {
    List<Integer> leavingServiceSources = findLeavingServiceSources();
    // decommissioningSources.size() should be >= targets.length
    // If leavingServiceSources is empty, createReplicationWork is never
    // called here.
    final int num = Math.min(leavingServiceSources.size(), targets.length);
    for (int i = 0; i < num; i++) {
      createReplicationWork(leavingServiceSources.get(i), targets[i]);
    }
  ...
}

// Because the busy decommissioning DataNode is not in srcNodes, the returned
// srcIndices list is empty.
private List<Integer> findLeavingServiceSources() {
    // Mark the block in normal node.
    BlockInfoStriped block = (BlockInfoStriped)getBlock();
    BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
    for (int i = 0; i < getSrcNodes().length; i++) {
      if (getSrcNodes()[i].isInService()) {
        bitSet.set(liveBlockIndices[i]);
      }
    }
    // If the block is on the node which is decommissioning or
    // entering_maintenance, and it doesn't exist on other normal nodes,
    // we just add the node into source list.
    List<Integer> srcIndices = new ArrayList<>();
    for (int i = 0; i < getSrcNodes().length; i++) {
      if ((getSrcNodes()[i].isDecommissionInProgress() ||
          (getSrcNodes()[i].isEnteringMaintenance() &&
          getSrcNodes()[i].isAlive())) &&
          !bitSet.get(liveBlockIndices[i])) {
        srcIndices.add(i);
      }
    }
    return srcIndices;
  }
{code}
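
The effect of findLeavingServiceSources on a filtered source list can be 
checked with a small standalone sketch. This is an illustration only: the 
State enum and the array-based signature are assumptions made here, and the 
entering-maintenance branch is left out for brevity.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class LeavingServiceSourcesSketch {
  /** Hypothetical, reduced set of admin states for this sketch. */
  enum State { IN_SERVICE, DECOMMISSION_IN_PROGRESS }

  /**
   * Returns the positions in srcStates of decommissioning nodes whose
   * internal block index is not also held by an in-service source node.
   */
  static List<Integer> findLeavingServiceSources(State[] srcStates,
      byte[] liveBlockIndices, int totalBlocks) {
    // Mark every internal block index that an in-service node holds.
    BitSet onInService = new BitSet(totalBlocks);
    for (int i = 0; i < srcStates.length; i++) {
      if (srcStates[i] == State.IN_SERVICE) {
        onInService.set(liveBlockIndices[i]);
      }
    }
    // Keep decommissioning nodes whose block index is not marked above.
    List<Integer> srcIndices = new ArrayList<>();
    for (int i = 0; i < srcStates.length; i++) {
      if (srcStates[i] == State.DECOMMISSION_IN_PROGRESS
          && !onInService.get(liveBlockIndices[i])) {
        srcIndices.add(i);
      }
    }
    return srcIndices;
  }

  public static void main(String[] args) {
    // Decommissioning node present in the source list: it is selected.
    System.out.println(findLeavingServiceSources(
        new State[] {State.IN_SERVICE, State.DECOMMISSION_IN_PROGRESS},
        new byte[] {0, 1}, 9)); // [1]
    // Decommissioning node filtered out earlier as busy: nothing is selected.
    System.out.println(findLeavingServiceSources(
        new State[] {State.IN_SERVICE}, new byte[] {0}, 9)); // []
  }
}
```

The second call models the reported case: once the busy decommissioning node 
is missing from the source list, the result is empty and the loop that calls 
createReplicationWork runs zero times.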

So we need to fix this logic to avoid inaccurate metrics.
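
As a rough model of the mismatch, the sketch below contrasts accounting that 
runs regardless of how many tasks were created with accounting that is 
skipped when no task exists. Everything in it is hypothetical: the Counters 
class, both method names and the guard itself are assumptions made for 
illustration, and this is not the patch attached to this issue.

```java
public class SchedulingAccountingSketch {
  /** Hypothetical counters standing in for the affected metrics. */
  static final class Counters {
    int blocksScheduled;
    int pendingReconstruction;
  }

  /** Models the reported behavior: counters move even when no task exists. */
  static int scheduleUnguarded(int leavingSources, int targets, Counters c) {
    int created = Math.min(leavingSources, targets);
    c.blocksScheduled += targets;   // counted per target, created or not
    c.pendingReconstruction += 1;   // block recorded as pending
    return created;
  }

  /** One possible guard (an assumption): skip accounting when nothing ran. */
  static int scheduleGuarded(int leavingSources, int targets, Counters c) {
    int created = Math.min(leavingSources, targets);
    if (created == 0) {
      return 0; // no task was created, so leave the counters untouched
    }
    c.blocksScheduled += created;
    c.pendingReconstruction += 1;
    return created;
  }

  public static void main(String[] args) {
    Counters unguarded = new Counters();
    int a = scheduleUnguarded(0, 1, unguarded);
    System.out.println(a + " task(s), blocksScheduled="
        + unguarded.blocksScheduled); // 0 task(s), blocksScheduled=1

    Counters guarded = new Counters();
    int b = scheduleGuarded(0, 1, guarded);
    System.out.println(b + " task(s), blocksScheduled="
        + guarded.blocksScheduled);   // 0 task(s), blocksScheduled=0
  }
}
```

With zero leaving-service sources and one target, the unguarded variant 
reports scheduled work that no DataNode will ever perform, which is the kind 
of drift described above.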


  was:
If DataNode is marked as busy and contains many EC blocks, when running 
decommission DataNode, when execute ErasureCodingWork#addTaskToDatanode, here 
will no replication work will be generated for ecBlocksToBeReplicated, but 
related metrics (such as DatanodeDescriptor#currApproxBlocksScheduled, 
pendingReconstruction and needReconstruction) will still updated
.



> Erasure Coding: Fix the issue of inaccurate metrics when decommission mark 
> busy DN
> ----------------------------------------------------------------------------------
>
>                 Key: HDFS-17564
>                 URL: https://issues.apache.org/jira/browse/HDFS-17564
>             Project: Hadoop HDFS
>          Issue Type: Bug
>            Reporter: Haiyang Hu
>            Assignee: Haiyang Hu
>            Priority: Major


