HDFS-9040. Erasure coding: coordinate data streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su.
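For context, the heart of this change is a per-streamer, bounded blocking-queue handshake: the DFSStripedOutputStream allocates a block group, splits it into internal blocks, and offers one block to each streamer's queue slot, while each streamer blocks on its own slot with a timed poll so it can periodically re-check its health flags instead of waiting forever. The following is a minimal, generic sketch of that pattern only; it is not the Hadoop source, and all names in it are invented for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch: one bounded queue per streamer index. */
class PerStreamerQueues<T> {
  private final List<BlockingQueue<T>> queues;

  PerStreamerQueues(int numQueues, int queueSize) {
    queues = new ArrayList<BlockingQueue<T>>(numQueues);
    for (int i = 0; i < numQueues; i++) {
      queues.add(new LinkedBlockingQueue<T>(queueSize));
    }
  }

  /** Coordinator side: hand an item to streamer i; fail fast if the slot is full. */
  void offer(int i, T item) {
    if (!queues.get(i).offer(item)) {
      throw new IllegalStateException("queue " + i + " is full");
    }
  }

  /** Streamer side: bounded wait so the caller can re-check health flags;
   *  returns null on timeout. */
  T takeWithTimeout(int i, long millis) throws InterruptedException {
    return queues.get(i).poll(millis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    PerStreamerQueues<String> q = new PerStreamerQueues<String>(3, 1);
    q.offer(0, "internal-block-for-streamer-0");
    System.out.println(q.takeWithTimeout(0, 100)); // internal-block-for-streamer-0
    System.out.println(q.takeWithTimeout(1, 100)); // null: nothing offered to slot 1
  }
}

In the patch itself this shape appears as MultipleBlockingQueue, and the Coordinator owns one such structure per message type (followingBlocks, endBlocks, newBlocks).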
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6419900a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6419900a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6419900a Branch: refs/heads/HDFS-7240 Commit: 6419900ac24a5493827abf9b5d90373bc1043e0b Parents: c09dc25 Author: Jing Zhao <ji...@apache.org> Authored: Mon Sep 28 14:40:27 2015 -0700 Committer: Jing Zhao <ji...@apache.org> Committed: Mon Sep 28 14:40:27 2015 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hdfs/protocol/DatanodeID.java | 2 + .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 + .../org/apache/hadoop/hdfs/DFSOutputStream.java | 62 +- .../hadoop/hdfs/DFSStripedOutputStream.java | 603 ++++++++++++++----- .../org/apache/hadoop/hdfs/DataStreamer.java | 212 +++---- .../apache/hadoop/hdfs/StripedDataStreamer.java | 342 +++-------- .../BlockUnderConstructionFeature.java | 30 +- .../server/blockmanagement/DatanodeManager.java | 4 + .../hadoop/hdfs/util/StripedBlockUtil.java | 23 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 31 +- .../apache/hadoop/hdfs/StripedFileTestUtil.java | 213 ++++++- .../hadoop/hdfs/TestDFSStripedOutputStream.java | 144 +---- .../TestDFSStripedOutputStreamWithFailure.java | 300 ++++----- .../hdfs/TestWriteStripedFileWithFailure.java | 8 + .../hdfs/server/balancer/TestBalancer.java | 4 +- .../hadoop/hdfs/server/mover/TestMover.java | 4 +- .../TestAddOverReplicatedStripedBlocks.java | 12 +- .../namenode/ha/TestRetryCacheWithHA.java | 9 +- 18 files changed, 1068 insertions(+), 938 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java index 6d72285..c709cbd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java @@ -38,6 +38,8 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceStability.Evolving public class DatanodeID implements Comparable<DatanodeID> { public static final DatanodeID[] EMPTY_ARRAY = {}; + public static final DatanodeID EMPTY_DATANODE_ID = new DatanodeID("null", + "null", "null", 0, 0, 0, 0); private String ipAddr; // IP address private String hostName; // hostname claimed by datanode http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index d62dbac..6a01d61 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -450,3 +450,6 @@ HDFS-8882. Erasure Coding: Use datablocks, parityblocks and cell size from ErasureCodingPolicy (Vinayakumar B via zhz) + + HDFS-9040. Erasure coding: coordinate data streamers in + DFSStripedOutputStream. 
(jing9 and Walter Su) http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 4923c86..e77a00a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.util.ByteArrayManager; @@ -212,14 +213,17 @@ public class DFSOutputStream extends FSOutputSummer /** Construct a new output stream for creating a file. */ protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress, - DataChecksum checksum, String[] favoredNodes) throws IOException { + DataChecksum checksum, String[] favoredNodes, boolean createStreamer) + throws IOException { this(dfsClient, src, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); - streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, - cachingStrategy, byteArrayManager, favoredNodes); + if (createStreamer) { + streamer = new DataStreamer(stat, null, dfsClient, src, progress, + checksum, cachingStrategy, byteArrayManager, favoredNodes); + } } static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, @@ -276,7 +280,7 @@ public class DFSOutputStream extends FSOutputSummer flag, progress, checksum, favoredNodes); } else { out = new DFSOutputStream(dfsClient, src, stat, - flag, progress, checksum, favoredNodes); + flag, progress, checksum, favoredNodes, true); } out.start(); return out; @@ -476,7 +480,7 @@ public class DFSOutputStream extends FSOutputSummer * * @throws IOException */ - protected void endBlock() throws IOException { + void endBlock() throws IOException { if (getStreamer().getBytesCurBlock() == blockSize) { setCurrentPacketToEmpty(); enqueueCurrentPacket(); @@ -921,4 +925,52 @@ public class DFSOutputStream extends FSOutputSummer public String toString() { return getClass().getSimpleName() + ":" + streamer; } + + static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient, + String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes) + throws IOException { + final DfsClientConf conf = dfsClient.getConf(); + int retries = conf.getNumBlockWriteLocateFollowingRetry(); + long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); + long localstart = Time.monotonicNow(); + while (true) { + try { + return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock, + excludedNodes, fileId, favoredNodes); + } catch (RemoteException e) { + IOException ue = 
e.unwrapRemoteException(FileNotFoundException.class, + AccessControlException.class, + NSQuotaExceededException.class, + DSQuotaExceededException.class, + QuotaByStorageTypeExceededException.class, + UnresolvedPathException.class); + if (ue != e) { + throw ue; // no need to retry these exceptions + } + if (NotReplicatedYetException.class.getName().equals(e.getClassName())) { + if (retries == 0) { + throw e; + } else { + --retries; + LOG.info("Exception while adding a block", e); + long elapsed = Time.monotonicNow() - localstart; + if (elapsed > 5000) { + LOG.info("Waiting for replication for " + (elapsed / 1000) + + " seconds"); + } + try { + LOG.warn("NotReplicatedYetException sleeping " + src + + " retries left " + retries); + Thread.sleep(sleeptime); + sleeptime *= 2; + } catch (InterruptedException ie) { + LOG.warn("Caught exception", ie); + } + } + } else { + throw e; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index d3a054a..c145a2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -25,23 +25,34 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.Time; import org.apache.htrace.Sampler; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -59,23 +70,11 @@ public class DFSStripedOutputStream extends DFSOutputStream { private final List<BlockingQueue<T>> queues; MultipleBlockingQueue(int numQueue, int queueSize) { - queues = new ArrayList<>(numQueue); + List<BlockingQueue<T>> list = new ArrayList<>(numQueue); for (int i = 0; i < numQueue; i++) { - queues.add(new LinkedBlockingQueue<T>(queueSize)); + list.add(new 
LinkedBlockingQueue<T>(queueSize)); } - } - - boolean isEmpty() { - for(int i = 0; i < queues.size(); i++) { - if (!queues.get(i).isEmpty()) { - return false; - } - } - return true; - } - - int numQueues() { - return queues.size(); + queues = Collections.synchronizedList(list); } void offer(int i, T object) { @@ -92,6 +91,14 @@ public class DFSStripedOutputStream extends DFSOutputStream { } } + T takeWithTimeout(int i) throws InterruptedIOException { + try { + return queues.get(i).poll(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, e); + } + } + T poll(int i) { return queues.get(i).poll(); } @@ -99,23 +106,44 @@ public class DFSStripedOutputStream extends DFSOutputStream { T peek(int i) { return queues.get(i).peek(); } + + void clear() { + for (BlockingQueue<T> q : queues) { + q.clear(); + } + } } /** Coordinate the communication between the streamers. */ - class Coordinator { + static class Coordinator { + /** + * The next internal block to write to for each streamer. The + * DFSStripedOutputStream makes the {@link ClientProtocol#addBlock} RPC to + * get a new block group. The block group is split into internal blocks, which + * are then distributed into the queue for streamers to retrieve. + */ private final MultipleBlockingQueue<LocatedBlock> followingBlocks; + /** + * Used to sync among all the streamers before allocating a new block. The + * DFSStripedOutputStream uses this to make sure every streamer has finished + * writing the previous block. + */ private final MultipleBlockingQueue<ExtendedBlock> endBlocks; + /** + * The following data structures are used for syncing while handling errors. + */ private final MultipleBlockingQueue<LocatedBlock> newBlocks; - private final MultipleBlockingQueue<ExtendedBlock> updateBlocks; + private final Map<StripedDataStreamer, Boolean> updateStreamerMap; + private final MultipleBlockingQueue<Boolean> streamerUpdateResult; - Coordinator(final DfsClientConf conf, final int numDataBlocks, - final int numAllBlocks) { + Coordinator(final int numAllBlocks) { followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); - endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1); - + endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); - updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + updateStreamerMap = Collections.synchronizedMap( + new HashMap<StripedDataStreamer, Boolean>(numAllBlocks)); + streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1); } MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() { @@ -126,68 +154,28 @@ public class DFSStripedOutputStream extends DFSOutputStream { return newBlocks; } - MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() { - return updateBlocks; - } - - StripedDataStreamer getStripedDataStreamer(int i) { - return DFSStripedOutputStream.this.getStripedDataStreamer(i); - } - void offerEndBlock(int i, ExtendedBlock block) { endBlocks.offer(i, block); } - ExtendedBlock takeEndBlock(int i) throws InterruptedIOException { - return endBlocks.take(i); + void offerStreamerUpdateResult(int i, boolean success) { + streamerUpdateResult.offer(i, success); } - boolean hasAllEndBlocks() { - for(int i = 0; i < endBlocks.numQueues(); i++) { - if (endBlocks.peek(i) == null) { - return false; - } - } - return true; + boolean takeStreamerUpdateResult(int i) throws InterruptedIOException { + return streamerUpdateResult.take(i); } - void 
setBytesEndBlock(int i, long newBytes, ExtendedBlock block) { - ExtendedBlock b = endBlocks.peek(i); - if (b == null) { - // streamer just has failed, put end block and continue - b = block; - offerEndBlock(i, b); - } - b.setNumBytes(newBytes); + void updateStreamer(StripedDataStreamer streamer, + boolean success) { + assert !updateStreamerMap.containsKey(streamer); + updateStreamerMap.put(streamer, success); } - /** @return a block representing the entire block group. */ - ExtendedBlock getBlockGroup() { - final StripedDataStreamer s0 = getStripedDataStreamer(0); - final ExtendedBlock b0 = s0.getBlock(); - if (b0 == null) { - return null; - } - - final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0; - - final ExtendedBlock block = new ExtendedBlock(b0); - long numBytes = atBlockGroupBoundary? b0.getNumBytes(): s0.getBytesCurBlock(); - for (int i = 1; i < numAllBlocks; i++) { - final StripedDataStreamer si = getStripedDataStreamer(i); - final ExtendedBlock bi = si.getBlock(); - if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) { - block.setGenerationStamp(bi.getGenerationStamp()); - } - if (i < numDataBlocks) { - numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock(); - } - } - block.setNumBytes(numBytes); - if (LOG.isDebugEnabled()) { - LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes()); - } - return block; + void clearFailureStates() { + newBlocks.clear(); + updateStreamerMap.clear(); + streamerUpdateResult.clear(); } } @@ -263,18 +251,16 @@ public class DFSStripedOutputStream extends DFSOutputStream { private final int cellSize; private final int numAllBlocks; private final int numDataBlocks; - - @Override - ExtendedBlock getBlock() { - return coordinator.getBlockGroup(); - } + private ExtendedBlock currentBlockGroup; + private final String[] favoredNodes; + private final List<StripedDataStreamer> failedStreamers; /** Construct a new output stream for creating a file. 
*/ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException { - super(dfsClient, src, stat, flag, progress, checksum, favoredNodes); + super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false); if (LOG.isDebugEnabled()) { LOG.debug("Creating DFSStripedOutputStream for " + src); } @@ -284,12 +270,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { cellSize = ecPolicy.getCellSize(); numDataBlocks = ecPolicy.getNumDataUnits(); numAllBlocks = numDataBlocks + numParityBlocks; + this.favoredNodes = favoredNodes; + failedStreamers = new ArrayList<>(); encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(), numDataBlocks, numParityBlocks); - coordinator = new Coordinator(dfsClient.getConf(), - numDataBlocks, numAllBlocks); + coordinator = new Coordinator(numAllBlocks); try { cellBuffers = new CellBuffers(numParityBlocks); } catch (InterruptedException ie) { @@ -297,14 +284,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { "Failed to create cell buffers", ie); } - List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks); + streamers = new ArrayList<>(numAllBlocks); for (short i = 0; i < numAllBlocks; i++) { StripedDataStreamer streamer = new StripedDataStreamer(stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, favoredNodes, i, coordinator); - s.add(streamer); + streamers.add(streamer); } - streamers = Collections.unmodifiableList(s); currentPackets = new DFSPacket[streamers.size()]; setCurrentStreamer(0); } @@ -318,17 +304,19 @@ public class DFSStripedOutputStream extends DFSOutputStream { } private synchronized StripedDataStreamer getCurrentStreamer() { - return (StripedDataStreamer)streamer; + return (StripedDataStreamer) streamer; } private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) { // backup currentPacket for current streamer - int oldIdx = streamers.indexOf(streamer); - if (oldIdx >= 0) { - currentPackets[oldIdx] = currentPacket; + if (streamer != null) { + int oldIdx = streamers.indexOf(getCurrentStreamer()); + if (oldIdx >= 0) { + currentPackets[oldIdx] = currentPacket; + } } - streamer = streamers.get(newIdx); + streamer = getStripedDataStreamer(newIdx); currentPacket = currentPackets[newIdx]; adjustChunkBoundary(); @@ -350,40 +338,127 @@ public class DFSStripedOutputStream extends DFSOutputStream { encoder.encode(dataBuffers, parityBuffers); } - - private void checkStreamers(boolean setExternalError) throws IOException { - int count = 0; + /** + * check all the existing StripedDataStreamer and find newly failed streamers. + * @return The newly failed streamers. + * @throws IOException if less than {@link #numDataBlocks} streamers are still + * healthy. 
+ */ + private Set<StripedDataStreamer> checkStreamers() throws IOException { + Set<StripedDataStreamer> newFailed = new HashSet<>(); for(StripedDataStreamer s : streamers) { - if (!s.isFailed()) { - if (setExternalError && s.getBlock() != null) { - s.getErrorState().initExternalError(); - } - count++; + if (!s.isHealthy() && !failedStreamers.contains(s)) { + newFailed.add(s); } } + + final int failCount = failedStreamers.size() + newFailed.size(); if (LOG.isDebugEnabled()) { LOG.debug("checkStreamers: " + streamers); - LOG.debug("count=" + count); + LOG.debug("healthy streamer count=" + (numAllBlocks - failCount)); + LOG.debug("original failed streamers: " + failedStreamers); + LOG.debug("newly failed streamers: " + newFailed); } - if (count < numDataBlocks) { - throw new IOException("Failed: the number of remaining blocks = " - + count + " < the number of data blocks = " + numDataBlocks); + if (failCount > (numAllBlocks - numDataBlocks)) { + throw new IOException("Failed: the number of failed blocks = " + + failCount + " > the number of parity blocks = " + + (numAllBlocks - numDataBlocks)); } + return newFailed; } private void handleStreamerFailure(String err, Exception e) throws IOException { - handleStreamerFailure(err, e, true); - } - - private void handleStreamerFailure(String err, Exception e, - boolean setExternalError) throws IOException { LOG.warn("Failed: " + err + ", " + this, e); - getCurrentStreamer().setFailed(true); - checkStreamers(setExternalError); + getCurrentStreamer().getErrorState().setInternalError(); + getCurrentStreamer().close(true); + checkStreamers(); currentPacket = null; } + private void replaceFailedStreamers() { + assert streamers.size() == numAllBlocks; + for (short i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer oldStreamer = getStripedDataStreamer(i); + if (!oldStreamer.isHealthy()) { + StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat, + dfsClient, src, oldStreamer.progress, + oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager, + favoredNodes, i, coordinator); + streamers.set(i, streamer); + currentPackets[i] = null; + if (i == 0) { + this.streamer = streamer; + } + streamer.start(); + } + } + } + + private void waitEndBlocks(int i) throws IOException { + while (getStripedDataStreamer(i).isHealthy()) { + final ExtendedBlock b = coordinator.endBlocks.takeWithTimeout(i); + if (b != null) { + StripedBlockUtil.checkBlocks(currentBlockGroup, i, b); + return; + } + } + } + + private void allocateNewBlock() throws IOException { + if (currentBlockGroup != null) { + for (int i = 0; i < numAllBlocks; i++) { + // sync all the healthy streamers before writing to the new block + waitEndBlocks(i); + } + } + failedStreamers.clear(); + // replace failed streamers + replaceFailedStreamers(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Allocating new block group. 
The previous block group: " + + currentBlockGroup); + } + + // TODO collect excludedNodes from all the data streamers + final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup, + fileId, favoredNodes); + assert lb.isStriped(); + if (lb.getLocations().length < numDataBlocks) { + throw new IOException("Failed to get " + numDataBlocks + + " nodes from namenode: blockGroupSize= " + numAllBlocks + + ", blocks.length= " + lb.getLocations().length); + } + // assign the new block to the current block group + currentBlockGroup = lb.getBlock(); + + final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock) lb, cellSize, numDataBlocks, + numAllBlocks - numDataBlocks); + for (int i = 0; i < blocks.length; i++) { + StripedDataStreamer si = getStripedDataStreamer(i); + if (si.isHealthy()) { // skipping failed data streamer + if (blocks[i] == null) { + // Set exception and close streamer as there is no block locations + // found for the parity block. + LOG.warn("Failed to get block location for parity block, index=" + i); + si.getLastException().set( + new IOException("Failed to get following block, i=" + i)); + si.getErrorState().setInternalError(); + si.close(true); + } else { + coordinator.getFollowingBlocks().offer(i, blocks[i]); + } + } + } + } + + private boolean shouldEndBlockGroup() { + return currentBlockGroup != null && + currentBlockGroup.getNumBytes() == blockSize * numDataBlocks; + } + @Override protected synchronized void writeChunk(byte[] bytes, int offset, int len, byte[] checksum, int ckoff, int cklen) throws IOException { @@ -392,8 +467,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { final int pos = cellBuffers.addTo(index, bytes, offset, len); final boolean cellFull = pos == cellSize; - final long oldBytes = current.getBytesCurBlock(); - if (!current.isFailed()) { + if (currentBlockGroup == null || shouldEndBlockGroup()) { + // the incoming data should belong to a new block. Allocate a new block. + allocateNewBlock(); + } + + currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len); + if (current.isHealthy()) { try { super.writeChunk(bytes, offset, len, checksum, ckoff, cklen); } catch(Exception e) { @@ -401,12 +481,6 @@ public class DFSStripedOutputStream extends DFSOutputStream { } } - if (current.isFailed()) { - final long newBytes = oldBytes + len; - coordinator.setBytesEndBlock(index, newBytes, current.getBlock()); - current.setBytesCurBlock(newBytes); - } - // Two extra steps are needed when a striping cell is full: // 1. Forward the current index pointer // 2. Generate parity packets if a full stripe of data cells are present @@ -419,11 +493,209 @@ public class DFSStripedOutputStream extends DFSOutputStream { cellBuffers.flipDataBuffers(); writeParityCells(); next = 0; + // check failure state for all the streamers. 
Bump GS if necessary + checkStreamerFailures(); + + // if this is the end of the block group, end each internal block + if (shouldEndBlockGroup()) { + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer s = setCurrentStreamer(i); + if (s.isHealthy()) { + try { + endBlock(); + } catch (IOException ignored) {} + } + } + } } setCurrentStreamer(next); } } + @Override + void enqueueCurrentPacketFull() throws IOException { + LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={}," + + " appendChunk={}, {}", currentPacket, src, getStreamer() + .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(), + getStreamer()); + enqueueCurrentPacket(); + adjustChunkBoundary(); + // no need to end block here + } + + private Set<StripedDataStreamer> markExternalErrorOnStreamers() { + Set<StripedDataStreamer> healthySet = new HashSet<>(); + for (StripedDataStreamer streamer : streamers) { + if (streamer.isHealthy() && + streamer.getStage() == BlockConstructionStage.DATA_STREAMING) { + streamer.setExternalError(); + healthySet.add(streamer); + } + } + return healthySet; + } + + /** + * Check and handle data streamer failures. This is called only when we have + * written a full stripe (i.e., enqueued all packets for a full stripe), or + * when we're closing the output stream. + */ + private void checkStreamerFailures() throws IOException { + Set<StripedDataStreamer> newFailed = checkStreamers(); + if (newFailed.size() > 0) { + // for healthy streamers, wait till all of them have fetched the new block + // and flushed out all the enqueued packets. + flushAllInternals(); + } + // get all the current failed streamers after the flush + newFailed = checkStreamers(); + while (newFailed.size() > 0) { + failedStreamers.addAll(newFailed); + coordinator.clearFailureStates(); + + // mark all the healthy streamers as external error + Set<StripedDataStreamer> healthySet = markExternalErrorOnStreamers(); + + // we have newly failed streamers, update block for pipeline + final ExtendedBlock newBG = updateBlockForPipeline(healthySet); + + // wait for all the healthy streamers to + // 1) get the updated block info + // 2) create new block output streams + newFailed = waitCreatingNewStreams(healthySet); + if (newFailed.size() + failedStreamers.size() > + numAllBlocks - numDataBlocks) { + throw new IOException( + "Data streamers failed while creating new block streams: " + + newFailed + ". 
There are not enough healthy streamers."); + } + for (StripedDataStreamer failedStreamer : newFailed) { + assert !failedStreamer.isHealthy(); + } + + // TODO we can also succeed if all the failed streamers have not taken + // the updated block + if (newFailed.size() == 0) { + // reset external error state of all the streamers + for (StripedDataStreamer streamer : healthySet) { + assert streamer.isHealthy(); + streamer.getErrorState().reset(); + } + updatePipeline(newBG); + } + for (int i = 0; i < numAllBlocks; i++) { + coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0); + } + } + } + + private int checkStreamerUpdates(Set<StripedDataStreamer> failed, + Set<StripedDataStreamer> streamers) { + for (StripedDataStreamer streamer : streamers) { + if (!coordinator.updateStreamerMap.containsKey(streamer)) { + if (!streamer.isHealthy() && + coordinator.getNewBlocks().peek(streamer.getIndex()) != null) { + // this streamer had internal error before getting updated block + failed.add(streamer); + } + } + } + return coordinator.updateStreamerMap.size() + failed.size(); + } + + private Set<StripedDataStreamer> waitCreatingNewStreams( + Set<StripedDataStreamer> healthyStreamers) throws IOException { + Set<StripedDataStreamer> failed = new HashSet<>(); + final int expectedNum = healthyStreamers.size(); + final long socketTimeout = dfsClient.getConf().getSocketTimeout(); + // the total wait time should be less than the socket timeout, otherwise + // a slow streamer may cause other streamers to time out. Here we wait for + // half of the socket timeout + long remainingTime = socketTimeout > 0 ? socketTimeout/2 : Long.MAX_VALUE; + final long waitInterval = 1000; + synchronized (coordinator) { + while (checkStreamerUpdates(failed, healthyStreamers) < expectedNum + && remainingTime > 0) { + try { + long start = Time.monotonicNow(); + coordinator.wait(waitInterval); + remainingTime -= Time.monotonicNow() - start; + } catch (InterruptedException e) { + throw DFSUtil.toInterruptedIOException("Interrupted when waiting" + + " for results of updating striped streamers", e); + } + } + } + synchronized (coordinator) { + for (StripedDataStreamer streamer : healthyStreamers) { + if (!coordinator.updateStreamerMap.containsKey(streamer)) { + // close the streamer if it is too slow to create the new connection + streamer.setStreamerAsClosed(); + failed.add(streamer); + } + } + } + for (Map.Entry<StripedDataStreamer, Boolean> entry : + coordinator.updateStreamerMap.entrySet()) { + if (!entry.getValue()) { + failed.add(entry.getKey()); + } + } + for (StripedDataStreamer failedStreamer : failed) { + healthyStreamers.remove(failedStreamer); + } + return failed; + } + + /** + * Call {@link ClientProtocol#updateBlockForPipeline} and assign updated block + * to healthy streamers. + * @param healthyStreamers The healthy data streamers. These streamers join + * the failure handling. 
+ */ + private ExtendedBlock updateBlockForPipeline( + Set<StripedDataStreamer> healthyStreamers) throws IOException { + final LocatedBlock updated = dfsClient.namenode.updateBlockForPipeline( + currentBlockGroup, dfsClient.clientName); + final long newGS = updated.getBlock().getGenerationStamp(); + ExtendedBlock newBlock = new ExtendedBlock(currentBlockGroup); + newBlock.setGenerationStamp(newGS); + final LocatedBlock[] updatedBlks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock) updated, cellSize, numDataBlocks, + numAllBlocks - numDataBlocks); + + for (int i = 0; i < numAllBlocks; i++) { + StripedDataStreamer si = getStripedDataStreamer(i); + if (healthyStreamers.contains(si)) { + final LocatedBlock lb = new LocatedBlock(new ExtendedBlock(newBlock), + null, null, null, -1, updated.isCorrupt(), null); + lb.setBlockToken(updatedBlks[i].getBlockToken()); + coordinator.getNewBlocks().offer(i, lb); + } + } + return newBlock; + } + + private void updatePipeline(ExtendedBlock newBG) throws IOException { + final DatanodeInfo[] newNodes = new DatanodeInfo[numAllBlocks]; + final String[] newStorageIDs = new String[numAllBlocks]; + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer streamer = getStripedDataStreamer(i); + final DatanodeInfo[] nodes = streamer.getNodes(); + final String[] storageIDs = streamer.getStorageIDs(); + if (streamer.isHealthy() && nodes != null && storageIDs != null) { + newNodes[i] = nodes[0]; + newStorageIDs[i] = storageIDs[0]; + } else { + newNodes[i] = new DatanodeInfo(DatanodeID.EMPTY_DATANODE_ID); + newStorageIDs[i] = ""; + } + } + dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup, + newBG, newNodes, newStorageIDs); + currentBlockGroup = newBG; + } + private int stripeDataSize() { return numDataBlocks * cellSize; } @@ -500,28 +772,16 @@ public class DFSStripedOutputStream extends DFSOutputStream { } } - /** - * Simply add bytesCurBlock together. Note that this result is not accurately - * the size of the block group. - */ - private long getCurrentSumBytes() { - long sum = 0; - for (int i = 0; i < numDataBlocks; i++) { - sum += streamers.get(i).getBytesCurBlock(); - } - return sum; - } - private boolean generateParityCellsForLastStripe() { - final long currentBlockGroupBytes = getCurrentSumBytes(); - if (currentBlockGroupBytes % stripeDataSize() == 0) { + final long currentBlockGroupBytes = currentBlockGroup == null ? + 0 : currentBlockGroup.getNumBytes(); + final long lastStripeSize = currentBlockGroupBytes % stripeDataSize(); + if (lastStripeSize == 0) { return false; } - final int firstCellSize = - (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize); - final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize? - firstCellSize : cellSize; + final long parityCellSize = lastStripeSize < cellSize? 
+ lastStripeSize : cellSize; final ByteBuffer[] buffers = cellBuffers.getBuffers(); for (int i = 0; i < numAllBlocks; i++) { @@ -550,13 +810,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { cellBuffers.clear(); } - void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf - ) throws IOException { + void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf) + throws IOException { final StripedDataStreamer current = setCurrentStreamer(index); final int len = buffer.limit(); final long oldBytes = current.getBytesCurBlock(); - if (!current.isFailed()) { + if (current.isHealthy()) { try { DataChecksum sum = getDataChecksum(); sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0); @@ -570,18 +830,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e); } } - - if (current.isFailed()) { - final long newBytes = oldBytes + len; - current.setBytesCurBlock(newBytes); - } } @Override void setClosed() { super.setClosed(); for (int i = 0; i < numAllBlocks; i++) { - streamers.get(i).release(); + getStripedDataStreamer(i).release(); } cellBuffers.release(); } @@ -607,37 +862,40 @@ public class DFSStripedOutputStream extends DFSOutputStream { try { // flush from all upper layers - try { - flushBuffer(); - } catch(Exception e) { - handleStreamerFailure("flushBuffer " + getCurrentStreamer(), e); - } + flushBuffer(); // if the last stripe is incomplete, generate and write parity cells if (generateParityCellsForLastStripe()) { writeParityCells(); } enqueueAllCurrentPackets(); + // flush all the data packets + flushAllInternals(); + // check failures + checkStreamerFailures(); + for (int i = 0; i < numAllBlocks; i++) { final StripedDataStreamer s = setCurrentStreamer(i); - if (!s.isFailed()) { + if (s.isHealthy()) { try { if (s.getBytesCurBlock() > 0) { setCurrentPacketToEmpty(); } - // flush all data to Datanode + // flush the last "close" packet to Datanode flushInternal(); } catch(Exception e) { - handleStreamerFailure("flushInternal " + s, e, false); + // TODO for both close and endBlock, we currently do not handle + // failures when sending the last packet. We actually do not need to + // bump GS for this kind of failure. Thus counting the total number + // of failures may be good enough. 
} } } closeThreads(false); - final ExtendedBlock lastBlock = coordinator.getBlockGroup(); TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); try { - completeFile(lastBlock); + completeFile(currentBlockGroup); } finally { scope.close(); } @@ -652,14 +910,45 @@ public class DFSStripedOutputStream extends DFSOutputStream { int idx = streamers.indexOf(getCurrentStreamer()); for(int i = 0; i < streamers.size(); i++) { final StripedDataStreamer si = setCurrentStreamer(i); - if (!si.isFailed() && currentPacket != null) { + if (si.isHealthy() && currentPacket != null) { try { enqueueCurrentPacket(); } catch (IOException e) { - handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e, false); + handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e); } } } setCurrentStreamer(idx); } + + void flushAllInternals() throws IOException { + int current = getCurrentIndex(); + + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer s = setCurrentStreamer(i); + if (s.isHealthy()) { + try { + // flush all data to Datanode + flushInternal(); + } catch(Exception e) { + handleStreamerFailure("flushInternal " + s, e); + } + } + } + setCurrentStreamer(current); + } + + static void sleep(long ms, String op) throws InterruptedIOException { + try { + Thread.sleep(ms); + } catch(InterruptedException ie) { + throw DFSUtil.toInterruptedIOException( + "Sleep interrupted during " + op, ie); + } + } + + @Override + ExtendedBlock getBlock() { + return currentBlockGroup; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index c478f1c..a6eb01f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SU import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; @@ -46,16 +45,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; -import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; -import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; @@ -69,13 
+64,10 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MultipleIOException; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; @@ -204,9 +196,12 @@ class DataStreamer extends Daemon { } } + enum ErrorType { + NONE, INTERNAL, EXTERNAL + } + static class ErrorState { - private boolean error = false; - private boolean externalError = false; + ErrorType error = ErrorType.NONE; private int badNodeIndex = -1; private int restartingNodeIndex = -1; private long restartingNodeDeadline = 0; @@ -216,35 +211,47 @@ class DataStreamer extends Daemon { this.datanodeRestartTimeout = datanodeRestartTimeout; } + synchronized void resetInternalError() { + if (hasInternalError()) { + error = ErrorType.NONE; + } + badNodeIndex = -1; + restartingNodeIndex = -1; + restartingNodeDeadline = 0; + } + synchronized void reset() { - error = false; - externalError = false; + error = ErrorType.NONE; badNodeIndex = -1; restartingNodeIndex = -1; restartingNodeDeadline = 0; } - synchronized boolean hasError() { - return error; + synchronized boolean hasInternalError() { + return error == ErrorType.INTERNAL; } - synchronized boolean hasExternalErrorOnly() { - return error && externalError && !isNodeMarked(); + synchronized boolean hasExternalError() { + return error == ErrorType.EXTERNAL; } - synchronized boolean hasDatanodeError() { - return error && (isNodeMarked() || externalError); + synchronized boolean hasError() { + return error != ErrorType.NONE; } - synchronized void setError(boolean err) { - this.error = err; + synchronized boolean hasDatanodeError() { + return error == ErrorType.INTERNAL && isNodeMarked(); } - synchronized void initExternalError() { - setError(true); - this.externalError = true; + synchronized void setInternalError() { + this.error = ErrorType.INTERNAL; } + synchronized void setExternalError() { + if (!hasInternalError()) { + this.error = ErrorType.EXTERNAL; + } + } synchronized void setBadNodeIndex(int index) { this.badNodeIndex = index; @@ -306,14 +313,14 @@ class DataStreamer extends Daemon { } if (!isRestartingNode()) { - error = false; + error = ErrorType.NONE; } badNodeIndex = -1; } synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) { if (restartingNodeIndex >= 0) { - if (!error) { + if (error == ErrorType.NONE) { throw new IllegalStateException("error=false while checking" + " restarting node deadline"); } @@ -345,7 +352,7 @@ class DataStreamer extends Daemon { private volatile boolean streamerClosed = false; protected ExtendedBlock block; // its length is number of bytes acked - private Token<BlockTokenIdentifier> accessToken; + protected Token<BlockTokenIdentifier> accessToken; private DataOutputStream blockStream; private DataInputStream blockReplyStream; private ResponseProcessor response = null; @@ -355,7 +362,7 @@ class DataStreamer extends Daemon { private final ErrorState errorState; private BlockConstructionStage stage; // block construction 
stage - private long bytesSent = 0; // number of bytes that've been sent + protected long bytesSent = 0; // number of bytes that've been sent private final boolean isLazyPersistFile; /** Nodes have been used in the pipeline before and have failed. */ @@ -378,13 +385,13 @@ class DataStreamer extends Daemon { protected final DFSClient dfsClient; protected final String src; /** Only for DataTransferProtocol.writeBlock(..) */ - private final DataChecksum checksum4WriteBlock; - private final Progressable progress; + final DataChecksum checksum4WriteBlock; + final Progressable progress; protected final HdfsFileStatus stat; // appending to existing partial block private volatile boolean appendChunk = false; // both dataQueue and ackQueue are protected by dataQueue lock - private final LinkedList<DFSPacket> dataQueue = new LinkedList<>(); + protected final LinkedList<DFSPacket> dataQueue = new LinkedList<>(); private final LinkedList<DFSPacket> ackQueue = new LinkedList<>(); private final AtomicReference<CachingStrategy> cachingStrategy; private final ByteArrayManager byteArrayManager; @@ -401,7 +408,7 @@ class DataStreamer extends Daemon { CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10; private int lastCongestionBackoffTime; - private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes; + protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes; private final String[] favoredNodes; private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, @@ -473,6 +480,10 @@ class DataStreamer extends Daemon { } } + void setAccessToken(Token<BlockTokenIdentifier> t) { + this.accessToken = t; + } + private void setPipeline(LocatedBlock lb) { setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); } @@ -533,7 +544,7 @@ class DataStreamer extends Daemon { DFSPacket one; try { // process datanode IO errors if any - boolean doSleep = processDatanodeError(); + boolean doSleep = processDatanodeOrExternalError(); final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; synchronized (dataQueue) { @@ -696,7 +707,7 @@ class DataStreamer extends Daemon { } lastException.set(e); assert !(e instanceof NullPointerException); - errorState.setError(true); + errorState.setInternalError(); if (!errorState.isNodeMarked()) { // Not a datanode issue streamerClosed = true; @@ -837,6 +848,9 @@ class DataStreamer extends Daemon { } } + void setStreamerAsClosed() { + streamerClosed = true; + } private void checkClosed() throws IOException { if (streamerClosed) { @@ -857,7 +871,7 @@ class DataStreamer extends Daemon { } } - private void closeStream() { + void closeStream() { final MultipleIOException.Builder b = new MultipleIOException.Builder(); if (blockStream != null) { @@ -1037,7 +1051,7 @@ class DataStreamer extends Daemon { } catch (Exception e) { if (!responderClosed) { lastException.set(e); - errorState.setError(true); + errorState.setInternalError(); errorState.markFirstNodeIfNotMarked(); synchronized (dataQueue) { dataQueue.notifyAll(); @@ -1059,18 +1073,18 @@ class DataStreamer extends Daemon { } } + private boolean shouldHandleExternalError(){ + return errorState.hasExternalError() && blockStream != null; + } + /** * If this stream has encountered any errors, shutdown threads * and mark the stream as closed. * * @return true if it should sleep for a while after returning. 
*/ - private boolean processDatanodeError() throws IOException { - if (!errorState.hasDatanodeError()) { - return false; - } - if (errorState.hasExternalErrorOnly() && block == null) { - // block is not yet initialized, handle external error later. + private boolean processDatanodeOrExternalError() throws IOException { + if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) { return false; } if (response != null) { @@ -1103,7 +1117,8 @@ class DataStreamer extends Daemon { return false; } } - boolean doSleep = setupPipelineForAppendOrRecovery(); + + setupPipelineForAppendOrRecovery(); if (!streamerClosed && dfsClient.clientRunning) { if (stage == BlockConstructionStage.PIPELINE_CLOSE) { @@ -1135,7 +1150,7 @@ class DataStreamer extends Daemon { } } - return doSleep; + return false; } void setHflush() { @@ -1266,7 +1281,7 @@ class DataStreamer extends Daemon { * This happens when a file is appended or data streaming fails * It keeps on trying until a pipeline is setup */ - private boolean setupPipelineForAppendOrRecovery() throws IOException { + private void setupPipelineForAppendOrRecovery() throws IOException { // check number of datanodes if (nodes == null || nodes.length == 0) { String msg = "Could not get block locations. " + "Source file \"" @@ -1274,19 +1289,23 @@ class DataStreamer extends Daemon { LOG.warn(msg); lastException.set(new IOException(msg)); streamerClosed = true; - return false; + return; } + setupPipelineInternal(nodes, storageTypes); + } + protected void setupPipelineInternal(DatanodeInfo[] datanodes, + StorageType[] nodeStorageTypes) throws IOException { boolean success = false; long newGS = 0L; while (!success && !streamerClosed && dfsClient.clientRunning) { if (!handleRestartingDatanode()) { - return false; + return; } - final boolean isRecovery = errorState.hasError(); + final boolean isRecovery = errorState.hasInternalError(); if (!handleBadDatanode()) { - return false; + return; } handleDatanodeReplacement(); @@ -1307,7 +1326,6 @@ class DataStreamer extends Daemon { if (success) { block = updatePipeline(newGS); } - return false; // do not sleep, continue processing } /** @@ -1315,7 +1333,7 @@ class DataStreamer extends Daemon { * This process is repeated until the deadline or the node starts back up. * @return true if it should continue. */ - private boolean handleRestartingDatanode() { + boolean handleRestartingDatanode() { if (errorState.isRestartingNode()) { // 4 seconds or the configured deadline period, whichever is shorter. // This is the retry interval and recovery will be retried in this @@ -1338,7 +1356,7 @@ class DataStreamer extends Daemon { * Remove bad node from list of nodes if badNodeIndex was set. * @return true if it should continue. 
*/ - private boolean handleBadDatanode() { + boolean handleBadDatanode() { final int badNodeIndex = errorState.getBadNodeIndex(); if (badNodeIndex >= 0) { if (nodes.length <= 1) { @@ -1388,7 +1406,7 @@ class DataStreamer extends Daemon { } } - private void failPacket4Testing() { + void failPacket4Testing() { if (failPacket) { // for testing failPacket = false; try { @@ -1400,13 +1418,8 @@ class DataStreamer extends Daemon { } } - LocatedBlock updateBlockForPipeline() throws IOException { - return callUpdateBlockForPipeline(block); - } - - LocatedBlock callUpdateBlockForPipeline(ExtendedBlock newBlock) throws IOException { - return dfsClient.namenode.updateBlockForPipeline( - newBlock, dfsClient.clientName); + private LocatedBlock updateBlockForPipeline() throws IOException { + return dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName); } static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) { @@ -1417,18 +1430,12 @@ class DataStreamer extends Daemon { /** update pipeline at the namenode */ ExtendedBlock updatePipeline(long newGS) throws IOException { final ExtendedBlock newBlock = newBlock(block, newGS); - return callUpdatePipeline(block, newBlock, nodes, storageIDs); - } - - ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock, - DatanodeInfo[] newNodes, String[] newStorageIDs) - throws IOException { - dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock, - newNodes, newStorageIDs); + dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, + nodes, storageIDs); return newBlock; } - int getNumBlockWriteRetry() { + private int getNumBlockWriteRetry() { return dfsClient.getConf().getNumBlockWriteRetry(); } @@ -1438,7 +1445,7 @@ class DataStreamer extends Daemon { * Must get block ID and the IDs of the destinations from the namenode. * Returns the list of target datanodes. */ - private LocatedBlock nextBlockOutputStream() throws IOException { + protected LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb = null; DatanodeInfo[] nodes = null; StorageType[] storageTypes = null; @@ -1446,9 +1453,8 @@ class DataStreamer extends Daemon { boolean success = false; ExtendedBlock oldBlock = block; do { - errorState.reset(); + errorState.resetInternalError(); lastException.clear(); - success = false; DatanodeInfo[] excluded = excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) @@ -1488,7 +1494,7 @@ class DataStreamer extends Daemon { // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. 
// - private boolean createBlockOutputStream(DatanodeInfo[] nodes, + boolean createBlockOutputStream(DatanodeInfo[] nodes, StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { if (nodes.length == 0) { LOG.info("nodes are empty for write pipeline of " + block); @@ -1567,7 +1573,7 @@ class DataStreamer extends Daemon { assert null == blockStream : "Previous blockStream unclosed"; blockStream = out; result = true; // success - errorState.reset(); + errorState.resetInternalError(); } catch (IOException ie) { if (!errorState.isRestartingNode()) { LOG.info("Exception in createBlockOutputStream " + this, ie); @@ -1603,7 +1609,7 @@ class DataStreamer extends Daemon { if (checkRestart && shouldWaitForRestart(i)) { errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]); } - errorState.setError(true); + errorState.setInternalError(); lastException.set(ie); result = false; // error } finally { @@ -1645,58 +1651,10 @@ class DataStreamer extends Daemon { } } - LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) + private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { - final DfsClientConf conf = dfsClient.getConf(); - int retries = conf.getNumBlockWriteLocateFollowingRetry(); - long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); - while (true) { - long localstart = Time.monotonicNow(); - while (true) { - try { - return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, stat.getFileId(), favoredNodes); - } catch (RemoteException e) { - IOException ue = - e.unwrapRemoteException(FileNotFoundException.class, - AccessControlException.class, - NSQuotaExceededException.class, - DSQuotaExceededException.class, - QuotaByStorageTypeExceededException.class, - UnresolvedPathException.class); - if (ue != e) { - throw ue; // no need to retry these exceptions - } - - - if (NotReplicatedYetException.class.getName(). 
- equals(e.getClassName())) { - if (retries == 0) { - throw e; - } else { - --retries; - LOG.info("Exception while adding a block", e); - long elapsed = Time.monotonicNow() - localstart; - if (elapsed > 5000) { - LOG.info("Waiting for replication for " - + (elapsed / 1000) + " seconds"); - } - try { - LOG.warn("NotReplicatedYetException sleeping " + src - + " retries left " + retries); - Thread.sleep(sleeptime); - sleeptime *= 2; - } catch (InterruptedException ie) { - LOG.warn("Caught exception", ie); - } - } - } else { - throw e; - } - - } - } - } + return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block, + stat.getFileId(), favoredNodes); } /** @@ -1755,6 +1713,10 @@ class DataStreamer extends Daemon { return storageIDs; } + BlockConstructionStage getStage() { + return stage; + } + /** * return the token of the block * http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index 2f83f7c..a313ecb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -19,18 +19,15 @@ package org.apache.hadoop.hdfs; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator; -import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.util.ByteArrayManager; -import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; @@ -46,66 +43,8 @@ import com.google.common.annotations.VisibleForTesting; * other streamers. */ public class StripedDataStreamer extends DataStreamer { - /** - * This class is designed for multiple threads to share a - * {@link MultipleBlockingQueue}. Initially, the queue is empty. The earliest - * thread calling poll populates entries to the queue and the other threads - * will wait for it. Once the entries are populated, all the threads can poll - * their entries. - * - * @param <T> the queue entry type. - */ - static abstract class ConcurrentPoll<T> { - final MultipleBlockingQueue<T> queue; - - ConcurrentPoll(MultipleBlockingQueue<T> queue) { - this.queue = queue; - } - - T poll(final int i) throws IOException { - for(;;) { - synchronized(queue) { - final T polled = queue.poll(i); - if (polled != null) { // already populated; return polled item. - return polled; - } - if (isReady2Populate()) { - try { - populate(); - return queue.poll(i); - } catch(IOException ioe) { - LOG.warn("Failed to populate, " + this, ioe); - throw ioe; - } - } - } - - // sleep and then retry. 
- sleep(100, "poll"); - } - } - - boolean isReady2Populate() { - return queue.isEmpty(); - } - - abstract void populate() throws IOException; - } - - private static void sleep(long ms, String op) throws InterruptedIOException { - try { - Thread.sleep(ms); - } catch(InterruptedException ie) { - throw DFSUtil.toInterruptedIOException( - "Sleep interrupted during " + op, ie); - } - } - private final Coordinator coordinator; private final int index; - private volatile boolean failed; - private final ECSchema schema; - private final int cellSize; StripedDataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, @@ -117,102 +56,59 @@ public class StripedDataStreamer extends DataStreamer { byteArrayManage, favoredNodes); this.index = index; this.coordinator = coordinator; - this.schema = stat.getErasureCodingPolicy().getSchema(); - this.cellSize = stat.getErasureCodingPolicy().getCellSize(); } int getIndex() { return index; } - void setFailed(boolean failed) { - this.failed = failed; - } - - boolean isFailed() { - return failed; - } - - private boolean isParityStreamer() { - return index >= schema.getNumDataUnits(); + boolean isHealthy() { + return !streamerClosed() && !getErrorState().hasInternalError(); } @Override protected void endBlock() { - if (!isParityStreamer()) { - coordinator.offerEndBlock(index, block); - } + coordinator.offerEndBlock(index, block); super.endBlock(); } - @Override - int getNumBlockWriteRetry() { - return 0; + /** + * The upper level DFSStripedOutputStream will allocate the new block group. + * All the striped data streamers only need to fetch from the queue, which + * should already be ready. + */ + private LocatedBlock getFollowingBlock() throws IOException { + if (!this.isHealthy()) { + // No internal block for this streamer, maybe not enough healthy DNs. + // Throw the exception which has been set by the DFSStripedOutputStream. 
+      this.getLastException().check(false);
+    }
+    return coordinator.getFollowingBlocks().poll(index);
   }
 
   @Override
-  LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
-      throws IOException {
-    return new ConcurrentPoll<LocatedBlock>(coordinator.getFollowingBlocks()) {
-      @Override
-      boolean isReady2Populate() {
-        return super.isReady2Populate()
-            && (block == null || coordinator.hasAllEndBlocks());
-      }
-
-      @Override
-      void populate() throws IOException {
-        getLastException().check(false);
-
-        if (block != null) {
-          // set numByte for the previous block group
-          long bytes = 0;
-          for (int i = 0; i < schema.getNumDataUnits(); i++) {
-            final ExtendedBlock b = coordinator.takeEndBlock(i);
-            StripedBlockUtil.checkBlocks(index, block, i, b);
-            bytes += b.getNumBytes();
-          }
-          block.setNumBytes(bytes);
-          block.setBlockId(block.getBlockId() - index);
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("locateFollowingBlock: index=" + index + ", block=" + block);
-        }
-
-        final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
-            excludedNodes);
-        if (lb.getLocations().length < schema.getNumDataUnits()) {
-          throw new IOException(
-              "Failed to get datablocks number of nodes from namenode: blockGroupSize= "
-                  + (schema.getNumDataUnits() + schema.getNumParityUnits())
-                  + ", blocks.length= " + lb.getLocations().length);
-        }
-        final LocatedBlock[] blocks =
-            StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) lb,
-                cellSize, schema.getNumDataUnits(), schema.getNumParityUnits());
-
-        for (int i = 0; i < blocks.length; i++) {
-          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
-          if (si.isFailed()) {
-            continue; // skipping failed data streamer
-          }
-          if (blocks[i] == null) {
-            // Set exception and close streamer as there is no block locations
-            // found for the parity block.
-            LOG.warn("Failed to get block location for parity block, index="
-                + i);
-            si.getLastException().set(
-                new IOException("Failed to get following block, i=" + i));
-            si.setFailed(true);
-            si.endBlock();
-            si.close(true);
-          } else {
-            queue.offer(i, blocks[i]);
-          }
-        }
-      }
-    }.poll(index);
+  protected LocatedBlock nextBlockOutputStream() throws IOException {
+    boolean success;
+    LocatedBlock lb = getFollowingBlock();
+    block = lb.getBlock();
+    block.setNumBytes(0);
+    bytesSent = 0;
+    accessToken = lb.getBlockToken();
+
+    DatanodeInfo[] nodes = lb.getLocations();
+    StorageType[] storageTypes = lb.getStorageTypes();
+
+    // Connect to the DataNode. If this fails, the internal error state
+    // will be set.
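+    // newGS is 0L and recoveryFlag is false here: this is the initial
+    // pipeline setup for a brand-new internal block, not a pipeline recovery.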
+    success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+
+    if (!success) {
+      block = null;
+      final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
+      LOG.info("Excluding datanode " + badNode);
+      excludedNodes.put(badNode, badNode);
+      throw new IOException("Unable to create new block.");
+    }
+    return lb;
+  }
 
   @VisibleForTesting
@@ -221,119 +117,71 @@ public class StripedDataStreamer extends DataStreamer {
   }
 
   @Override
-  LocatedBlock updateBlockForPipeline() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("updateBlockForPipeline(), " + this);
-    }
-    return new ConcurrentPoll<LocatedBlock>(coordinator.getNewBlocks()) {
-      @Override
-      void populate() throws IOException {
-        final ExtendedBlock bg = coordinator.getBlockGroup();
-        final LocatedBlock updated = callUpdateBlockForPipeline(bg);
-        final long newGS = updated.getBlock().getGenerationStamp();
-        final LocatedBlock[] updatedBlks = StripedBlockUtil
-            .parseStripedBlockGroup((LocatedStripedBlock) updated, cellSize,
-                schema.getNumDataUnits(), schema.getNumParityUnits());
-        for (int i = 0; i < schema.getNumDataUnits()
-            + schema.getNumParityUnits(); i++) {
-          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
-          if (si.isFailed()) {
-            continue; // skipping failed data streamer
-          }
-          final ExtendedBlock bi = si.getBlock();
-          if (bi != null) {
-            final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
-                null, null, null, -1, updated.isCorrupt(), null);
-            lb.setBlockToken(updatedBlks[i].getBlockToken());
-            queue.offer(i, lb);
-          } else {
-            final MultipleBlockingQueue<LocatedBlock> followingBlocks
-                = coordinator.getFollowingBlocks();
-            synchronized(followingBlocks) {
-              final LocatedBlock lb = followingBlocks.peek(i);
-              if (lb != null) {
-                lb.getBlock().setGenerationStamp(newGS);
-                si.getErrorState().reset();
-                continue;
-              }
-            }
-
-            //streamer i just have polled the block, sleep and retry.
-            sleep(100, "updateBlockForPipeline, " + this);
-            i--;
-          }
-        }
+  protected void setupPipelineInternal(DatanodeInfo[] nodes,
+      StorageType[] nodeStorageTypes) throws IOException {
+    boolean success = false;
+    while (!success && !streamerClosed() && dfsClient.clientRunning) {
+      if (!handleRestartingDatanode()) {
+        return;
+      }
+      if (!handleBadDatanode()) {
+        // For a striped streamer, a datanode error means closing the stream
+        // and returning; there is no need to replace the datanode.
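+        // (A striped streamer cannot hand its partially written internal
+        // block to a replacement node; a lost internal block is instead
+        // rebuilt later by erasure-coded reconstruction.)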
+        return;
+      }
-      }
-    }.poll(index);
-  }
-
-  @Override
-  ExtendedBlock updatePipeline(final long newGS) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("updatePipeline(newGS=" + newGS + "), " + this);
-    }
-    return new ConcurrentPoll<ExtendedBlock>(coordinator.getUpdateBlocks()) {
-      @Override
-      void populate() throws IOException {
-        final MultipleBlockingQueue<LocatedBlock> followingBlocks
-            = coordinator.getFollowingBlocks();
-        final ExtendedBlock bg = coordinator.getBlockGroup();
-        final ExtendedBlock newBG = newBlock(bg, newGS);
-        final int n = schema.getNumDataUnits() + schema.getNumParityUnits();
-        final DatanodeInfo[] newNodes = new DatanodeInfo[n];
-        final String[] newStorageIDs = new String[n];
-        for (int i = 0; i < n; i++) {
-          final StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
-          DatanodeInfo[] nodes = si.getNodes();
-          String[] storageIDs = si.getStorageIDs();
-          if (nodes == null || storageIDs == null) {
-            synchronized(followingBlocks) {
-              final LocatedBlock lb = followingBlocks.peek(i);
-              if (lb != null) {
-                nodes = lb.getLocations();
-                storageIDs = lb.getStorageIDs();
-              }
-            }
-          }
-          if (nodes != null && storageIDs != null) {
-            newNodes[i] = nodes[0];
-            newStorageIDs[i] = storageIDs[0];
-          } else {
-            //streamer i just have polled the block, sleep and retry.
-            sleep(100, "updatePipeline, " + this);
-            i--;
-          }
-        }
+      // get a new generation stamp and an access token
+      final LocatedBlock lb = coordinator.getNewBlocks().take(index);
+      long newGS = lb.getBlock().getGenerationStamp();
+      setAccessToken(lb.getBlockToken());
+
+      // Set up the pipeline again with the remaining nodes. When a striped
+      // data streamer comes here, it must be in the external error state.
+      assert getErrorState().hasExternalError();
+      success = createBlockOutputStream(nodes, nodeStorageTypes, newGS, true);
+
+      failPacket4Testing();
+      getErrorState().checkRestartingNodeDeadline(nodes);
+
+      // notify the coordinator of the result of createBlockOutputStream
+      synchronized (coordinator) {
+        if (!streamerClosed()) {
+          coordinator.updateStreamer(this, success);
+          coordinator.notify();
+        } else {
+          success = false;
+        }
-        final ExtendedBlock updated = callUpdatePipeline(bg, newBG, newNodes,
-            newStorageIDs);
-
-        for (int i = 0; i < n; i++) {
-          final StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
-          final ExtendedBlock bi = si.getBlock();
-          if (bi != null) {
-            queue.offer(i, newBlock(bi, updated.getGenerationStamp()));
-          } else if (!si.isFailed()) {
-            synchronized(followingBlocks) {
-              final LocatedBlock lb = followingBlocks.peek(i);
-              if (lb != null) {
-                lb.getBlock().setGenerationStamp(newGS);
-                si.getErrorState().reset();
-                continue;
-              }
-            }
+      }
 
-            //streamer i just have polled the block, sleep and retry.
-            sleep(100, "updatePipeline, " + this);
-            i--;
-          }
-        }
+      if (success) {
+        // wait for results of other streamers
+        success = coordinator.takeStreamerUpdateResult(index);
+        if (success) {
+          // if all succeeded, update its block using the new GS
+          block = newBlock(block, newGS);
+        } else {
+          // otherwise close the block stream and restart the recovery process
+          closeStream();
+        }
+      } else {
+        // if this fails, close the stream. The internal error state and last
+        // exception have already been set in createBlockOutputStream.
+        // TODO: wait for restarting DataNodes during RollingUpgrade
+        closeStream();
+        setStreamerAsClosed();
+      }
-    }.poll(index);
+    } // while
+  }
+
+  void setExternalError() {
+    getErrorState().setExternalError();
+    synchronized (dataQueue) {
+      dataQueue.notifyAll();
+    }
+  }
 
   @Override
   public String toString() {
-    return "#" + index + ": " + (failed? "failed, ": "") + super.toString();
+    return "#" + index + ": " + (!isHealthy() ? "failed, ": "") + super.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
index 0e92779..1d4cff3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
@@ -68,16 +68,28 @@ public class BlockUnderConstructionFeature {
 
   /** Set expected locations */
   public void setExpectedLocations(Block block, DatanodeStorageInfo[] targets,
       boolean isStriped) {
-    int numLocations = targets == null ? 0 : targets.length;
+    if (targets == null) {
+      return;
+    }
+    int numLocations = 0;
+    for (DatanodeStorageInfo target : targets) {
+      if (target != null) {
+        numLocations++;
+      }
+    }
+
     this.replicas = new ReplicaUnderConstruction[numLocations];
-    for(int i = 0; i < numLocations; i++) {
-      // when creating a new striped block we simply sequentially assign block
-      // index to each storage
-      Block replicaBlock = isStriped ?
-          new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) :
-          block;
-      replicas[i] = new ReplicaUnderConstruction(replicaBlock, targets[i],
-          ReplicaState.RBW);
+    int offset = 0;
+    for(int i = 0; i < targets.length; i++) {
+      if (targets[i] != null) {
+        // when creating a new striped block we simply sequentially assign block
+        // index to each storage
+        Block replicaBlock = isStriped ?
+            new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) :
+            block;
+        replicas[offset++] = new ReplicaUnderConstruction(replicaBlock,
+            targets[i], ReplicaState.RBW);
+      }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index b5b3b97..61c6386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -513,6 +513,10 @@ public class DatanodeManager {
     }
     final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[datanodeID.length];
     for(int i = 0; i < datanodeID.length; i++) {
+      if (datanodeID[i].equals(DatanodeID.EMPTY_DATANODE_ID)) {
+        storages[i] = null;
+        continue;
+      }
       final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
       storages[i] = dd.getStorageInfo(storageIDs[i]);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 5af3585..d49d39b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -925,22 +925,21 @@ public class StripedBlockUtil {
 
   /**
    * Check if the information such as IDs and generation stamps in block-i
-   * match block-j, where block-i and block-j are in the same group.
+   * match the block group.
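+   * (Internal block i of a group is expected to carry ID
+   * blockGroup.getBlockId() + i and the group's generation stamp.)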
   */
-  public static void checkBlocks(int j, ExtendedBlock blockj,
+  public static void checkBlocks(ExtendedBlock blockGroup,
       int i, ExtendedBlock blocki) throws IOException {
-
-    if (!blocki.getBlockPoolId().equals(blockj.getBlockPoolId())) {
-      throw new IOException("Block pool IDs mismatched: block" + j + "="
-          + blockj + ", block" + i + "=" + blocki);
+    if (!blocki.getBlockPoolId().equals(blockGroup.getBlockPoolId())) {
+      throw new IOException("Block pool IDs mismatched: block" + i + "="
+          + blocki + ", expected block group=" + blockGroup);
     }
-    if (blocki.getBlockId() - i != blockj.getBlockId() - j) {
-      throw new IOException("Block IDs mismatched: block" + j + "="
-          + blockj + ", block" + i + "=" + blocki);
+    if (blocki.getBlockId() - i != blockGroup.getBlockId()) {
+      throw new IOException("Block IDs mismatched: block" + i + "="
+          + blocki + ", expected block group=" + blockGroup);
     }
-    if (blocki.getGenerationStamp() != blockj.getGenerationStamp()) {
-      throw new IOException("Generation stamps mismatched: block" + j + "="
-          + blockj + ", block" + i + "=" + blocki);
+    if (blocki.getGenerationStamp() != blockGroup.getGenerationStamp()) {
+      throw new IOException("Generation stamps mismatched: block" + i + "="
+          + blocki + ", expected block group=" + blockGroup);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 274d319..e621f26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1988,35 +1988,14 @@ public class DFSTestUtil {
    */
   public static ExtendedBlock flushInternal(DFSStripedOutputStream out)
       throws IOException {
-    out.flushInternal();
+    out.flushAllInternals();
     return out.getBlock();
   }
 
-  /**
-   * Verify that blocks in striped block group are on different nodes, and every
-   * internal blocks exists.
-   */
-  public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
-      int groupSize) {
-    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
-      assert lb instanceof LocatedStripedBlock;
-      HashSet<DatanodeInfo> locs = new HashSet<>();
-      for (DatanodeInfo datanodeInfo : lb.getLocations()) {
-        locs.add(datanodeInfo);
-      }
-      assertEquals(groupSize, lb.getLocations().length);
-      assertEquals(groupSize, locs.size());
-
-      // verify that every internal blocks exists
-      int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
-      assertEquals(groupSize, blockIndices.length);
-      HashSet<Integer> found = new HashSet<>();
-      for (int index : blockIndices) {
-        assert index >=0;
-        found.add(index);
-      }
-      assertEquals(groupSize, found.size());
-    }
+  public static ExtendedBlock flushBuffer(DFSStripedOutputStream out)
+      throws IOException {
+    out.flush();
+    return out.getBlock();
   }
 
   public static void waitForMetric(final JMXGet jmx, final String metricName, final int expectedValue)