Author: arp Date: Wed Nov 6 00:25:10 2013 New Revision: 1539203 URL: http://svn.apache.org/r1539203 Log: HDFS-5466. Update storage IDs when the pipeline is updated. (Contributed by szetszwo)
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt?rev=1539203&r1=1539202&r2=1539203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt Wed Nov 6 00:25:10 2013 @@ -71,5 +71,8 @@ IMPROVEMENTS: HDFS-5455. NN should update storageMap on first heartbeat. (Arpit Agarwal) HDFS-5457. Fix TestDatanodeRegistration, TestFsck and TestAddBlockRetry. - (Contributed bt szetszwo) + (Contributed by szetszwo) + + HDFS-5466. Update storage IDs when the pipeline is updated. (Contributed + by szetszwo) Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1539203&r1=1539202&r2=1539203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Nov 6 00:25:10 2013 @@ -402,8 +402,7 @@ public class DFSOutputStream extends FSO } // setup pipeline to append to the last block XXX retries?? - nodes = lastBlock.getLocations(); - storageIDs = lastBlock.getStorageIDs(); + setPipeline(lastBlock); errorIndex = -1; // no errors yet. if (nodes.length < 1) { throw new IOException("Unable to retrieve blocks locations " + @@ -412,6 +411,14 @@ public class DFSOutputStream extends FSO } } + + private void setPipeline(LocatedBlock lb) { + setPipeline(lb.getLocations(), lb.getStorageIDs()); + } + private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) { + this.nodes = nodes; + this.storageIDs = storageIDs; + } private void setFavoredNodes(String[] favoredNodes) { this.favoredNodes = favoredNodes; @@ -435,7 +442,7 @@ public class DFSOutputStream extends FSO this.setName("DataStreamer for file " + src); closeResponder(); closeStream(); - nodes = null; + setPipeline(null, null); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -504,7 +511,7 @@ public class DFSOutputStream extends FSO if(DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Allocating new block"); } - nodes = nextBlockOutputStream(); + setPipeline(nextBlockOutputStream()); initDataStreaming(); } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { if(DFSClient.LOG.isDebugEnabled()) { @@ -912,7 +919,7 @@ public class DFSOutputStream extends FSO src, block, nodes, storageIDs, failed.toArray(new DatanodeInfo[failed.size()]), 1, dfsClient.clientName); - nodes = lb.getLocations(); + setPipeline(lb); //find the new datanode final int d = findNewDatanode(original); @@ -1012,7 +1019,14 @@ public class DFSOutputStream extends FSO System.arraycopy(nodes, 0, newnodes, 0, errorIndex); System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex, newnodes.length-errorIndex); - nodes = newnodes; + + final String[] newStorageIDs = new String[newnodes.length]; + System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex); + System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex, + newStorageIDs.length-errorIndex); + + setPipeline(newnodes, newStorageIDs); + hasError = false; lastException.set(null); errorIndex = -1; @@ -1051,7 +1065,7 @@ public class DFSOutputStream extends FSO * Must get block ID and the IDs of the destinations from the namenode. * Returns the list of target datanodes. */ - private DatanodeInfo[] nextBlockOutputStream() throws IOException { + private LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb = null; DatanodeInfo[] nodes = null; int count = dfsClient.getConf().nBlockWriteRetry; @@ -1093,7 +1107,7 @@ public class DFSOutputStream extends FSO if (!success) { throw new IOException("Unable to create new block."); } - return nodes; + return lb; } // connects to the first datanode in the pipeline Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1539203&r1=1539202&r2=1539203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Nov 6 00:25:10 2013 @@ -1620,7 +1620,6 @@ public class BlockManager { // To minimize startup time, we discard any second (or later) block reports // that we receive while still in startup phase. final DatanodeStorageInfo storageInfo = node.updateStorage(storage); - LOG.info("XXX storageInfo=" + storageInfo + ", storage=" + storage); if (namesystem.isInStartupSafeMode() && storageInfo.getBlockReportCount() > 0) { blockLog.info("BLOCK* processReport: "