HDFS-7936. Erasure coding: resolving conflicts in the branch when merging trunk changes (this commit mainly addresses HDFS-8081 and HDFS-8048). Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6c9a57b0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6c9a57b0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6c9a57b0 Branch: refs/heads/HDFS-7285 Commit: 6c9a57b09218472ed6894403df20e1d4dab1df6e Parents: 3d96ae6 Author: Zhe Zhang <z...@apache.org> Authored: Mon Apr 13 10:56:24 2015 -0700 Committer: Zhe Zhang <z...@apache.org> Committed: Mon May 4 10:13:23 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hdfs/DFSInputStream.java | 4 ++-- .../apache/hadoop/hdfs/DFSStripedInputStream.java | 16 +++++++++------- .../apache/hadoop/hdfs/DFSStripedOutputStream.java | 3 ++- .../hadoop/hdfs/server/namenode/FSNamesystem.java | 5 +++-- .../hadoop/hdfs/TestDFSStripedOutputStream.java | 3 ++- 5 files changed, 18 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c9a57b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 79bbd54..9104f84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -1106,7 +1106,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException { final int length = (int) (end - start + 1); - actualGetFromOneDataNode(datanode, block, start, end, buf, + actualGetFromOneDataNode(datanode, blockStartOffset, start, end, 
buf, new int[]{offset}, new int[]{length}, corruptedBlockMap); } @@ -1125,7 +1125,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * block replica */ void actualGetFromOneDataNode(final DNAddrPair datanode, - LocatedBlock block, final long startInBlk, final long endInBlk, + long blockStartOffset, final long startInBlk, final long endInBlk, byte[] buf, int[] offsets, int[] lengths, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c9a57b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 077b0f8..8a431b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -224,7 +224,7 @@ public class DFSStripedInputStream extends DFSInputStream { * Real implementation of pread. 
*/ @Override - protected void fetchBlockByteRange(LocatedBlock block, long start, + protected void fetchBlockByteRange(long blockStartOffset, long start, long end, byte[] buf, int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException { @@ -234,7 +234,7 @@ public class DFSStripedInputStream extends DFSInputStream { int len = (int) (end - start + 1); // Refresh the striped block group - block = getBlockGroupAt(block.getStartOffset()); + LocatedBlock block = getBlockGroupAt(blockStartOffset); assert block instanceof LocatedStripedBlock : "NameNode" + " should return a LocatedStripedBlock for a striped file"; LocatedStripedBlock blockGroup = (LocatedStripedBlock) block; @@ -254,9 +254,11 @@ public class DFSStripedInputStream extends DFSInputStream { DatanodeInfo loc = blks[i].getLocations()[0]; StorageType type = blks[i].getStorageTypes()[0]; DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr( - loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type); - Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i], - rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf, + loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())), + type); + Callable<Void> readCallable = getFromOneDataNode(dnAddr, + blks[i].getStartOffset(), rp.startOffsetInBlock, + rp.startOffsetInBlock + rp.readLength - 1, buf, rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i); Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable); DFSClient.LOG.debug("Submitting striped read request for " + blks[i]); @@ -272,7 +274,7 @@ public class DFSStripedInputStream extends DFSInputStream { } private Callable<Void> getFromOneDataNode(final DNAddrPair datanode, - final LocatedBlock block, final long start, final long end, + final long blockStartOffset, final long start, final long end, final byte[] buf, final int[] offsets, final int[] lengths, final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, 
final int hedgedReadId) { @@ -283,7 +285,7 @@ public class DFSStripedInputStream extends DFSInputStream { TraceScope scope = Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan); try { - actualGetFromOneDataNode(datanode, block, start, + actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf, offsets, lengths, corruptedBlockMap); } finally { scope.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c9a57b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index aded4fe..1d0e1be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -284,7 +284,8 @@ public class DFSStripedOutputStream extends DFSOutputStream { } for (StripedDataStreamer streamer : streamers) { streamer.setLastException(new IOException("Lease timeout of " - + (dfsClient.getHdfsTimeout()/1000) + " seconds expired.")); + + (dfsClient.getConf().getHdfsTimeout()/1000) + + " seconds expired.")); } closeThreads(true); dfsClient.endFileLease(fileId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c9a57b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 7be243b..5159756 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3103,7 +3103,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, final long blockSize; final short numTargets; final byte storagePolicyID; - final boolean isStriped; Node clientNode = null; String clientMachine = null; @@ -3145,7 +3144,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, clientNode = blockManager.getDatanodeManager().getDatanodeByHost( clientMachine); // TODO: make block group size configurable (HDFS-7337) - isStriped = pendingFile.isStriped(); + boolean isStriped = pendingFile.isStriped(); numTargets = isStriped ? HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : pendingFile.getFileReplication(); @@ -3174,6 +3173,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException { Block newBlock = null; long offset; + boolean isStriped; checkOperation(OperationCategory.WRITE); waitForLoadingFSImage(); writeLock(); @@ -3204,6 +3204,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, commitOrCompleteLastBlock(pendingFile, fileState.iip, ExtendedBlock.getLocalBlock(previous)); + isStriped = pendingFile.isStriped(); // allocate new block, record block locations in INode. 
newBlock = createNewBlock(isStriped); saveAllocatedBlock(src, fileState.iip, newBlock, targets, http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c9a57b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index f5a37f3..ee6998b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -5,6 +5,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -241,7 +242,7 @@ public class TestDFSStripedOutputStream { } block.setNumBytes(lenOfBlock); - BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)). + BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)). setFileName(src). setBlock(block). setBlockToken(lblock.getBlockToken()).