Repository: hadoop
Updated Branches:
  refs/heads/trunk 8d29e2451 -> 3c18a53cb
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c18a53c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
new file mode 100644
index 0000000..a0a5f83
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+
+/**
+ * StripedReconstructor reconstructs one or more missing striped blocks in a
+ * striped block group; the number of live striped blocks must be no less
+ * than the number of data blocks.
+ *
+ * | <- Striped Block Group -> |
+ *  blk_0      blk_1       blk_2(*)   blk_3   ...   <- A striped block group
+ *    |          |           |          |
+ *    v          v           v          v
+ * +------+   +------+   +------+   +------+
+ * |cell_0|   |cell_1|   |cell_2|   |cell_3|  ...
+ * +------+   +------+   +------+   +------+
+ * |cell_4|   |cell_5|   |cell_6|   |cell_7|  ...
+ * +------+   +------+   +------+   +------+
+ * |cell_8|   |cell_9|   |cell10|   |cell11|  ...
+ * +------+   +------+   +------+   +------+
+ *  ...         ...       ...         ...
+ *
+ *
+ * We use the following steps to reconstruct a striped block group. In each
+ * round we reconstruct <code>bufferSize</code> bytes of data until we are
+ * done; <code>bufferSize</code> is configurable and may be smaller or larger
+ * than the cell size:
+ *   step1: read <code>bufferSize</code> data from the minimum number of
+ *          sources required by reconstruction.
+ *   step2: decode data for targets.
+ *   step3: transfer data to targets.
+ *
+ * In step1, we try to read <code>bufferSize</code> data from the minimum
+ * number of sources; if a source is corrupt or stale, a read from a new
+ * source will be scheduled. The best sources are remembered for the next
+ * round and may be updated in each round.
+ *
+ * In step2, typically if the source blocks we read are all data blocks, we
+ * need to call encode, and if at least one of them is a parity block, we
+ * need to call decode. Note that we only read once and reconstruct all the
+ * missing striped blocks even when there is more than one.
+ *
+ * In step3, we send the reconstructed data to the targets by constructing
+ * packets and sending them directly. As with continuous block replication,
+ * we don't check the packet acks. Since the datanode doing the
+ * reconstruction work is one of the source datanodes, the reconstructed
+ * data has to be sent to the targets remotely.
+ *
+ * There are some points where we can make further improvements in the next
+ * phase:
+ *   1. We can read the block file directly on the local datanode; currently
+ *      we use a remote block reader. (Note that short-circuit reads are not
+ *      a good choice; see the inline comments.)
+ *   2. Should we check the packet acks for EC reconstruction? Since EC
+ *      reconstruction is more expensive than continuous block replication
+ *      and needs to read from several other datanodes, should we make sure
+ *      that the reconstructed result is received by the targets?
+ */
+@InterfaceAudience.Private
+class StripedReconstructor implements Runnable {
+  private static final Logger LOG = DataNode.LOG;
+
+  private final ErasureCodingWorker worker;
+  private final DataNode datanode;
+  private final Configuration conf;
+
+  private final ErasureCodingPolicy ecPolicy;
+
+  private RawErasureDecoder decoder;
+
+  private final ExtendedBlock blockGroup;
+  private final BitSet liveBitSet;
+
+  // position in striped internal block
+  private long positionInBlock;
+
+  private StripedReader stripedReader;
+
+  private StripedWriter stripedWriter;
+
+  private final CachingStrategy cachingStrategy;
+
+  StripedReconstructor(ErasureCodingWorker worker,
+                       BlockECReconstructionInfo reconstructionInfo) {
+    this.worker = worker;
+    this.datanode = worker.getDatanode();
+    this.conf = worker.getConf();
+
+    ecPolicy = reconstructionInfo.getErasureCodingPolicy();
+
+    blockGroup = reconstructionInfo.getExtendedBlock();
+    byte[] liveIndices = reconstructionInfo.getLiveBlockIndices();
+    liveBitSet = new BitSet(ecPolicy.getNumDataUnits() +
+        ecPolicy.getNumParityUnits());
+    for (int i = 0; i < liveIndices.length; i++) {
+      liveBitSet.set(liveIndices[i]);
+    }
+
+    stripedReader = new StripedReader(this, datanode,
+        conf, reconstructionInfo);
+    stripedWriter = new StripedWriter(this, datanode,
+        conf, reconstructionInfo);
+
+    cachingStrategy = CachingStrategy.newDefaultStrategy();
+
+    positionInBlock = 0L;
+  }
+
+  BitSet getLiveBitSet() {
+    return liveBitSet;
+  }
+
+  ByteBuffer allocateBuffer(int length) {
+    return ByteBuffer.allocate(length);
+  }
+
+  ExtendedBlock getBlock(int i) {
+    return StripedBlockUtil.constructInternalBlock(blockGroup, ecPolicy, i);
+  }
+
+  long getBlockLen(int i) {
+    return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(),
+        ecPolicy, i);
+  }
+
+  boolean hasValidTargets() {
+    return stripedWriter.hasValidTargets();
+  }
+
+  @Override
+  public void run() {
+    datanode.incrementXmitsInProgress();
+    try {
+      stripedReader.init();
+
+      stripedWriter.init();
+
+      reconstructAndTransfer();
+
+      stripedWriter.endTargetBlocks();
+
+      // Currently we don't check the acks for packets; this is similar to
+      // block replication.
+    } catch (Throwable e) {
+      LOG.warn("Failed to reconstruct striped block: {}", blockGroup, e);
+    } finally {
+      datanode.decrementXmitsInProgress();
+
+      stripedReader.close();
+
+      stripedWriter.close();
+    }
+  }
+
+  void reconstructAndTransfer() throws IOException {
+    while (positionInBlock < stripedWriter.getMaxTargetLength()) {
+      long remaining = stripedWriter.getMaxTargetLength() - positionInBlock;
+      final int toReconstructLen =
+          (int) Math.min(stripedReader.getBufferSize(), remaining);
+      // step1: read from minimum source DNs required for reconstruction.
+      // The returned success list is the source DNs we actually read from.
+      stripedReader.readMinimumSources(toReconstructLen);
+
+      // step2: decode to reconstruct targets
+      reconstructTargets(toReconstructLen);
+
+      // step3: transfer data
+      if (stripedWriter.transferData2Targets() == 0) {
+        String error = "Transfer failed for all targets.";
+        throw new IOException(error);
+      }
+
+      positionInBlock += toReconstructLen;
+
+      clearBuffers();
+    }
+  }
+
+  // Initialize decoder
+  private void initDecoderIfNecessary() {
+    if (decoder == null) {
+      decoder = CodecUtil.createRSRawDecoder(conf, ecPolicy.getNumDataUnits(),
+          ecPolicy.getNumParityUnits());
+    }
+  }
+
+  private void reconstructTargets(int toReconstructLen) {
+    initDecoderIfNecessary();
+
+    ByteBuffer[] inputs = stripedReader.getInputBuffers(toReconstructLen);
+
+    int[] erasedIndices = stripedWriter.getRealTargetIndices();
+    ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
+
+    decoder.decode(inputs, erasedIndices, outputs);
+
+    stripedWriter.updateRealTargetBuffers(toReconstructLen);
+  }
+
+  long getPositionInBlock() {
+    return positionInBlock;
+  }
+
+  /**
+   * Clear all associated buffers.
+   */
+  private void clearBuffers() {
+    stripedReader.clearBuffers();
+
+    stripedWriter.clearBuffers();
+  }
+
+  InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
+    return NetUtils.createSocketAddr(dnInfo.getXferAddr(
+        datanode.getDnConf().getConnectToDnViaHostname()));
+  }
+
+  int getBufferSize() {
+    return stripedReader.getBufferSize();
+  }
+
+  DataChecksum getChecksum() {
+    return stripedReader.getChecksum();
+  }
+
+  CachingStrategy getCachingStrategy() {
+    return cachingStrategy;
+  }
+
+  CompletionService<Void> createReadService() {
+    return new ExecutorCompletionService<>(worker.getStripedReadPool());
+  }
+
+  ExtendedBlock getBlockGroup() {
+    return blockGroup;
+  }
+}
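
To make step2 ("decode to reconstruct targets") concrete, the following stand-alone sketch is an editorial illustration only, not part of this patch. It shows the simplest possible case of the buffer-in/buffer-out decode that reconstructTargets() performs: one erased block repaired from a single XOR parity block. The real code path obtains a Reed-Solomon RawErasureDecoder from CodecUtil, which can repair up to numParityUnits erasures, but the shape of the call -- live input buffers, the indices of the erased units, and output buffers for the rebuilt data -- is the same.

import java.nio.ByteBuffer;

public class XorReconstructExample {
  // Rebuild the single erased buffer as the XOR of all surviving buffers.
  // A null entry in inputs marks the erased position.
  static ByteBuffer reconstruct(ByteBuffer[] inputs, int erasedIndex, int len) {
    ByteBuffer output = ByteBuffer.allocate(len);
    for (int pos = 0; pos < len; pos++) {
      byte b = 0;
      for (int i = 0; i < inputs.length; i++) {
        if (i != erasedIndex) {
          b ^= inputs[i].get(pos);
        }
      }
      output.put(pos, b);
    }
    return output;
  }

  public static void main(String[] args) {
    // Three data cells and one XOR parity cell, four bytes each.
    byte[][] cells = { {1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12} };
    ByteBuffer parity = ByteBuffer.allocate(4);
    for (int pos = 0; pos < 4; pos++) {
      parity.put(pos, (byte) (cells[0][pos] ^ cells[1][pos] ^ cells[2][pos]));
    }
    ByteBuffer[] inputs = {
        ByteBuffer.wrap(cells[0]),
        null,                        // blk_1 is lost
        ByteBuffer.wrap(cells[2]),
        parity
    };
    ByteBuffer rebuilt = reconstruct(inputs, 1, 4);
    System.out.println(rebuilt.get(0) + " " + rebuilt.get(1));  // prints: 5 6
  }
}

As in the decode call above, StripedReconstructor repeats this per bufferSize-sized round rather than over whole blocks, so memory use stays bounded regardless of block size.
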
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c18a53c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
new file mode 100644
index 0000000..e2052a3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+/**
+ * Manages the striped writers that write reconstructed data to the targets.
+ */
+@InterfaceAudience.Private
+class StripedWriter {
+  private static final Logger LOG = DataNode.LOG;
+  private final static int WRITE_PACKET_SIZE = 64 * 1024;
+
+  private final StripedReconstructor reconstructor;
+  private final DataNode datanode;
+  private final Configuration conf;
+
+  private final int dataBlkNum;
+  private final int parityBlkNum;
+
+  private boolean[] targetsStatus;
+
+  // targets
+  private final DatanodeInfo[] targets;
+  private final short[] targetIndices;
+  private boolean hasValidTargets;
+  private final StorageType[] targetStorageTypes;
+  private long maxTargetLength;
+
+  private StripedBlockWriter[] writers;
+
+  private int maxChunksPerPacket;
+  private byte[] packetBuf;
+  private byte[] checksumBuf;
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  StripedWriter(StripedReconstructor reconstructor,
+                DataNode datanode,
+                Configuration conf,
+                BlockECReconstructionInfo reconstructionInfo) {
+    this.reconstructor = reconstructor;
+    this.datanode = datanode;
+    this.conf = conf;
+
+    ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy();
+    dataBlkNum = ecPolicy.getNumDataUnits();
+    parityBlkNum = ecPolicy.getNumParityUnits();
+
+    targets = reconstructionInfo.getTargetDnInfos();
+    targetStorageTypes = reconstructionInfo.getTargetStorageTypes();
+
+    writers = new StripedBlockWriter[targets.length];
+
+    targetIndices = new short[targets.length];
+    Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
+        "Too many missed striped blocks.");
+    initTargetIndices();
+
+    maxTargetLength = 0L;
+    for (short targetIndex : targetIndices) {
+      maxTargetLength = Math.max(maxTargetLength,
+          reconstructor.getBlockLen(targetIndex));
+    }
+
+    // targetsStatus stores whether each target is still healthy. A target
+    // is marked failed at most once; once it has failed (invalid DN or
+    // transfer failure), no more data is transferred to it.
+    targetsStatus = new boolean[targets.length];
+  }
+
+  void init() throws IOException {
+    DataChecksum checksum = reconstructor.getChecksum();
+    checksumSize = checksum.getChecksumSize();
+    bytesPerChecksum = checksum.getBytesPerChecksum();
+    int chunkSize = bytesPerChecksum + checksumSize;
+    maxChunksPerPacket = Math.max(
+        (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize, 1);
+    int maxPacketSize = chunkSize * maxChunksPerPacket
+        + PacketHeader.PKT_MAX_HEADER_LEN;
+
+    packetBuf = new byte[maxPacketSize];
+    int tmpLen = checksumSize *
+        (reconstructor.getBufferSize() / bytesPerChecksum);
+    checksumBuf = new byte[tmpLen];
+
+    if (initTargetStreams() == 0) {
+      String error = "All targets have failed.";
+      throw new IOException(error);
+    }
+  }
+
+  private void initTargetIndices() {
+    BitSet bitset = reconstructor.getLiveBitSet();
+
+    int m = 0;
+    int k = 0;
+    hasValidTargets = false;
+    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+      if (!bitset.get(i)) {
+        if (reconstructor.getBlockLen(i) > 0) {
+          if (m < targets.length) {
+            targetIndices[m++] = (short)i;
+            hasValidTargets = true;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Send reconstructed data to targets.
+   */
+  int transferData2Targets() {
+    int nSuccess = 0;
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        boolean success = false;
+        try {
+          writers[i].transferData2Target(packetBuf);
+          nSuccess++;
+          success = true;
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+        targetsStatus[i] = success;
+      }
+    }
+    return nSuccess;
+  }
+
+  /**
+   * Send an empty packet to mark the end of the block.
+   */
+  void endTargetBlocks() {
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        try {
+          writers[i].endTargetBlock(packetBuf);
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * Initialize output/input streams for transferring data to targets
+   * and send the create block request.
+   */
+  int initTargetStreams() {
+    int nSuccess = 0;
+    for (short i = 0; i < targets.length; i++) {
+      try {
+        writers[i] = createWriter(i);
+        nSuccess++;
+        targetsStatus[i] = true;
+      } catch (Throwable e) {
+        LOG.warn(e.getMessage());
+      }
+    }
+    return nSuccess;
+  }
+
+  private StripedBlockWriter createWriter(short index) throws IOException {
+    return new StripedBlockWriter(this, datanode, conf,
+        reconstructor.getBlock(targetIndices[index]), targets[index],
+        targetStorageTypes[index]);
+  }
+
+  ByteBuffer allocateWriteBuffer() {
+    return reconstructor.allocateBuffer(reconstructor.getBufferSize());
+  }
+
+  int getTargets() {
+    return targets.length;
+  }
+
+  private int getRealTargets() {
+    int m = 0;
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        m++;
+      }
+    }
+    return m;
+  }
+
+  int[] getRealTargetIndices() {
+    int realTargets = getRealTargets();
+    int[] results = new int[realTargets];
+    int m = 0;
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        results[m++] = targetIndices[i];
+      }
+    }
+    return results;
+  }
+
+  ByteBuffer[] getRealTargetBuffers(int toReconstructLen) {
+    int numGood = getRealTargets();
+    ByteBuffer[] outputs = new ByteBuffer[numGood];
+    int m = 0;
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        writers[i].getTargetBuffer().limit(toReconstructLen);
+        outputs[m++] = writers[i].getTargetBuffer();
+      }
+    }
+    return outputs;
+  }
+
+  void updateRealTargetBuffers(int toReconstructLen) {
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        long blockLen = reconstructor.getBlockLen(targetIndices[i]);
+        long remaining = blockLen - reconstructor.getPositionInBlock();
+        if (remaining <= 0) {
+          writers[i].getTargetBuffer().limit(0);
+        } else if (remaining < toReconstructLen) {
+          writers[i].getTargetBuffer().limit((int)remaining);
+        }
+      }
+    }
+  }
+
+  long getMaxTargetLength() {
+    return maxTargetLength;
+  }
+
+  byte[] getChecksumBuf() {
+    return checksumBuf;
+  }
+
+  int getBytesPerChecksum() {
+    return bytesPerChecksum;
+  }
+
+  int getChecksumSize() {
+    return checksumSize;
+  }
+
+  DataChecksum getChecksum() {
+    return reconstructor.getChecksum();
+  }
+
+  int getMaxChunksPerPacket() {
+    return maxChunksPerPacket;
+  }
+
+  CachingStrategy getCachingStrategy() {
+    return reconstructor.getCachingStrategy();
+  }
+
+  InetSocketAddress getSocketAddress4Transfer(DatanodeInfo target) {
+    return reconstructor.getSocketAddress4Transfer(target);
+  }
+
+  boolean hasValidTargets() {
+    return hasValidTargets;
+  }
+
+  /**
+   * Clear all buffers.
+   */
+  void clearBuffers() {
+    for (StripedBlockWriter writer : writers) {
+      ByteBuffer targetBuffer = writer.getTargetBuffer();
+      if (targetBuffer != null) {
+        targetBuffer.clear();
+      }
+    }
+  }
+
+  void close() {
+    for (int i = 0; i < targets.length; i++) {
+      writers[i].close();
+    }
+  }
+}
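
As a quick sanity check of the packet sizing in StripedWriter#init() above, the following stand-alone sketch (an editorial illustration, not part of this patch) runs the same arithmetic with typical values: 512 data bytes per checksum, a 4-byte CRC32C checksum, and an assumed 33-byte maximum packet header standing in for PacketHeader.PKT_MAX_HEADER_LEN.

public class PacketSizingExample {
  public static void main(String[] args) {
    int writePacketSize = 64 * 1024;  // WRITE_PACKET_SIZE in StripedWriter
    int bytesPerChecksum = 512;       // typical dfs.bytes-per-checksum
    int checksumSize = 4;             // CRC32C checksum is 4 bytes
    int pktMaxHeaderLen = 33;         // assumed stand-in for PKT_MAX_HEADER_LEN

    // Same arithmetic as StripedWriter#init().
    int chunkSize = bytesPerChecksum + checksumSize;                  // 516
    int maxChunksPerPacket =
        Math.max((writePacketSize - pktMaxHeaderLen) / chunkSize, 1); // 126
    int maxPacketSize = chunkSize * maxChunksPerPacket
        + pktMaxHeaderLen;                                            // 65049

    System.out.println(maxChunksPerPacket + " chunks per packet, "
        + maxPacketSize + " byte packet buffer");
  }
}

With these default checksum settings each reconstruction packet carries roughly 63 KB of data plus checksums, and packetBuf is sized to hold exactly one such maximal packet.
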
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c18a53c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java
new file mode 100644
index 0000000..3150fce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Datanode side striping + erasure coding related task processing.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c18a53c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 38ca8ce..7155e74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -230,22 +230,23 @@ public class TestReconstructStripedFile {
   private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets,
       ReconstructionType type) throws IOException {
-    int stoppedDN = 0;
-    for (Map.Entry<ExtendedBlock, DataNode> target : corruptTargets.entrySet()) {
-      if (stoppedDN == 0 || type != ReconstructionType.DataOnly
+    int stoppedDNs = 0;
+    for (Map.Entry<ExtendedBlock, DataNode> target :
+        corruptTargets.entrySet()) {
+      if (stoppedDNs == 0 || type != ReconstructionType.DataOnly
           || random.nextBoolean()) {
         // stop at least one DN to trigger reconstruction
         LOG.info("Note: stop DataNode " + target.getValue().getDisplayName()
             + " with internal block " + target.getKey());
         shutdownDataNode(target.getValue());
-        stoppedDN++;
+        stoppedDNs++;
       } else { // corrupt the data on the DN
         LOG.info("Note: corrupt data on " + target.getValue().getDisplayName()
             + " with internal block " + target.getKey());
         cluster.corruptReplica(target.getValue(), target.getKey());
       }
     }
-    return stoppedDN;
+    return stoppedDNs;
   }
 
   /**