http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index b71e59e,0000000..4ca8fe6 mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@@ -1,653 -1,0 +1,653 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Progressable; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import com.google.common.base.Preconditions; + + +/** + * This class supports writing files in striped layout and erasure coded format. + * Each stripe contains a sequence of cells. 
+ */ +@InterfaceAudience.Private +public class DFSStripedOutputStream extends DFSOutputStream { + static class MultipleBlockingQueue<T> { + private final List<BlockingQueue<T>> queues; + + MultipleBlockingQueue(int numQueue, int queueSize) { + queues = new ArrayList<>(numQueue); + for (int i = 0; i < numQueue; i++) { + queues.add(new LinkedBlockingQueue<T>(queueSize)); + } + } + + boolean isEmpty() { + for(int i = 0; i < queues.size(); i++) { + if (!queues.get(i).isEmpty()) { + return false; + } + } + return true; + } + + int numQueues() { + return queues.size(); + } + + void offer(int i, T object) { + final boolean b = queues.get(i).offer(object); + Preconditions.checkState(b, "Failed to offer " + object + + " to queue, i=" + i); + } + + T take(int i) throws InterruptedIOException { + try { + return queues.get(i).take(); + } catch(InterruptedException ie) { + throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie); + } + } + + T poll(int i) { + return queues.get(i).poll(); + } + + T peek(int i) { + return queues.get(i).peek(); + } + } + + /** Coordinate the communication between the streamers. */ + class Coordinator { + private final MultipleBlockingQueue<LocatedBlock> followingBlocks; + private final MultipleBlockingQueue<ExtendedBlock> endBlocks; + + private final MultipleBlockingQueue<LocatedBlock> newBlocks; + private final MultipleBlockingQueue<ExtendedBlock> updateBlocks; + + Coordinator(final DfsClientConf conf, final int numDataBlocks, + final int numAllBlocks) { + followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1); + + newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + } + + MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() { + return followingBlocks; + } + + MultipleBlockingQueue<LocatedBlock> getNewBlocks() { + return newBlocks; + } + + MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() { + return updateBlocks; + } + + StripedDataStreamer getStripedDataStreamer(int i) { + return DFSStripedOutputStream.this.getStripedDataStreamer(i); + } + + void offerEndBlock(int i, ExtendedBlock block) { + endBlocks.offer(i, block); + } + + ExtendedBlock takeEndBlock(int i) throws InterruptedIOException { + return endBlocks.take(i); + } + + boolean hasAllEndBlocks() { + for(int i = 0; i < endBlocks.numQueues(); i++) { + if (endBlocks.peek(i) == null) { + return false; + } + } + return true; + } + + void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) { + ExtendedBlock b = endBlocks.peek(i); + if (b == null) { + // streamer just has failed, put end block and continue + b = block; + offerEndBlock(i, b); + } + b.setNumBytes(newBytes); + } + + /** @return a block representing the entire block group. */ + ExtendedBlock getBlockGroup() { + final StripedDataStreamer s0 = getStripedDataStreamer(0); + final ExtendedBlock b0 = s0.getBlock(); + if (b0 == null) { + return null; + } + + final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0; + final ExtendedBlock block = new ExtendedBlock(b0); + long numBytes = b0.getNumBytes(); + for (int i = 1; i < numDataBlocks; i++) { + final StripedDataStreamer si = getStripedDataStreamer(i); + final ExtendedBlock bi = si.getBlock(); + if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) { + block.setGenerationStamp(bi.getGenerationStamp()); + } + numBytes += atBlockGroupBoundary? 
bi.getNumBytes(): si.getBytesCurBlock(); + } + block.setNumBytes(numBytes); + if (LOG.isDebugEnabled()) { + LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes()); + } + return block; + } + } + + /** Buffers for writing the data and parity cells of a stripe. */ + class CellBuffers { + private final ByteBuffer[] buffers; + private final byte[][] checksumArrays; + + CellBuffers(int numParityBlocks) throws InterruptedException{ + if (cellSize % bytesPerChecksum != 0) { + throw new HadoopIllegalArgumentException("Invalid values: " + + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + + bytesPerChecksum + ") must divide cell size (=" + cellSize + ")."); + } + + checksumArrays = new byte[numParityBlocks][]; + final int size = getChecksumSize() * (cellSize / bytesPerChecksum); + for (int i = 0; i < checksumArrays.length; i++) { + checksumArrays[i] = new byte[size]; + } + + buffers = new ByteBuffer[numAllBlocks]; + for (int i = 0; i < buffers.length; i++) { + buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize)); + } + } + + private ByteBuffer[] getBuffers() { + return buffers; + } + + byte[] getChecksumArray(int i) { + return checksumArrays[i - numDataBlocks]; + } + + private int addTo(int i, byte[] b, int off, int len) { + final ByteBuffer buf = buffers[i]; + final int pos = buf.position() + len; + Preconditions.checkState(pos <= cellSize); + buf.put(b, off, len); + return pos; + } + + private void clear() { + for (int i = 0; i< numAllBlocks; i++) { + buffers[i].clear(); + if (i >= numDataBlocks) { + Arrays.fill(buffers[i].array(), (byte) 0); + } + } + } + + private void release() { + for (int i = 0; i < numAllBlocks; i++) { + byteArrayManager.release(buffers[i].array()); + } + } + + private void flipDataBuffers() { + for (int i = 0; i < numDataBlocks; i++) { + buffers[i].flip(); + } + } + } + + private final Coordinator coordinator; + private final CellBuffers cellBuffers; + private final RawErasureEncoder encoder; + private final List<StripedDataStreamer> streamers; + private final DFSPacket[] currentPackets; // current Packet of each streamer + + /** Size of each striping cell, must be a multiple of bytesPerChecksum */ + private final int cellSize; + private final int numAllBlocks; + private final int numDataBlocks; + + @Override + ExtendedBlock getBlock() { + return coordinator.getBlockGroup(); + } + + /** Construct a new output stream for creating a file. 
*/ + DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, + EnumSet<CreateFlag> flag, Progressable progress, + DataChecksum checksum, String[] favoredNodes) + throws IOException { + super(dfsClient, src, stat, flag, progress, checksum, favoredNodes); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating DFSStripedOutputStream for " + src); + } + + final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy(); + final int numParityBlocks = ecPolicy.getNumParityUnits(); + cellSize = ecPolicy.getCellSize(); + numDataBlocks = ecPolicy.getNumDataUnits(); + numAllBlocks = numDataBlocks + numParityBlocks; + + encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(), + numDataBlocks, numParityBlocks); + + coordinator = new Coordinator(dfsClient.getConf(), + numDataBlocks, numAllBlocks); + try { + cellBuffers = new CellBuffers(numParityBlocks); + } catch (InterruptedException ie) { + throw DFSUtil.toInterruptedIOException( + "Failed to create cell buffers", ie); + } + + List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks); + for (short i = 0; i < numAllBlocks; i++) { + StripedDataStreamer streamer = new StripedDataStreamer(stat, + dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, + favoredNodes, i, coordinator); + s.add(streamer); + } + streamers = Collections.unmodifiableList(s); + currentPackets = new DFSPacket[streamers.size()]; + setCurrentStreamer(0); + } + + StripedDataStreamer getStripedDataStreamer(int i) { + return streamers.get(i); + } + + int getCurrentIndex() { + return getCurrentStreamer().getIndex(); + } + + private synchronized StripedDataStreamer getCurrentStreamer() { + return (StripedDataStreamer)streamer; + } + + private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) + throws IOException { + // backup currentPacket for current streamer + int oldIdx = streamers.indexOf(streamer); + if (oldIdx >= 0) { + currentPackets[oldIdx] = currentPacket; + } + + streamer = streamers.get(newIdx); + currentPacket = currentPackets[newIdx]; + adjustChunkBoundary(); + + return getCurrentStreamer(); + } + + /** + * Encode the buffers, i.e. compute parities. 
+ * + * @param buffers data buffers + parity buffers + */ + private static void encode(RawErasureEncoder encoder, int numData, + ByteBuffer[] buffers) { + final ByteBuffer[] dataBuffers = new ByteBuffer[numData]; + final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData]; + System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length); + System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length); + + encoder.encode(dataBuffers, parityBuffers); + } + + + private void checkStreamers() throws IOException { + int count = 0; + for(StripedDataStreamer s : streamers) { + if (!s.isFailed()) { + if (s.getBlock() != null) { + s.getErrorState().initExternalError(); + } + count++; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("checkStreamers: " + streamers); + LOG.debug("count=" + count); + } + if (count < numDataBlocks) { + throw new IOException("Failed: the number of remaining blocks = " + + count + " < the number of data blocks = " + numDataBlocks); + } + } + + private void handleStreamerFailure(String err, + Exception e) throws IOException { + LOG.warn("Failed: " + err + ", " + this, e); + getCurrentStreamer().setFailed(true); + checkStreamers(); + currentPacket = null; + } + + @Override + protected synchronized void writeChunk(byte[] bytes, int offset, int len, + byte[] checksum, int ckoff, int cklen) throws IOException { + final int index = getCurrentIndex(); + final StripedDataStreamer current = getCurrentStreamer(); + final int pos = cellBuffers.addTo(index, bytes, offset, len); + final boolean cellFull = pos == cellSize; + + final long oldBytes = current.getBytesCurBlock(); + if (!current.isFailed()) { + try { + super.writeChunk(bytes, offset, len, checksum, ckoff, cklen); + } catch(Exception e) { + handleStreamerFailure("offset=" + offset + ", length=" + len, e); + } + } + + if (current.isFailed()) { + final long newBytes = oldBytes + len; + coordinator.setBytesEndBlock(index, newBytes, current.getBlock()); + current.setBytesCurBlock(newBytes); + } + + // Two extra steps are needed when a striping cell is full: + // 1. Forward the current index pointer + // 2. Generate parity packets if a full stripe of data cells are present + if (cellFull) { + int next = index + 1; + //When all data cells in a stripe are ready, we need to encode + //them and generate some parity cells. These cells will be + //converted to packets and put to their DataStreamer's queue. 
+ if (next == numDataBlocks) { + cellBuffers.flipDataBuffers(); + writeParityCells(); + next = 0; + } + setCurrentStreamer(next); + } + } + + private int stripeDataSize() { + return numDataBlocks * cellSize; + } + + @Override + public void hflush() { + throw new UnsupportedOperationException(); + } + + @Override + public void hsync() { + throw new UnsupportedOperationException(); + } + + @Override + protected synchronized void start() { + for (StripedDataStreamer streamer : streamers) { + streamer.start(); + } + } + + @Override + synchronized void abort() throws IOException { + if (isClosed()) { + return; + } + for (StripedDataStreamer streamer : streamers) { + streamer.getLastException().set(new IOException("Lease timeout of " + + (dfsClient.getConf().getHdfsTimeout()/1000) + + " seconds expired.")); + } + closeThreads(true); + dfsClient.endFileLease(fileId); + } + + @Override + boolean isClosed() { + if (closed) { + return true; + } + for(StripedDataStreamer s : streamers) { + if (!s.streamerClosed()) { + return false; + } + } + return true; + } + + @Override + protected void closeThreads(boolean force) throws IOException { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); + try { + for (StripedDataStreamer streamer : streamers) { + try { + streamer.close(force); + streamer.join(); + streamer.closeSocket(); + } catch (Exception e) { + try { + handleStreamerFailure("force=" + force, e); + } catch (IOException ioe) { + b.add(ioe); + } + } finally { + streamer.setSocketToNull(); + } + } + } finally { + setClosed(); + } + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } + } + + /** + * Simply add bytesCurBlock together. Note that this result is not accurately + * the size of the block group. + */ + private long getCurrentSumBytes() { + long sum = 0; + for (int i = 0; i < numDataBlocks; i++) { + sum += streamers.get(i).getBytesCurBlock(); + } + return sum; + } + + private void writeParityCellsForLastStripe() throws IOException { + final long currentBlockGroupBytes = getCurrentSumBytes(); + if (currentBlockGroupBytes % stripeDataSize() == 0) { + return; + } + + final int firstCellSize = + (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize); + final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize? + firstCellSize : cellSize; + final ByteBuffer[] buffers = cellBuffers.getBuffers(); + + for (int i = 0; i < numAllBlocks; i++) { + // Pad zero bytes to make all cells exactly the size of parityCellSize + // If internal block is smaller than parity block, pad zero bytes. 
+ // Also pad zero bytes to all parity cells + final int position = buffers[i].position(); + assert position <= parityCellSize : "If an internal block is smaller" + + " than parity block, then its last cell should be small than last" + + " parity cell"; + for (int j = 0; j < parityCellSize - position; j++) { + buffers[i].put((byte) 0); + } + buffers[i].flip(); + } + + writeParityCells(); + } + + void writeParityCells() throws IOException { + final ByteBuffer[] buffers = cellBuffers.getBuffers(); + //encode the data cells + encode(encoder, numDataBlocks, buffers); + for (int i = numDataBlocks; i < numAllBlocks; i++) { + writeParity(i, buffers[i], cellBuffers.getChecksumArray(i)); + } + cellBuffers.clear(); + } + + void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf + ) throws IOException { + final StripedDataStreamer current = setCurrentStreamer(index); + final int len = buffer.limit(); + + final long oldBytes = current.getBytesCurBlock(); + if (!current.isFailed()) { + try { + DataChecksum sum = getDataChecksum(); + sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0); + for (int i = 0; i < len; i += sum.getBytesPerChecksum()) { + int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i); + int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize(); + super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset, + getChecksumSize()); + } + } catch(Exception e) { + handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e); + } + } + + if (current.isFailed()) { + final long newBytes = oldBytes + len; + current.setBytesCurBlock(newBytes); + } + } + + @Override + void setClosed() { + super.setClosed(); + for (int i = 0; i < numAllBlocks; i++) { + streamers.get(i).release(); + } + cellBuffers.release(); + } + + @Override + protected synchronized void closeImpl() throws IOException { + if (isClosed()) { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); + for(int i = 0; i < streamers.size(); i++) { + final StripedDataStreamer si = getStripedDataStreamer(i); + try { + si.getLastException().check(true); + } catch (IOException e) { + b.add(e); + } + } + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } + return; + } + + try { + // flush from all upper layers + try { + flushBuffer(); + // if the last stripe is incomplete, generate and write parity cells + writeParityCellsForLastStripe(); + enqueueAllCurrentPackets(); + } catch(Exception e) { + handleStreamerFailure("closeImpl", e); + } + + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer s = setCurrentStreamer(i); + if (!s.isFailed()) { + try { + if (s.getBytesCurBlock() > 0) { - setCurrentPacket2Empty(); ++ setCurrentPacketToEmpty(); + } + // flush all data to Datanode + flushInternal(); + } catch(Exception e) { + handleStreamerFailure("closeImpl", e); + } + } + } + + closeThreads(false); + final ExtendedBlock lastBlock = coordinator.getBlockGroup(); + TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); + try { + completeFile(lastBlock); + } finally { + scope.close(); + } + dfsClient.endFileLease(fileId); + } catch (ClosedChannelException ignored) { + } finally { + setClosed(); + } + } + + private void enqueueAllCurrentPackets() throws IOException { + int idx = streamers.indexOf(getCurrentStreamer()); + for(int i = 0; i < streamers.size(); i++) { + setCurrentStreamer(i); + if (currentPacket != null) { + enqueueCurrentPacket(); + } + } + setCurrentStreamer(idx); + } +}
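
The DFSStripedOutputStream diff above rotates one write path across numDataBlocks + numParityBlocks streamers: writeChunk() fills one cell of cellSize bytes at a time, and once a full stripe of data cells is buffered, writeParityCells() runs the erasure encoder and pushes the parity cells through the same chunk path. The toy sketch below (not part of the commit) illustrates only that cell/stripe bookkeeping; the class and method names are illustrative, and a single XOR parity cell stands in for the real Reed-Solomon RawErasureEncoder, so this is a simplified approximation rather than the HDFS implementation.

import java.nio.ByteBuffer;

/**
 * Toy illustration of the cell/stripe bookkeeping used by the striped
 * output stream: writes are split into cells, filled round-robin across
 * the data "streamers"; once every data cell of a stripe is full, parity
 * is computed and the stripe is flushed.
 */
class StripeSketch {
  private final int cellSize;
  private final ByteBuffer[] dataCells;   // one cell per data block
  private final ByteBuffer parityCell;    // placeholder for the parity cells
  private int current;                    // index of the cell being filled

  StripeSketch(int numDataBlocks, int cellSize) {
    this.cellSize = cellSize;
    this.dataCells = new ByteBuffer[numDataBlocks];
    for (int i = 0; i < numDataBlocks; i++) {
      dataCells[i] = ByteBuffer.allocate(cellSize);
    }
    this.parityCell = ByteBuffer.allocate(cellSize);
  }

  /** Roughly analogous to writeChunk(): append bytes, advance to the next
   *  cell when the current one fills, and encode when the stripe is full. */
  void write(byte[] b, int off, int len) {
    while (len > 0) {
      ByteBuffer cell = dataCells[current];
      int n = Math.min(len, cell.remaining());
      cell.put(b, off, n);
      off += n;
      len -= n;
      if (!cell.hasRemaining()) {          // cell full: move to next streamer
        current++;
        if (current == dataCells.length) { // full stripe: compute parity
          encodeAndFlushStripe();
          current = 0;
        }
      }
    }
  }

  private void encodeAndFlushStripe() {
    parityCell.clear();
    for (int pos = 0; pos < cellSize; pos++) {
      byte p = 0;
      for (ByteBuffer cell : dataCells) {
        p ^= cell.get(pos);                // XOR stands in for RS encoding
      }
      parityCell.put(p);
    }
    // In the real stream, each data/parity cell would now be chunked,
    // checksummed and queued on its StripedDataStreamer.
    for (ByteBuffer cell : dataCells) {
      cell.clear();
    }
  }
}

The real stream additionally handles a partial final stripe (writeParityCellsForLastStripe pads short cells with zeros before encoding) and failed streamers, neither of which is modeled here.
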
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index d55d00b,8e81fdc..50a367a --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@@ -199,12 -200,7 +201,13 @@@ import org.apache.hadoop.hdfs.protocol. import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneResponseProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index f2facd7,4ca5b26..c083b5e --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@@ -3124,193 
-3067,7 +3141,194 @@@ public class PBHelper setTotalRpcs(context.getTotalRpcs()). setCurRpc(context.getCurRpc()). setId(context.getReportId()). + setLeaseId(context.getLeaseId()). build(); } + + public static ECSchema convertECSchema(ECSchemaProto schema) { + List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList(); + Map<String, String> options = new HashMap<>(optionsList.size()); + for (ECSchemaOptionEntryProto option : optionsList) { + options.put(option.getKey(), option.getValue()); + } + return new ECSchema(schema.getCodecName(), schema.getDataUnits(), + schema.getParityUnits(), options); + } + + public static ECSchemaProto convertECSchema(ECSchema schema) { + ECSchemaProto.Builder builder = ECSchemaProto.newBuilder() + .setCodecName(schema.getCodecName()) + .setDataUnits(schema.getNumDataUnits()) + .setParityUnits(schema.getNumParityUnits()); + Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet(); + for (Entry<String, String> entry : entrySet) { + builder.addOptions(ECSchemaOptionEntryProto.newBuilder() + .setKey(entry.getKey()).setValue(entry.getValue()).build()); + } + return builder.build(); + } + + public static ErasureCodingPolicy convertErasureCodingPolicy( + ErasureCodingPolicyProto policy) { + return new ErasureCodingPolicy(policy.getName(), + convertECSchema(policy.getSchema()), + policy.getCellSize()); + } + + public static ErasureCodingPolicyProto convertErasureCodingPolicy( + ErasureCodingPolicy policy) { + ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto + .newBuilder() + .setName(policy.getName()) + .setSchema(convertECSchema(policy.getSchema())) + .setCellSize(policy.getCellSize()); + return builder.build(); + } + + public static ErasureCodingZoneProto convertErasureCodingZone( + ErasureCodingZone ecZone) { + return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir()) + .setEcPolicy(convertErasureCodingPolicy(ecZone.getErasureCodingPolicy())) + .build(); + } + + public static ErasureCodingZone convertErasureCodingZone( + ErasureCodingZoneProto ecZoneProto) { + return new ErasureCodingZone(ecZoneProto.getDir(), + convertErasureCodingPolicy(ecZoneProto.getEcPolicy())); + } + + public static BlockECRecoveryInfo convertBlockECRecoveryInfo( + BlockECRecoveryInfoProto blockEcRecoveryInfoProto) { + ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock(); + ExtendedBlock block = convert(blockProto); + + DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto + .getSourceDnInfos(); + DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto); + + DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto + .getTargetDnInfos(); + DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto); + + StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto + .getTargetStorageUuids(); + String[] targetStorageUuids = convert(targetStorageUuidsProto); + + StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto + .getTargetStorageTypes(); + StorageType[] convertStorageTypes = convertStorageTypes( + targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto + .getStorageTypesList().size()); + + List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto + .getLiveBlockIndicesList(); + short[] liveBlkIndices = new short[liveBlockIndicesList.size()]; + for (int i = 0; i < liveBlockIndicesList.size(); i++) { + liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue(); + } + + ErasureCodingPolicy ecPolicy = + 
convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy()); + + return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos, + targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy); + } + + public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo( + BlockECRecoveryInfo blockEcRecoveryInfo) { + BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto + .newBuilder(); + builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock())); + + DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos(); + builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos)); + + DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos(); + builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos)); + + String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs(); + builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs)); + + StorageType[] targetStorageTypes = blockEcRecoveryInfo + .getTargetStorageTypes(); + builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); + + short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices(); + builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices)); + + builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo + .getErasureCodingPolicy())); + + return builder.build(); + } + + private static List<Integer> convertIntArray(short[] liveBlockIndices) { + List<Integer> liveBlockIndicesList = new ArrayList<Integer>(); + for (short s : liveBlockIndices) { + liveBlockIndicesList.add((int) s); + } + return liveBlockIndicesList; + } + + private static StorageTypesProto convertStorageTypesProto( + StorageType[] targetStorageTypes) { + StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); + for (StorageType storageType : targetStorageTypes) { + builder.addStorageTypes(convertStorageType(storageType)); + } + return builder.build(); + } + + private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) { + StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder(); + for (String storageUuid : targetStorageIDs) { + builder.addStorageUuids(storageUuid); + } + return builder.build(); + } + + private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) { + DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder(); + for (DatanodeInfo datanodeInfo : dnInfos) { + builder.addDatanodes(convert(datanodeInfo)); + } + return builder.build(); + } + + private static String[] convert(StorageUuidsProto targetStorageUuidsProto) { + List<String> storageUuidsList = targetStorageUuidsProto + .getStorageUuidsList(); + String[] storageUuids = new String[storageUuidsList.size()]; + for (int i = 0; i < storageUuidsList.size(); i++) { + storageUuids[i] = storageUuidsList.get(i); + } + return storageUuids; + } + + public static BlockECRecoveryCommandProto convert( + BlockECRecoveryCommand blkECRecoveryCmd) { + BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto + .newBuilder(); + Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd + .getECTasks(); + for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) { + builder + .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo)); + } + return builder.build(); + } + + public static BlockECRecoveryCommand convert( + BlockECRecoveryCommandProto blkECRecoveryCmdProto) { + Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>(); + List<BlockECRecoveryInfoProto> 
blockECRecoveryinfoList = blkECRecoveryCmdProto + .getBlockECRecoveryinfoList(); + for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) { + blkECRecoveryInfos + .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto)); + } + return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, + blkECRecoveryInfos); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 930001a,f9847ca..555f506 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@@ -291,10 -310,7 +316,8 @@@ public class Dispatcher /** Dispatch the move to the proxy source & wait for the response. */ private void dispatch() { - if (LOG.isDebugEnabled()) { - LOG.debug("Start moving " + this); - } + LOG.info("Start moving " + this); + assert !(reportedBlock instanceof DBlockStriped); Socket sock = new Socket(); DataOutputStream out = null; @@@ -323,7 -339,8 +346,8 @@@ sendRequest(out, eb, accessToken); receiveResponse(in); - nnc.getBytesMoved().addAndGet(block.getNumBytes()); + nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes()); + target.getDDatanode().setHasSuccess(); LOG.info("Successfully moved " + this); } catch (IOException e) { LOG.warn("Failed to move " + this + ": " + e.getMessage()); @@@ -656,29 -650,25 +695,39 @@@ * @return the total size of the received blocks in the number of bytes. */ private long getBlockList() throws IOException { - final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); + final long size = Math.min(getBlocksSize, blocksToReceive); - final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); + final BlocksWithLocations newBlksLocs = + nnc.getBlocks(getDatanodeInfo(), size); + if (LOG.isTraceEnabled()) { + LOG.trace("getBlocks(" + getDatanodeInfo() + ", " + + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2) - + ") returns " + newBlocks.getBlocks().length + " blocks."); ++ + ") returns " + newBlksLocs.getBlocks().length + " blocks."); + } + long bytesReceived = 0; - for (BlockWithLocations blk : newBlocks.getBlocks()) { + for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) { + // Skip small blocks. 
- if (blk.getBlock().getNumBytes() < getBlocksMinBlockSize) { ++ if (blkLocs.getBlock().getNumBytes() < getBlocksMinBlockSize) { + continue; + } - bytesReceived += blk.getBlock().getNumBytes(); + DBlock block; + if (blkLocs instanceof StripedBlockWithLocations) { + StripedBlockWithLocations sblkLocs = + (StripedBlockWithLocations) blkLocs; + // approximate size + bytesReceived += sblkLocs.getBlock().getNumBytes() / + sblkLocs.getDataBlockNum(); + block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(), + sblkLocs.getDataBlockNum()); + } else{ + bytesReceived += blkLocs.getBlock().getNumBytes(); + block = new DBlock(blkLocs.getBlock()); + } + synchronized (globalBlocks) { - final DBlock block = globalBlocks.get(blk.getBlock()); + block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block); synchronized (block) { block.clearLocations(); @@@ -944,8 -954,22 +1015,21 @@@ return new DDatanode(datanode, maxConcurrentMovesPerNode); } + public void executePendingMove(final PendingMove p) { - // move the block + // move the reportedBlock + final DDatanode targetDn = p.target.getDDatanode(); + ExecutorService moveExecutor = targetDn.getMoveExecutor(); + if (moveExecutor == null) { + final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); + if (nThreads > 0) { + moveExecutor = targetDn.initMoveExecutor(nThreads); + } + } + if (moveExecutor == null) { + LOG.warn("No mover threads available: skip moving " + p); + return; + } - moveExecutor.execute(new Runnable() { @Override public void run() { @@@ -996,11 -1020,8 +1080,8 @@@ return getBytesMoved() - bytesLastMoved; } - /** The sleeping period before checking if reportedBlock move is completed again */ - static private long blockMoveWaitTime = 30000L; - /** - * Wait for all block move confirmations. + * Wait for all reportedBlock move confirmations. * @return true if there is failed move execution */ public static boolean waitForMoveCompletion( @@@ -1027,10 -1048,22 +1108,22 @@@ } /** + * @return true if some moves are success. + */ + public static boolean checkForSuccess( + Iterable<? extends StorageGroup> targets) { + boolean hasSuccess = false; + for (StorageGroup t : targets) { + hasSuccess |= t.getDDatanode().hasSuccess; + } + return hasSuccess; + } + + /** - * Decide if the block is a good candidate to be moved from source to target. - * A block is a good candidate if + * Decide if the block/blockGroup is a good candidate to be moved from source + * to target. A block is a good candidate if * 1. the block is not in the process of being moved/has not been moved; - * 2. the block does not have a replica on the target; + * 2. the block does not have a replica/internalBlock on the target; * 3. 
doing the move does not reduce the number of racks that the block has */ private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index 4308278,dea31c4..9387176 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@@ -19,19 -19,21 +19,20 @@@ package org.apache.hadoop.hdfs.server.b import java.util.LinkedList; -import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; - import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; + import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.util.LightWeightGSet; /** - * BlockInfo class maintains for a given block - * the {@link BlockCollection} it is part of and datanodes where the replicas of - * the block are stored. + * For a given block (or an erasure coding block group), BlockInfo class + * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes + * where the replicas of the block, or blocks belonging to the erasure coding + * block group, are stored. */ - public abstract class BlockInfo extends Block -@InterfaceAudience.Private + public abstract class BlockInfo extends Block implements LightWeightGSet.LinkedElement { public static final BlockInfo[] EMPTY_ARRAY = {}; + private BlockCollection bc; /** For implementing {@link LightWeightGSet.LinkedElement} interface */ @@@ -177,27 -188,7 +178,12 @@@ */ abstract void replaceBlock(BlockInfo newBlock); + public abstract boolean isStriped(); + + /** @return true if there is no datanode storage associated with the block */ + abstract boolean hasNoStorage(); + /** - * Find specified DatanodeDescriptor. - * @return index or -1 if not found. - */ - boolean findDatanode(DatanodeDescriptor dn) { - int len = getCapacity(); - for (int idx = 0; idx < len; idx++) { - DatanodeDescriptor cur = getDatanode(idx); - if(cur == dn) { - return true; - } - } - return false; - } - - /** * Find specified DatanodeStorageInfo. * @return DatanodeStorageInfo or null if not found. */ @@@ -303,27 -294,43 +289,26 @@@ /** * BlockInfo represents a block that is not being constructed. - * In order to start modifying the block, the BlockInfo should be converted - * to {@link BlockInfoContiguousUnderConstruction}. + * In order to start modifying the block, the BlockInfo should be converted to - * {@link BlockInfoUnderConstructionContiguous} or - * {@link BlockInfoUnderConstructionStriped}. - * @return {@link HdfsServerConstants.BlockUCState#COMPLETE} ++ * {@link BlockInfoContiguousUnderConstruction} or ++ * {@link BlockInfoStripedUnderConstruction}. + * @return {@link BlockUCState#COMPLETE} */ - public HdfsServerConstants.BlockUCState getBlockUCState() { - return HdfsServerConstants.BlockUCState.COMPLETE; + public BlockUCState getBlockUCState() { + return BlockUCState.COMPLETE; } /** * Is this block complete? 
* - * @return true if the state of the block is - * {@link HdfsServerConstants.BlockUCState#COMPLETE} + * @return true if the state of the block is {@link BlockUCState#COMPLETE} */ public boolean isComplete() { - return getBlockUCState().equals(HdfsServerConstants.BlockUCState.COMPLETE); + return getBlockUCState().equals(BlockUCState.COMPLETE); } - /** - * Convert a complete block to an under construction block. - * @return BlockInfoUnderConstruction - an under construction block. - */ - public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction( - BlockUCState s, DatanodeStorageInfo[] targets) { - if(isComplete()) { - BlockInfoContiguousUnderConstruction ucBlock = - new BlockInfoContiguousUnderConstruction(this, - getBlockCollection().getPreferredBlockReplication(), s, targets); - ucBlock.setBlockCollection(getBlockCollection()); - return ucBlock; - } - // the block is already under construction - BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction)this; - ucBlock.setBlockUCState(s); - ucBlock.setExpectedLocations(targets); - ucBlock.setBlockCollection(getBlockCollection()); - return ucBlock; + public boolean isDeleted() { + return (bc == null); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index d9adccc,eff89a8..bb9bf5b --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@@ -122,36 -120,4 +122,36 @@@ public class BlockInfoContiguous extend "newBlock already exists."; } } + + /** + * Convert a complete block to an under construction block. + * @return BlockInfoUnderConstruction - an under construction block. 
+ */ - public BlockInfoUnderConstructionContiguous convertToBlockUnderConstruction( ++ public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction( + BlockUCState s, DatanodeStorageInfo[] targets) { + if(isComplete()) { - BlockInfoUnderConstructionContiguous ucBlock = - new BlockInfoUnderConstructionContiguous(this, ++ BlockInfoContiguousUnderConstruction ucBlock = ++ new BlockInfoContiguousUnderConstruction(this, + getBlockCollection().getPreferredBlockReplication(), s, targets); + ucBlock.setBlockCollection(getBlockCollection()); + return ucBlock; + } + // the block is already under construction - BlockInfoUnderConstructionContiguous ucBlock = - (BlockInfoUnderConstructionContiguous) this; ++ BlockInfoContiguousUnderConstruction ucBlock = ++ (BlockInfoContiguousUnderConstruction) this; + ucBlock.setBlockUCState(s); + ucBlock.setExpectedLocations(targets); + ucBlock.setBlockCollection(getBlockCollection()); + return ucBlock; + } + + @Override + public final boolean isStriped() { + return false; + } + + @Override + final boolean hasNoStorage() { + return getStorageInfo(0) == null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java index 0000000,7ca6419..96b209d mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java @@@ -1,0 -1,403 +1,281 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs.server.blockmanagement; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Iterator; + import java.util.List; + + import org.apache.hadoop.hdfs.protocol.Block; + import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; + import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; + import org.apache.hadoop.hdfs.server.namenode.NameNode; + + /** + * Represents a block that is currently being constructed.<br> + * This is usually the last block of a file opened for write or append. + */ -public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { ++public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous ++ implements BlockInfoUnderConstruction{ + /** Block state. 
See {@link BlockUCState} */ + private BlockUCState blockUCState; + + /** + * Block replicas as assigned when the block was allocated. + * This defines the pipeline order. + */ + private List<ReplicaUnderConstruction> replicas; + + /** + * Index of the primary data node doing the recovery. Useful for log + * messages. + */ + private int primaryNodeIndex = -1; + + /** + * The new generation stamp, which this block will have + * after the recovery succeeds. Also used as a recovery id to identify + * the right recovery if any of the abandoned recoveries re-appear. + */ + private long blockRecoveryId = 0; + + /** + * The block source to use in the event of copy-on-write truncate. + */ + private Block truncateBlock; + + /** - * ReplicaUnderConstruction contains information about replicas while - * they are under construction. - * The GS, the length and the state of the replica is as reported by - * the data-node. - * It is not guaranteed, but expected, that data-nodes actually have - * corresponding replicas. - */ - static class ReplicaUnderConstruction extends Block { - private final DatanodeStorageInfo expectedLocation; - private ReplicaState state; - private boolean chosenAsPrimary; - - ReplicaUnderConstruction(Block block, - DatanodeStorageInfo target, - ReplicaState state) { - super(block); - this.expectedLocation = target; - this.state = state; - this.chosenAsPrimary = false; - } - - /** - * Expected block replica location as assigned when the block was allocated. - * This defines the pipeline order. - * It is not guaranteed, but expected, that the data-node actually has - * the replica. - */ - private DatanodeStorageInfo getExpectedStorageLocation() { - return expectedLocation; - } - - /** - * Get replica state as reported by the data-node. - */ - ReplicaState getState() { - return state; - } - - /** - * Whether the replica was chosen for recovery. - */ - boolean getChosenAsPrimary() { - return chosenAsPrimary; - } - - /** - * Set replica state. - */ - void setState(ReplicaState s) { - state = s; - } - - /** - * Set whether this replica was chosen for recovery. - */ - void setChosenAsPrimary(boolean chosenAsPrimary) { - this.chosenAsPrimary = chosenAsPrimary; - } - - /** - * Is data-node the replica belongs to alive. - */ - boolean isAlive() { - return expectedLocation.getDatanodeDescriptor().isAlive; - } - - @Override // Block - public int hashCode() { - return super.hashCode(); - } - - @Override // Block - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - - @Override - public String toString() { - final StringBuilder b = new StringBuilder(50); - appendStringTo(b); - return b.toString(); - } - - @Override - public void appendStringTo(StringBuilder sb) { - sb.append("ReplicaUC[") - .append(expectedLocation) - .append("|") - .append(state) - .append("]"); - } - } - - /** + * Create block and set its state to + * {@link BlockUCState#UNDER_CONSTRUCTION}. + */ + public BlockInfoContiguousUnderConstruction(Block blk, short replication) { + this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null); + } + + /** + * Create a block that is currently being constructed. 
+ */ + public BlockInfoContiguousUnderConstruction(Block blk, short replication, + BlockUCState state, DatanodeStorageInfo[] targets) { + super(blk, replication); + assert getBlockUCState() != BlockUCState.COMPLETE : - "BlockInfoUnderConstruction cannot be in COMPLETE state"; ++ "BlockInfoContiguousUnderConstruction cannot be in COMPLETE state"; + this.blockUCState = state; + setExpectedLocations(targets); + } + - /** - * Convert an under construction block to a complete block. - * - * @return BlockInfo - a complete block. - * @throws IOException if the state of the block - * (the generation stamp and the length) has not been committed by - * the client or it does not have at least a minimal number of replicas - * reported from data-nodes. - */ - BlockInfo convertToCompleteBlock() throws IOException { ++ @Override ++ public BlockInfoContiguous convertToCompleteBlock() throws IOException { + assert getBlockUCState() != BlockUCState.COMPLETE : + "Trying to convert a COMPLETE block"; + return new BlockInfoContiguous(this); + } + - /** Set expected locations */ ++ @Override + public void setExpectedLocations(DatanodeStorageInfo[] targets) { + int numLocations = targets == null ? 0 : targets.length; - this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations); - for(int i = 0; i < numLocations; i++) - replicas.add( - new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW)); ++ this.replicas = new ArrayList<>(numLocations); ++ for(int i = 0; i < numLocations; i++) { ++ replicas.add(new ReplicaUnderConstruction(this, targets[i], ++ ReplicaState.RBW)); ++ } + } + - /** - * Create array of expected replica locations - * (as has been assigned by chooseTargets()). - */ ++ @Override + public DatanodeStorageInfo[] getExpectedStorageLocations() { + int numLocations = replicas == null ? 0 : replicas.size(); + DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; - for(int i = 0; i < numLocations; i++) ++ for (int i = 0; i < numLocations; i++) { + storages[i] = replicas.get(i).getExpectedStorageLocation(); ++ } + return storages; + } + - /** Get the number of expected locations */ ++ @Override + public int getNumExpectedLocations() { + return replicas == null ? 0 : replicas.size(); + } + + /** + * Return the state of the block under construction. + * @see BlockUCState + */ + @Override // BlockInfo + public BlockUCState getBlockUCState() { + return blockUCState; + } + + void setBlockUCState(BlockUCState s) { + blockUCState = s; + } + - /** Get block recovery ID */ ++ @Override + public long getBlockRecoveryId() { + return blockRecoveryId; + } + - /** Get recover block */ ++ @Override + public Block getTruncateBlock() { + return truncateBlock; + } + ++ @Override ++ public Block toBlock(){ ++ return this; ++ } ++ + public void setTruncateBlock(Block recoveryBlock) { + this.truncateBlock = recoveryBlock; + } + - /** - * Process the recorded replicas. When about to commit or finish the - * pipeline recovery sort out bad replicas. - * @param genStamp The final generation stamp for the block. - */ ++ @Override + public void setGenerationStampAndVerifyReplicas(long genStamp) { + // Set the generation stamp for the block. + setGenerationStamp(genStamp); + if (replicas == null) + return; + + // Remove the replicas with wrong gen stamp. + // The replica list is unchanged. 
+ for (ReplicaUnderConstruction r : replicas) { + if (genStamp != r.getGenerationStamp()) { + r.getExpectedStorageLocation().removeBlock(this); + NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica " + + "from location: {}", r.getExpectedStorageLocation()); + } + } + } + - /** - * Commit block's length and generation stamp as reported by the client. - * Set block state to {@link BlockUCState#COMMITTED}. - * @param block - contains client reported block length and generation - * @throws IOException if block ids are inconsistent. - */ - void commitBlock(Block block) throws IOException { ++ @Override ++ public void commitBlock(Block block) throws IOException { + if(getBlockId() != block.getBlockId()) + throw new IOException("Trying to commit inconsistent block: id = " + + block.getBlockId() + ", expected id = " + getBlockId()); + blockUCState = BlockUCState.COMMITTED; + this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp()); + // Sort out invalid replicas. + setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); + } + - /** - * Initialize lease recovery for this block. - * Find the first alive data-node starting from the previous primary and - * make it primary. - */ ++ @Override + public void initializeBlockRecovery(long recoveryId) { + setBlockUCState(BlockUCState.UNDER_RECOVERY); + blockRecoveryId = recoveryId; + if (replicas.size() == 0) { + NameNode.blockStateChangeLog.warn("BLOCK*" - + " BlockInfoUnderConstruction.initLeaseRecovery:" ++ + " BlockInfoContiguousUnderConstruction.initLeaseRecovery:" + + " No blocks found, lease removed."); + } + boolean allLiveReplicasTriedAsPrimary = true; - for (int i = 0; i < replicas.size(); i++) { ++ for (ReplicaUnderConstruction replica : replicas) { + // Check if all replicas have been tried or not. - if (replicas.get(i).isAlive()) { - allLiveReplicasTriedAsPrimary = - (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary()); ++ if (replica.isAlive()) { ++ allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary && ++ replica.getChosenAsPrimary()); + } + } + if (allLiveReplicasTriedAsPrimary) { + // Just set all the replicas to be chosen whether they are alive or not. - for (int i = 0; i < replicas.size(); i++) { - replicas.get(i).setChosenAsPrimary(false); ++ for (ReplicaUnderConstruction replica : replicas) { ++ replica.setChosenAsPrimary(false); + } + } + long mostRecentLastUpdate = 0; + ReplicaUnderConstruction primary = null; + primaryNodeIndex = -1; + for(int i = 0; i < replicas.size(); i++) { + // Skip alive replicas which have been chosen for recovery. 
+ if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) { + continue; + } + final ReplicaUnderConstruction ruc = replicas.get(i); + final long lastUpdate = ruc.getExpectedStorageLocation() + .getDatanodeDescriptor().getLastUpdateMonotonic(); + if (lastUpdate > mostRecentLastUpdate) { + primaryNodeIndex = i; + primary = ruc; + mostRecentLastUpdate = lastUpdate; + } + } + if (primary != null) { - primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this); ++ primary.getExpectedStorageLocation().getDatanodeDescriptor() ++ .addBlockToBeRecovered(this); + primary.setChosenAsPrimary(true); + NameNode.blockStateChangeLog.debug( + "BLOCK* {} recovery started, primary={}", this, primary); + } + } + - void addReplicaIfNotPresent(DatanodeStorageInfo storage, - Block block, - ReplicaState rState) { ++ @Override ++ public void addReplicaIfNotPresent(DatanodeStorageInfo storage, ++ Block block, ReplicaState rState) { + Iterator<ReplicaUnderConstruction> it = replicas.iterator(); + while (it.hasNext()) { + ReplicaUnderConstruction r = it.next(); + DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation(); + if(expectedLocation == storage) { + // Record the gen stamp from the report + r.setGenerationStamp(block.getGenerationStamp()); + return; + } else if (expectedLocation != null && + expectedLocation.getDatanodeDescriptor() == + storage.getDatanodeDescriptor()) { + + // The Datanode reported that the block is on a different storage + // than the one chosen by BlockPlacementPolicy. This can occur as + // we allow Datanodes to choose the target storage. Update our + // state by removing the stale entry and adding a new one. + it.remove(); + break; + } + } + replicas.add(new ReplicaUnderConstruction(block, storage, rState)); + } + - @Override // BlockInfo - // BlockInfoUnderConstruction participates in maps the same way as BlockInfo - public int hashCode() { - return super.hashCode(); - } - - @Override // BlockInfo - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - + @Override + public String toString() { + final StringBuilder b = new StringBuilder(100); + appendStringTo(b); + return b.toString(); + } + + @Override + public void appendStringTo(StringBuilder sb) { + super.appendStringTo(sb); + appendUCParts(sb); + } + + private void appendUCParts(StringBuilder sb) { + sb.append("{UCState=").append(blockUCState) + .append(", truncateBlock=" + truncateBlock) + .append(", primaryNodeIndex=").append(primaryNodeIndex) + .append(", replicas=["); + if (replicas != null) { + Iterator<ReplicaUnderConstruction> iter = replicas.iterator(); + if (iter.hasNext()) { + iter.next().appendStringTo(sb); + while (iter.hasNext()) { + sb.append(", "); + iter.next().appendStringTo(sb); + } + } + } + sb.append("]}"); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index b88b554,0000000..14d2fcc mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java 
@@@ -1,279 -1,0 +1,279 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + +/** + * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. + * + * We still use triplets to store DatanodeStorageInfo for each block in the + * block group, as well as the previous/next block in the corresponding + * DatanodeStorageInfo. For an (m+k) block group, the first (m+k) triplet units + * are sorted and strictly mapped to the corresponding block. + * + * Normally each block belonging to the group is stored in only one DataNode. + * However, it is possible that some block is over-replicated. Thus the triplet + * array's size can be larger than (m+k), so we currently use an extra byte + * array to record the block index for each triplet. + */ +public class BlockInfoStriped extends BlockInfo { + private final ErasureCodingPolicy ecPolicy; + /** + * Always the same size as triplets. Records the block index for each triplet. + * TODO: actually this is only necessary for over-replicated blocks, so it can + * be further optimized to save memory usage. + */ + private byte[] indices; + + public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) { + super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits())); + indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()]; + initIndices(); + this.ecPolicy = ecPolicy; + } + + BlockInfoStriped(BlockInfoStriped b) { + this(b, b.getErasureCodingPolicy()); + this.setBlockCollection(b.getBlockCollection()); + } + + public short getTotalBlockNum() { + return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()); + } + + public short getDataBlockNum() { + return (short) ecPolicy.getNumDataUnits(); + } + + public short getParityBlockNum() { + return (short) ecPolicy.getNumParityUnits(); + } + + /** + * If the block is committed/completed and its length is less than a full + * stripe, it returns the number of actual data blocks. + * Otherwise it returns the number of data units specified by the erasure coding policy.
+ */ + public short getRealDataBlockNum() { + if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) { + return (short) Math.min(getDataBlockNum(), + (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1); + } else { + return getDataBlockNum(); + } + } + + public short getRealTotalBlockNum() { + return (short) (getRealDataBlockNum() + getParityBlockNum()); + } + + public ErasureCodingPolicy getErasureCodingPolicy() { + return ecPolicy; + } + + private void initIndices() { + for (int i = 0; i < indices.length; i++) { + indices[i] = -1; + } + } + + private int findSlot() { + int i = getTotalBlockNum(); + for (; i < getCapacity(); i++) { + if (getStorageInfo(i) == null) { + return i; + } + } + // need to expand the triplet size + ensureCapacity(i + 1, true); + return i; + } + + @Override + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { + int blockIndex = BlockIdManager.getBlockIndex(reportedBlock); + int index = blockIndex; + DatanodeStorageInfo old = getStorageInfo(index); + if (old != null && !old.equals(storage)) { // over replicated + // check if the storage has already been stored + int i = findStorageInfo(storage); + if (i == -1) { + index = findSlot(); + } else { + return true; + } + } + addStorage(storage, index, blockIndex); + return true; + } + + private void addStorage(DatanodeStorageInfo storage, int index, + int blockIndex) { + setStorageInfo(index, storage); + setNext(index, null); + setPrevious(index, null); + indices[index] = (byte) blockIndex; + } + + private int findStorageInfoFromEnd(DatanodeStorageInfo storage) { + final int len = getCapacity(); + for(int idx = len - 1; idx >= 0; idx--) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if (storage.equals(cur)) { + return idx; + } + } + return -1; + } + + int getStorageBlockIndex(DatanodeStorageInfo storage) { + int i = this.findStorageInfo(storage); + return i == -1 ? -1 : indices[i]; + } + + /** + * Identify the block stored in the given datanode storage. Note that + * the returned block has the same block Id as the one seen/reported by the + * DataNode.
+ */ + Block getBlockOnStorage(DatanodeStorageInfo storage) { + int index = getStorageBlockIndex(storage); + if (index < 0) { + return null; + } else { + Block block = new Block(this); + block.setBlockId(this.getBlockId() + index); + return block; + } + } + + @Override + boolean removeStorage(DatanodeStorageInfo storage) { + int dnIndex = findStorageInfoFromEnd(storage); + if (dnIndex < 0) { // the node is not found + return false; + } + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is still in the list and must be removed first."; + // set the triplet to null + setStorageInfo(dnIndex, null); + setNext(dnIndex, null); + setPrevious(dnIndex, null); + indices[dnIndex] = -1; + return true; + } + + private void ensureCapacity(int totalSize, boolean keepOld) { + if (getCapacity() < totalSize) { + Object[] old = triplets; + byte[] oldIndices = indices; + triplets = new Object[totalSize * 3]; + indices = new byte[totalSize]; + initIndices(); + + if (keepOld) { + System.arraycopy(old, 0, triplets, 0, old.length); + System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length); + } + } + } + + @Override + void replaceBlock(BlockInfo newBlock) { + assert newBlock instanceof BlockInfoStriped; + BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock; + final int size = getCapacity(); + newBlockGroup.ensureCapacity(size, false); + for (int i = 0; i < size; i++) { + final DatanodeStorageInfo storage = this.getStorageInfo(i); + if (storage != null) { + final int blockIndex = indices[i]; + final boolean removed = storage.removeBlock(this); + assert removed : "currentBlock not found."; + + newBlockGroup.addStorage(storage, i, blockIndex); + storage.insertToList(newBlockGroup); + } + } + } + + public long spaceConsumed() { + // For striped blocks, the total usage of this block group should cover + // both the data blocks and the parity blocks, because + // `getNumBytes` only accounts for the actual data size. + return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(), + ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(), + BLOCK_STRIPED_CELL_SIZE); + } + + @Override + public final boolean isStriped() { + return true; + } + + @Override + public int numNodes() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + int num = 0; + for (int idx = getCapacity()-1; idx >= 0; idx--) { + if (getStorageInfo(idx) != null) { + num++; + } + } + return num; + } + + /** + * Convert a complete block to an under construction block. + * @return BlockInfoUnderConstruction - an under construction block.
+ */ - public BlockInfoUnderConstructionStriped convertToBlockUnderConstruction( ++ public BlockInfoStripedUnderConstruction convertToBlockUnderConstruction( + BlockUCState s, DatanodeStorageInfo[] targets) { - final BlockInfoUnderConstructionStriped ucBlock; ++ final BlockInfoStripedUnderConstruction ucBlock; + if(isComplete()) { - ucBlock = new BlockInfoUnderConstructionStriped(this, ecPolicy, ++ ucBlock = new BlockInfoStripedUnderConstruction(this, ecPolicy, + s, targets); + ucBlock.setBlockCollection(getBlockCollection()); + } else { + // the block is already under construction - ucBlock = (BlockInfoUnderConstructionStriped) this; ++ ucBlock = (BlockInfoStripedUnderConstruction) this; + ucBlock.setBlockUCState(s); + ucBlock.setExpectedLocations(targets); + ucBlock.setBlockCollection(getBlockCollection()); + } + return ucBlock; + } + + @Override + final boolean hasNoStorage() { + final int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + if (getStorageInfo(idx) != null) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java index 0000000,0000000..9de8294 new file mode 100644 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java @@@ -1,0 -1,0 +1,297 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hdfs.server.blockmanagement; ++ ++import org.apache.hadoop.hdfs.protocol.Block; ++import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; ++import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; ++import org.apache.hadoop.hdfs.server.namenode.NameNode; ++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; ++ ++import java.io.IOException; ++ ++import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE; ++import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION; ++ ++/** ++ * Represents a striped block that is currently being constructed. ++ * This is usually the last block of a file opened for write or append. ++ */ ++public class BlockInfoStripedUnderConstruction extends BlockInfoStriped ++ implements BlockInfoUnderConstruction{ ++ private BlockUCState blockUCState; ++ ++ /** ++ * Block replicas as assigned when the block was allocated. 
++ */ ++ private ReplicaUnderConstruction[] replicas; ++ ++ /** ++ * Index of the primary data node doing the recovery. Useful for log ++ * messages. ++ */ ++ private int primaryNodeIndex = -1; ++ ++ /** ++ * The new generation stamp, which this block will have ++ * after the recovery succeeds. Also used as a recovery id to identify ++ * the right recovery if any of the abandoned recoveries re-appear. ++ */ ++ private long blockRecoveryId = 0; ++ ++ /** ++ * Constructor with null storage targets. ++ */ ++ public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy) { ++ this(blk, ecPolicy, UNDER_CONSTRUCTION, null); ++ } ++ ++ /** ++ * Create a striped block that is currently being constructed. ++ */ ++ public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy, ++ BlockUCState state, DatanodeStorageInfo[] targets) { ++ super(blk, ecPolicy); ++ assert getBlockUCState() != COMPLETE : ++ "BlockInfoStripedUnderConstruction cannot be in COMPLETE state"; ++ this.blockUCState = state; ++ setExpectedLocations(targets); ++ } ++ ++ @Override ++ public BlockInfoStriped convertToCompleteBlock() throws IOException { ++ assert getBlockUCState() != COMPLETE : ++ "Trying to convert a COMPLETE block"; ++ return new BlockInfoStriped(this); ++ } ++ ++ /** Set expected locations */ ++ @Override ++ public void setExpectedLocations(DatanodeStorageInfo[] targets) { ++ int numLocations = targets == null ? 0 : targets.length; ++ this.replicas = new ReplicaUnderConstruction[numLocations]; ++ for(int i = 0; i < numLocations; i++) { ++ // when creating a new block we simply sequentially assign block index to ++ // each storage ++ Block blk = new Block(this.getBlockId() + i, 0, this.getGenerationStamp()); ++ replicas[i] = new ReplicaUnderConstruction(blk, targets[i], ++ ReplicaState.RBW); ++ } ++ } ++ ++ /** ++ * Create array of expected replica locations ++ * (as has been assigned by chooseTargets()). ++ */ ++ @Override ++ public DatanodeStorageInfo[] getExpectedStorageLocations() { ++ int numLocations = getNumExpectedLocations(); ++ DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; ++ for (int i = 0; i < numLocations; i++) { ++ storages[i] = replicas[i].getExpectedStorageLocation(); ++ } ++ return storages; ++ } ++ ++ /** @return the index array indicating the block index in each storage */ ++ public int[] getBlockIndices() { ++ int numLocations = getNumExpectedLocations(); ++ int[] indices = new int[numLocations]; ++ for (int i = 0; i < numLocations; i++) { ++ indices[i] = BlockIdManager.getBlockIndex(replicas[i]); ++ } ++ return indices; ++ } ++ ++ @Override ++ public int getNumExpectedLocations() { ++ return replicas == null ? 0 : replicas.length; ++ } ++ ++ /** ++ * Return the state of the block under construction. ++ * @see BlockUCState ++ */ ++ @Override // BlockInfo ++ public BlockUCState getBlockUCState() { ++ return blockUCState; ++ } ++ ++ void setBlockUCState(BlockUCState s) { ++ blockUCState = s; ++ } ++ ++ @Override ++ public long getBlockRecoveryId() { ++ return blockRecoveryId; ++ } ++ ++ @Override ++ public Block getTruncateBlock() { ++ return null; ++ } ++ ++ @Override ++ public Block toBlock(){ ++ return this; ++ } ++ ++ @Override ++ public void setGenerationStampAndVerifyReplicas(long genStamp) { ++ // Set the generation stamp for the block. ++ setGenerationStamp(genStamp); ++ if (replicas == null) ++ return; ++ ++ // Remove the replicas with wrong gen stamp. ++ // The replica list is unchanged. 
++ for (ReplicaUnderConstruction r : replicas) { ++ if (genStamp != r.getGenerationStamp()) { ++ r.getExpectedStorageLocation().removeBlock(this); ++ NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica " ++ + "from location: {}", r.getExpectedStorageLocation()); ++ } ++ } ++ } ++ ++ @Override ++ public void commitBlock(Block block) throws IOException { ++ if (getBlockId() != block.getBlockId()) { ++ throw new IOException("Trying to commit inconsistent block: id = " ++ + block.getBlockId() + ", expected id = " + getBlockId()); ++ } ++ blockUCState = BlockUCState.COMMITTED; ++ this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp()); ++ // Sort out invalid replicas. ++ setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); ++ } ++ ++ @Override ++ public void initializeBlockRecovery(long recoveryId) { ++ setBlockUCState(BlockUCState.UNDER_RECOVERY); ++ blockRecoveryId = recoveryId; ++ if (replicas == null || replicas.length == 0) { ++ NameNode.blockStateChangeLog.warn("BLOCK*" + ++ " BlockInfoStripedUnderConstruction.initLeaseRecovery:" + ++ " No blocks found, lease removed."); ++ // sets primary node index and return. ++ primaryNodeIndex = -1; ++ return; ++ } ++ boolean allLiveReplicasTriedAsPrimary = true; ++ for (ReplicaUnderConstruction replica : replicas) { ++ // Check if all replicas have been tried or not. ++ if (replica.isAlive()) { ++ allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary && ++ replica.getChosenAsPrimary()); ++ } ++ } ++ if (allLiveReplicasTriedAsPrimary) { ++ // Just set all the replicas to be chosen whether they are alive or not. ++ for (ReplicaUnderConstruction replica : replicas) { ++ replica.setChosenAsPrimary(false); ++ } ++ } ++ long mostRecentLastUpdate = 0; ++ ReplicaUnderConstruction primary = null; ++ primaryNodeIndex = -1; ++ for(int i = 0; i < replicas.length; i++) { ++ // Skip alive replicas which have been chosen for recovery. ++ if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) { ++ continue; ++ } ++ final ReplicaUnderConstruction ruc = replicas[i]; ++ final long lastUpdate = ruc.getExpectedStorageLocation() ++ .getDatanodeDescriptor().getLastUpdateMonotonic(); ++ if (lastUpdate > mostRecentLastUpdate) { ++ primaryNodeIndex = i; ++ primary = ruc; ++ mostRecentLastUpdate = lastUpdate; ++ } ++ } ++ if (primary != null) { ++ primary.getExpectedStorageLocation().getDatanodeDescriptor() ++ .addBlockToBeRecovered(this); ++ primary.setChosenAsPrimary(true); ++ NameNode.blockStateChangeLog.info( ++ "BLOCK* {} recovery started, primary={}", this, primary); ++ } ++ } ++ ++ @Override ++ public void addReplicaIfNotPresent(DatanodeStorageInfo storage, ++ Block reportedBlock, ReplicaState rState) { ++ if (replicas == null) { ++ replicas = new ReplicaUnderConstruction[1]; ++ replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage, rState); ++ } else { ++ for (int i = 0; i < replicas.length; i++) { ++ DatanodeStorageInfo expected = replicas[i].getExpectedStorageLocation(); ++ if (expected == storage) { ++ replicas[i].setBlockId(reportedBlock.getBlockId()); ++ replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp()); ++ return; ++ } else if (expected != null && expected.getDatanodeDescriptor() == ++ storage.getDatanodeDescriptor()) { ++ // The Datanode reported that the block is on a different storage ++ // than the one chosen by BlockPlacementPolicy. This can occur as ++ // we allow Datanodes to choose the target storage. 
Update our ++ // state by removing the stale entry and adding a new one. ++ replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage, ++ rState); ++ return; ++ } ++ } ++ ReplicaUnderConstruction[] newReplicas = ++ new ReplicaUnderConstruction[replicas.length + 1]; ++ System.arraycopy(replicas, 0, newReplicas, 0, replicas.length); ++ newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction( ++ reportedBlock, storage, rState); ++ replicas = newReplicas; ++ } ++ } ++ ++ @Override ++ public String toString() { ++ final StringBuilder b = new StringBuilder(100); ++ appendStringTo(b); ++ return b.toString(); ++ } ++ ++ @Override ++ public void appendStringTo(StringBuilder sb) { ++ super.appendStringTo(sb); ++ appendUCParts(sb); ++ } ++ ++ private void appendUCParts(StringBuilder sb) { ++ sb.append("{UCState=").append(blockUCState). ++ append(", primaryNodeIndex=").append(primaryNodeIndex). ++ append(", replicas=["); ++ if (replicas != null) { ++ int i = 0; ++ for (ReplicaUnderConstruction r : replicas) { ++ r.appendStringTo(sb); ++ if (++i < replicas.length) { ++ sb.append(", "); ++ } ++ } ++ } ++ sb.append("]}"); ++ } ++}
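Editor's note: the sketch below is not part of the patch; it is a minimal, self-contained illustration of the block-group arithmetic the striped classes above rely on. An internal block's id is the block group id plus its block index (see getBlockOnStorage and setExpectedLocations), and for a committed/complete group the number of data blocks that actually hold bytes is (numBytes - 1) / cellSize + 1, capped at the number of data units (see getRealDataBlockNum). The class name, the RS(6,3) layout, and the 64 KB cell size are assumptions made only for this example.

public class StripedLayoutDemo {
  // Hypothetical layout parameters chosen for illustration; the patch reads
  // them from ErasureCodingPolicy and HdfsConstants.BLOCK_STRIPED_CELL_SIZE.
  static final int NUM_DATA_UNITS = 6;
  static final int NUM_PARITY_UNITS = 3;
  static final long CELL_SIZE = 64 * 1024;

  /** Internal block id of the i-th block in a group, mirroring getBlockOnStorage(). */
  static long internalBlockId(long blockGroupId, int blockIndex) {
    return blockGroupId + blockIndex;
  }

  /** Data blocks that actually hold bytes, mirroring getRealDataBlockNum() for a committed/complete group. */
  static int realDataBlockNum(long numBytes) {
    return (int) Math.min(NUM_DATA_UNITS, (numBytes - 1) / CELL_SIZE + 1);
  }

  public static void main(String[] args) {
    long groupId = 0xB000L; // hypothetical block group id
    for (int i = 0; i < NUM_DATA_UNITS + NUM_PARITY_UNITS; i++) {
      System.out.println("block index " + i + " -> internal id " + internalBlockId(groupId, i));
    }
    // 100 KB spans two 64 KB cells, so only 2 of the 6 data blocks carry bytes.
    System.out.println("realDataBlockNum(100 KB) = " + realDataBlockNum(100 * 1024));
    // A full stripe (6 x 64 KB) or more fills all 6 data blocks.
    System.out.println("realDataBlockNum(1 MB) = " + realDataBlockNum(1024 * 1024));
  }
}

As in the patch's expression, integer truncation makes the formula return 1 for a zero-length group; the real class only evaluates it once the block group is committed or complete.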