Repository: hadoop
Updated Branches:
  refs/heads/branch-2 29fe5af01 -> f3cdf29af
HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f3cdf29a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f3cdf29a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f3cdf29a Branch: refs/heads/branch-2 Commit: f3cdf29af4c67a1963f51f02bf88075bf6dce679 Parents: 29fe5af Author: Wei-Chiu Chuang <weic...@apache.org> Authored: Sat Feb 25 21:13:51 2017 -0800 Committer: Wei-Chiu Chuang <weic...@apache.org> Committed: Sat Feb 25 21:13:51 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSOutputStream.java | 6 +- .../org/apache/hadoop/hdfs/DataStreamer.java | 119 ++++++++++++------- .../apache/hadoop/hdfs/TestDFSOutputStream.java | 3 +- 3 files changed, 83 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3cdf29a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 1e549d9..404a644 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -606,9 +606,9 @@ public class DFSOutputStream extends FSOutputSummer // update the block length first time irrespective of flag if (updateLength || getStreamer().getPersistBlocks().get()) { synchronized (this) { - if (!getStreamer().streamerClosed() - && getStreamer().getBlock() != null) { - lastBlockLength = 
getStreamer().getBlock().getNumBytes(); + final ExtendedBlock block = getStreamer().getBlock(); + if (!getStreamer().streamerClosed() && block != null) { + lastBlockLength = block.getNumBytes(); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3cdf29a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index aa55a3e..bcf740f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -150,8 +150,6 @@ class DataStreamer extends Daemon { /** * Record a connection exception. - * @param e - * @throws InvalidEncryptionKeyException */ void recordFailure(final InvalidEncryptionKeyException e) throws InvalidEncryptionKeyException { @@ -186,9 +184,8 @@ class DataStreamer extends Daemon { final StorageType[] targetStorageTypes, final Token<BlockTokenIdentifier> blockToken) throws IOException { //send the TRANSFER_BLOCK request - new Sender(out) - .transferBlock(block, blockToken, dfsClient.clientName, targets, - targetStorageTypes); + new Sender(out).transferBlock(block.getCurrentBlock(), blockToken, + dfsClient.clientName, targets, targetStorageTypes); out.flush(); //ack BlockOpResponseProto transferResponse = BlockOpResponseProto @@ -207,6 +204,42 @@ class DataStreamer extends Daemon { } } + static class BlockToWrite { + private ExtendedBlock currentBlock; + + BlockToWrite(ExtendedBlock block) { + setCurrentBlock(block); + } + + synchronized ExtendedBlock getCurrentBlock() { + return currentBlock == null ? 
null : new ExtendedBlock(currentBlock); + } + + synchronized long getNumBytes() { + return currentBlock == null ? 0 : currentBlock.getNumBytes(); + } + + synchronized void setCurrentBlock(ExtendedBlock block) { + currentBlock = (block == null || block.getLocalBlock() == null) ? + null : new ExtendedBlock(block); + } + + synchronized void setNumBytes(long numBytes) { + assert currentBlock != null; + currentBlock.setNumBytes(numBytes); + } + + synchronized void setGenerationStamp(long generationStamp) { + assert currentBlock != null; + currentBlock.setGenerationStamp(generationStamp); + } + + @Override + public synchronized String toString() { + return currentBlock == null ? "null" : currentBlock.toString(); + } + } + /** * Create a socket for a write pipeline * @@ -420,7 +453,7 @@ class DataStreamer extends Daemon { } private volatile boolean streamerClosed = false; - private volatile ExtendedBlock block; // its length is number of bytes acked + private final BlockToWrite block; // its length is number of bytes acked private Token<BlockTokenIdentifier> accessToken; private DataOutputStream blockStream; private DataInputStream blockReplyStream; @@ -481,12 +514,14 @@ class DataStreamer extends Daemon { private final String[] favoredNodes; private final EnumSet<AddBlockFlag> addBlockFlags; - private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, + private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, + DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, boolean isAppend, String[] favoredNodes, EnumSet<AddBlockFlag> flags) { + this.block = new BlockToWrite(block); this.dfsClient = dfsClient; this.src = src; this.progress = progress; @@ -512,9 +547,8 @@ class DataStreamer extends Daemon { AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, String[] favoredNodes, EnumSet<AddBlockFlag> flags) { - 
this(stat, dfsClient, src, progress, checksum, cachingStrategy, + this(stat, block, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, false, favoredNodes, flags); - this.block = block; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -527,10 +561,9 @@ class DataStreamer extends Daemon { String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage) throws IOException { - this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, true, null, null); + this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, + cachingStrategy, byteArrayManage, true, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; - block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); accessToken = lastBlock.getBlockToken(); } @@ -1295,7 +1328,7 @@ class DataStreamer extends Daemon { LocatedBlock lb; //get a new datanode lb = dfsClient.namenode.getAdditionalDatanode( - src, stat.getFileId(), block, nodes, storageIDs, + src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs, exclude.toArray(new DatanodeInfo[exclude.size()]), 1, dfsClient.clientName); // a new node was allocated by the namenode. Update nodes. 
@@ -1407,7 +1440,7 @@ class DataStreamer extends Daemon { } // while if (success) { - block = updatePipeline(newGS); + updatePipeline(newGS); } return false; // do not sleep, continue processing } @@ -1504,17 +1537,27 @@ class DataStreamer extends Daemon { } private LocatedBlock updateBlockForPipeline() throws IOException { - return dfsClient.namenode.updateBlockForPipeline(block, + return dfsClient.namenode.updateBlockForPipeline(block.getCurrentBlock(), dfsClient.clientName); } + void updateBlockGS(final long newGS) { + block.setGenerationStamp(newGS); + } + /** update pipeline at the namenode */ - ExtendedBlock updatePipeline(long newGS) throws IOException { - final ExtendedBlock newBlock = new ExtendedBlock( - block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); - dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, - nodes, storageIDs); - return newBlock; + private void updatePipeline(long newGS) throws IOException { + final ExtendedBlock oldBlock = block.getCurrentBlock(); + // the new GS has been propagated to all DN, it should be ok to update the + // local block state + updateBlockGS(newGS); + dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, + block.getCurrentBlock(), nodes, storageIDs); + } + + DatanodeInfo[] getExcludedNodes() { + return excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) + .keySet().toArray(new DatanodeInfo[0]); } /** @@ -1529,35 +1572,30 @@ class DataStreamer extends Daemon { StorageType[] storageTypes; int count = dfsClient.getConf().getNumBlockWriteRetry(); boolean success; - ExtendedBlock oldBlock = block; + final ExtendedBlock oldBlock = block.getCurrentBlock(); do { errorState.reset(); lastException.clear(); success = false; - DatanodeInfo[] excluded = - excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) - .keySet() - .toArray(new DatanodeInfo[0]); - block = oldBlock; - lb = locateFollowingBlock(excluded.length > 0 ? 
excluded : null); - block = lb.getBlock(); + DatanodeInfo[] excluded = getExcludedNodes(); + lb = locateFollowingBlock( + excluded.length > 0 ? excluded : null, oldBlock); + block.setCurrentBlock(lb.getBlock()); block.setNumBytes(0); bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); storageTypes = lb.getStorageTypes(); - // // Connect to first DataNode in the list. - // success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { LOG.warn("Abandoning " + block); - dfsClient.namenode.abandonBlock(block, stat.getFileId(), src, - dfsClient.clientName); - block = null; + dfsClient.namenode.abandonBlock(block.getCurrentBlock(), + stat.getFileId(), src, dfsClient.clientName); + block.setCurrentBlock(null); final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; LOG.warn("Excluding datanode " + badNode); excludedNodes.put(badNode, badNode); @@ -1618,7 +1656,7 @@ class DataStreamer extends Daemon { // We cannot change the block length in 'block' as it counts the number // of bytes ack'ed. 
- ExtendedBlock blockCopy = new ExtendedBlock(block); + ExtendedBlock blockCopy = block.getCurrentBlock(); blockCopy.setNumBytes(stat.getBlockSize()); boolean[] targetPinnings = getPinnings(nodes); @@ -1728,8 +1766,8 @@ class DataStreamer extends Daemon { } } - protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) - throws IOException { + protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded, + ExtendedBlock oldBlock) throws IOException { final DfsClientConf conf = dfsClient.getConf(); int retries = conf.getNumBlockWriteLocateFollowingRetry(); long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); @@ -1738,7 +1776,7 @@ class DataStreamer extends Daemon { while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, stat.getFileId(), favoredNodes, + oldBlock, excluded, stat.getFileId(), favoredNodes, addBlockFlags); } catch (RemoteException e) { IOException ue = @@ -1823,7 +1861,7 @@ class DataStreamer extends Daemon { * @return the block this streamer is writing to */ ExtendedBlock getBlock() { - return block; + return block.getCurrentBlock(); } /** @@ -2016,7 +2054,8 @@ class DataStreamer extends Daemon { @Override public String toString() { - return (block == null? null: block.getLocalBlock()) + final ExtendedBlock extendedBlock = block.getCurrentBlock(); + return (extendedBlock == null ? 
null : extendedBlock.getLocalBlock()) + "@" + Arrays.toString(getNodes()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3cdf29a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 750103d..9ec01b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -110,8 +110,7 @@ public class TestDFSOutputStream { * packet size < 64kB. See HDFS-7308 for details. */ @Test - public void testComputePacketChunkSize() - throws Exception { + public void testComputePacketChunkSize() throws Exception { DistributedFileSystem fs = cluster.getFileSystem(); FSDataOutputStream os = fs.create(new Path("/test")); DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org