aajisaka commented on a change in pull request #3065:
URL: https://github.com/apache/hadoop/pull/3065#discussion_r647363773
##########
File path:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
##########
@@ -3111,106 +3042,127 @@ void processFirstBlockReport(
}
}
- private void reportDiffSorted(DatanodeStorageInfo storageInfo,
- Iterable<BlockReportReplica> newReport,
+ private void reportDiff(DatanodeStorageInfo storageInfo,
+ BlockListAsLongs newReport,
Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
- // The blocks must be sorted and the storagenodes blocks must be sorted
- Iterator<BlockInfo> storageBlocksIterator = storageInfo.getBlockIterator();
+ // place a delimiter in the list which separates blocks
+ // that have been reported from those that have not
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
- BlockInfo storageBlock = null;
-
- for (BlockReportReplica replica : newReport) {
-
- long replicaID = replica.getBlockId();
- if (BlockIdManager.isStripedBlockID(replicaID)
- && (!hasNonEcBlockUsingStripedID ||
- !blocksMap.containsBlock(replica))) {
- replicaID = BlockIdManager.convertToStripedID(replicaID);
- }
-
- ReplicaState reportedState = replica.getState();
-
- LOG.debug("Reported block {} on {} size {} replicaState = {}",
- replica, dn, replica.getNumBytes(), reportedState);
-
- if (shouldPostponeBlocksFromFuture
- && isGenStampInFuture(replica)) {
- queueReportedBlock(storageInfo, replica, reportedState,
- QUEUE_REASON_FUTURE_GENSTAMP);
- continue;
- }
-
- if (storageBlock == null && storageBlocksIterator.hasNext()) {
- storageBlock = storageBlocksIterator.next();
- }
-
- do {
- int cmp;
- if (storageBlock == null ||
- (cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) {
- // Check if block is available in NN but not yet on this storage
- BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID));
- if (nnBlock != null) {
- reportDiffSortedInner(storageInfo, replica, reportedState,
- nnBlock, toAdd, toCorrupt, toUC);
- } else {
- // Replica not found anywhere so it should be invalidated
- toInvalidate.add(new Block(replica));
- }
- break;
- } else if (cmp == 0) {
- // Replica matched current storageblock
- reportDiffSortedInner(storageInfo, replica, reportedState,
- storageBlock, toAdd, toCorrupt, toUC);
- storageBlock = null;
- } else {
- // replica has higher ID than storedBlock
- // Remove all stored blocks with IDs lower than replica
- do {
- toRemove.add(storageBlock);
- storageBlock = storageBlocksIterator.hasNext()
- ? storageBlocksIterator.next() : null;
- } while (storageBlock != null &&
- Long.compare(replicaID, storageBlock.getBlockId()) > 0);
+ Block delimiterBlock = new Block();
+ BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
+ (short) 1);
+ AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
+ assert result == AddBlockResult.ADDED
+ : "Delimiting block cannot be present in the node";
+ int headIndex = 0; //currently the delimiter is in the head of the list
+ int curIndex;
+
+ if (newReport == null) {
+ newReport = BlockListAsLongs.EMPTY;
+ }
+ // scan the report and process newly reported blocks
+ for (BlockReportReplica iblk : newReport) {
+ ReplicaState iState = iblk.getState();
+ LOG.debug("Reported block {} on {} size {} replicaState = {}", iblk, dn,
+ iblk.getNumBytes(), iState);
+ BlockInfo storedBlock = processReportedBlock(storageInfo,
+ iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
+
+ // move block to the head of the list
+ if (storedBlock != null) {
+ curIndex = storedBlock.findStorageInfo(storageInfo);
+ if (curIndex >= 0) {
+ headIndex =
+ storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
}
- } while (storageBlock != null);
+ }
}
- // Iterate any remaining blocks that have not been reported and remove them
- while (storageBlocksIterator.hasNext()) {
- toRemove.add(storageBlocksIterator.next());
+ // collect blocks that have not been reported
+ // all of them are next to the delimiter
+ Iterator<BlockInfo> it =
+ storageInfo.new BlockIterator(delimiter.getNext(0));
+ while (it.hasNext()) {
+ toRemove.add(it.next());
}
+ storageInfo.removeBlock(delimiter);
}
- private void reportDiffSortedInner(
+ /**
+ * Process a block replica reported by the data-node.
+ * No side effects except adding to the passed-in Collections.
+ *
+ * <ol>
+ * <li>If the block is not known to the system (not in blocksMap) then the
+ * data-node should be notified to invalidate this block.</li>
+ * <li>If the reported replica is valid that is has the same generation stamp
+ * and length as recorded on the name-node, then the replica location should
+ * be added to the name-node.</li>
+ * <li>If the reported replica is not valid, then it is marked as corrupt,
+ * which triggers replication of the existing valid replicas.
+ * Corrupt replicas are removed from the system when the block
+ * is fully replicated.</li>
+ * <li>If the reported replica is for a block currently marked "under
+ * construction" in the NN, then it should be added to the
+ * BlockUnderConstructionFeature's list of replicas.</li>
+ * </ol>
+ *
+ * @param storageInfo DatanodeStorageInfo that sent the report.
+ * @param block reported block replica
+ * @param reportedState reported replica state
+ * @param toAdd add to DatanodeDescriptor
+ * @param toInvalidate missing blocks (not in the blocks map)
+ * should be removed from the data-node
+ * @param toCorrupt replicas with unexpected length or generation stamp;
+ * add to corrupt replicas
+ * @param toUC replicas of blocks currently under construction
+ * @return the up-to-date stored block, if it should be kept.
+ * Otherwise, null.
+ */
+ private BlockInfo processReportedBlock(
final DatanodeStorageInfo storageInfo,
- final BlockReportReplica replica, final ReplicaState reportedState,
- final BlockInfo storedBlock,
+ final Block block, final ReplicaState reportedState,
final Collection<BlockInfoToAdd> toAdd,
+ final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
- assert replica != null;
- assert storedBlock != null;
-
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
+ LOG.debug("Reported block {} on {} size {} replicaState = {}", block, dn,
Review comment:
Nit: there are two spaces between `{}` and `replicaState` in this log message; please reduce them to a single space.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]