http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 7a7cd24,0000000..dabae2c mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@@ -1,1013 -1,0 +1,1014 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSPacket; +import org.apache.hadoop.hdfs.DFSUtil; ++import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.RemoteBlockReader2; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.base.Preconditions; + +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode; + +/** + * ErasureCodingWorker handles the erasure coding recovery work commands. These + * commands would be issued from Namenode as part of Datanode's heart beat + * response. BPOfferService delegates the work to this class for handling EC + * commands. + */ +public final class ErasureCodingWorker { + private static final Log LOG = DataNode.LOG; + + private final DataNode datanode; + private final Configuration conf; + + private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL; + private ThreadPoolExecutor STRIPED_READ_THREAD_POOL; + private final int STRIPED_READ_TIMEOUT_MILLIS; + private final int STRIPED_READ_BUFFER_SIZE; + + public ErasureCodingWorker(Configuration conf, DataNode datanode) { + this.datanode = datanode; + this.conf = conf; + + STRIPED_READ_TIMEOUT_MILLIS = conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); + initializeStripedReadThreadPool(conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT)); + STRIPED_READ_BUFFER_SIZE = conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT); + + initializeStripedBlkRecoveryThreadPool(conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT)); + } + + private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { + return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits); + } + + private void initializeStripedReadThreadPool(int num) { + if (LOG.isDebugEnabled()) { + LOG.debug("Using striped reads; pool threads=" + num); + } + STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, + TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedRead-" + threadIndex.getAndIncrement()); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { + LOG.info("Execution for striped reading rejected, " + + "Executing in current thread"); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }); + STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); + } + + private void initializeStripedBlkRecoveryThreadPool(int num) { + if (LOG.isDebugEnabled()) { + LOG.debug("Using striped block recovery; pool threads=" + num); + } + STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60, + TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIdx = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement()); + return t; + } + }); + STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true); + } + + /** + * Handles the Erasure Coding recovery work commands. + * + * @param ecTasks + * BlockECRecoveryInfo + */ + public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) { + for (BlockECRecoveryInfo recoveryInfo : ecTasks) { + try { + STRIPED_BLK_RECOVERY_THREAD_POOL + .submit(new ReconstructAndTransferBlock(recoveryInfo)); + } catch (Throwable e) { + LOG.warn("Failed to recover striped block " + + recoveryInfo.getExtendedBlock().getLocalBlock(), e); + } + } + } + + /** + * ReconstructAndTransferBlock recover one or more missed striped block in the + * striped block group, the minimum number of live striped blocks should be + * no less than data block number. + * + * | <- Striped Block Group -> | + * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group + * | | | | + * v v v v + * +------+ +------+ +------+ +------+ + * |cell_0| |cell_1| |cell_2| |cell_3| ... + * +------+ +------+ +------+ +------+ + * |cell_4| |cell_5| |cell_6| |cell_7| ... + * +------+ +------+ +------+ +------+ + * |cell_8| |cell_9| |cell10| |cell11| ... + * +------+ +------+ +------+ +------+ + * ... ... ... ... + * + * + * We use following steps to recover striped block group, in each round, we + * recover <code>bufferSize</code> data until finish, the + * <code>bufferSize</code> is configurable and may be less or larger than + * cell size: + * step1: read <code>bufferSize</code> data from minimum number of sources + * required by recovery. + * step2: decode data for targets. + * step3: transfer data to targets. + * + * In step1, try to read <code>bufferSize</code> data from minimum number + * of sources , if there is corrupt or stale sources, read from new source + * will be scheduled. The best sources are remembered for next round and + * may be updated in each round. + * + * In step2, typically if source blocks we read are all data blocks, we + * need to call encode, and if there is one parity block, we need to call + * decode. Notice we only read once and recover all missed striped block + * if they are more than one. + * + * In step3, send the recovered data to targets by constructing packet + * and send them directly. Same as continuous block replication, we + * don't check the packet ack. Since the datanode doing the recovery work + * are one of the source datanodes, so the recovered data are sent + * remotely. + * + * There are some points we can do further improvements in next phase: + * 1. we can read the block file directly on the local datanode, + * currently we use remote block reader. (Notice short-circuit is not + * a good choice, see inline comments). + * 2. We need to check the packet ack for EC recovery? Since EC recovery + * is more expensive than continuous block replication, it needs to + * read from several other datanodes, should we make sure the + * recovered result received by targets? + */ + private class ReconstructAndTransferBlock implements Runnable { + private final int dataBlkNum; + private final int parityBlkNum; + private final int cellSize; + + private RawErasureDecoder decoder; + + // Striped read buffer size + private int bufferSize; + + private final ExtendedBlock blockGroup; + private final int minRequiredSources; + // position in striped internal block + private long positionInBlock; + + // sources + private final short[] liveIndices; + private final DatanodeInfo[] sources; + + private final List<StripedReader> stripedReaders; + + // The buffers and indices for striped blocks whose length is 0 + private ByteBuffer[] zeroStripeBuffers; + private short[] zeroStripeIndices; + + // targets + private final DatanodeInfo[] targets; + private final StorageType[] targetStorageTypes; + + private final short[] targetIndices; + private final ByteBuffer[] targetBuffers; + + private final Socket[] targetSockets; + private final DataOutputStream[] targetOutputStreams; + private final DataInputStream[] targetInputStreams; + + private final long[] blockOffset4Targets; + private final long[] seqNo4Targets; + + private final static int WRITE_PACKET_SIZE = 64 * 1024; + private DataChecksum checksum; + private int maxChunksPerPacket; + private byte[] packetBuf; + private byte[] checksumBuf; + private int bytesPerChecksum; + private int checksumSize; + + private final CachingStrategy cachingStrategy; + + private final Map<Future<Void>, Integer> futures = new HashMap<>(); + private final CompletionService<Void> readService = + new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL); + + ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) { + ErasureCodingPolicy ecPolicy = recoveryInfo.getErasureCodingPolicy(); + dataBlkNum = ecPolicy.getNumDataUnits(); + parityBlkNum = ecPolicy.getNumParityUnits(); + cellSize = ecPolicy.getCellSize(); + + blockGroup = recoveryInfo.getExtendedBlock(); + final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1); + minRequiredSources = Math.min(cellsNum, dataBlkNum); + + liveIndices = recoveryInfo.getLiveBlockIndices(); + sources = recoveryInfo.getSourceDnInfos(); + stripedReaders = new ArrayList<>(sources.length); + + Preconditions.checkArgument(liveIndices.length >= minRequiredSources, + "No enough live striped blocks."); + Preconditions.checkArgument(liveIndices.length == sources.length, + "liveBlockIndices and source dns should match"); + + if (minRequiredSources < dataBlkNum) { + zeroStripeBuffers = + new ByteBuffer[dataBlkNum - minRequiredSources]; + zeroStripeIndices = new short[dataBlkNum - minRequiredSources]; + } + + targets = recoveryInfo.getTargetDnInfos(); + targetStorageTypes = recoveryInfo.getTargetStorageTypes(); + targetIndices = new short[targets.length]; + targetBuffers = new ByteBuffer[targets.length]; + + Preconditions.checkArgument(targetIndices.length <= parityBlkNum, + "Too much missed striped blocks."); + + targetSockets = new Socket[targets.length]; + targetOutputStreams = new DataOutputStream[targets.length]; + targetInputStreams = new DataInputStream[targets.length]; + + blockOffset4Targets = new long[targets.length]; + seqNo4Targets = new long[targets.length]; + + for (int i = 0; i < targets.length; i++) { + blockOffset4Targets[i] = 0; + seqNo4Targets[i] = 0; + } + + getTargetIndices(); + cachingStrategy = CachingStrategy.newDefaultStrategy(); + } + + private ByteBuffer allocateBuffer(int length) { + return ByteBuffer.allocate(length); + } + + private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) { + return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize, + dataBlkNum, i); + } + + private long getBlockLen(ExtendedBlock blockGroup, int i) { + return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(), + cellSize, dataBlkNum, i); + } + + /** + * StripedReader is used to read from one source DN, it contains a block + * reader, buffer and striped block index. + * Only allocate StripedReader once for one source, and the StripedReader + * has the same array order with sources. Typically we only need to allocate + * minimum number (minRequiredSources) of StripedReader, and allocate + * new for new source DN if some existing DN invalid or slow. + * If some source DN is corrupt, set the corresponding blockReader to + * null and will never read from it again. + * + * @param i the array index of sources + * @param offsetInBlock offset for the internal block + * @return StripedReader + */ + private StripedReader addStripedReader(int i, long offsetInBlock) { + StripedReader reader = new StripedReader(liveIndices[i]); + stripedReaders.add(reader); + + BlockReader blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]); + if (blockReader != null) { + initChecksumAndBufferSizeIfNeeded(blockReader); + reader.blockReader = blockReader; + } + reader.buffer = allocateBuffer(bufferSize); + return reader; + } + + @Override + public void run() { + datanode.incrementXmitsInProgress(); + try { + // Store the array indices of source DNs we have read successfully. + // In each iteration of read, the success list may be updated if + // some source DN is corrupted or slow. And use the updated success + // list of DNs for next iteration read. + int[] success = new int[minRequiredSources]; + + int nsuccess = 0; + for (int i = 0; + i < sources.length && nsuccess < minRequiredSources; i++) { + StripedReader reader = addStripedReader(i, 0); + if (reader.blockReader != null) { + success[nsuccess++] = i; + } + } + + if (nsuccess < minRequiredSources) { + String error = "Can't find minimum sources required by " + + "recovery, block id: " + blockGroup.getBlockId(); + throw new IOException(error); + } + + if (zeroStripeBuffers != null) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + zeroStripeBuffers[i] = allocateBuffer(bufferSize); + } + } + + for (int i = 0; i < targets.length; i++) { + targetBuffers[i] = allocateBuffer(bufferSize); + } + + checksumSize = checksum.getChecksumSize(); + int chunkSize = bytesPerChecksum + checksumSize; + maxChunksPerPacket = Math.max( + (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1); + int maxPacketSize = chunkSize * maxChunksPerPacket + + PacketHeader.PKT_MAX_HEADER_LEN; + + packetBuf = new byte[maxPacketSize]; + checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)]; + + // targetsStatus store whether some target is success, it will record + // any failed target once, if some target failed (invalid DN or transfer + // failed), will not transfer data to it any more. + boolean[] targetsStatus = new boolean[targets.length]; + if (initTargetStreams(targetsStatus) == 0) { + String error = "All targets are failed."; + throw new IOException(error); + } + + long firstStripedBlockLength = getBlockLen(blockGroup, 0); + while (positionInBlock < firstStripedBlockLength) { + int toRead = Math.min( + bufferSize, (int)(firstStripedBlockLength - positionInBlock)); + // step1: read from minimum source DNs required for reconstruction. + // The returned success list is the source DNs we do real read from + success = readMinimumStripedData4Recovery(success); + + // step2: decode to reconstruct targets + long remaining = firstStripedBlockLength - positionInBlock; + int toRecoverLen = remaining < bufferSize ? + (int)remaining : bufferSize; + recoverTargets(success, targetsStatus, toRecoverLen); + + // step3: transfer data + if (transferData2Targets(targetsStatus) == 0) { + String error = "Transfer failed for all targets."; + throw new IOException(error); + } + + clearBuffers(); + positionInBlock += toRead; + } + + endTargetBlocks(targetsStatus); + + // Currently we don't check the acks for packets, this is similar as + // block replication. + } catch (Throwable e) { + LOG.warn("Failed to recover striped block: " + blockGroup, e); + } finally { + datanode.decrementXmitsInProgress(); + // close block readers + for (StripedReader stripedReader : stripedReaders) { + closeBlockReader(stripedReader.blockReader); + } + for (int i = 0; i < targets.length; i++) { + IOUtils.closeStream(targetOutputStreams[i]); + IOUtils.closeStream(targetInputStreams[i]); + IOUtils.closeStream(targetSockets[i]); + } + } + } + + // init checksum from block reader + private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) { + if (checksum == null) { + checksum = blockReader.getDataChecksum(); + bytesPerChecksum = checksum.getBytesPerChecksum(); + // The bufferSize is flat to divide bytesPerChecksum + int readBufferSize = STRIPED_READ_BUFFER_SIZE; + bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum : + readBufferSize - readBufferSize % bytesPerChecksum; + } else { + assert blockReader.getDataChecksum().equals(checksum); + } + } + + private void getTargetIndices() { + BitSet bitset = new BitSet(dataBlkNum + parityBlkNum); + for (int i = 0; i < sources.length; i++) { + bitset.set(liveIndices[i]); + } + int m = 0; + int k = 0; + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { + if (!bitset.get(i)) { + if (getBlockLen(blockGroup, i) > 0) { + if (m < targets.length) { + targetIndices[m++] = (short)i; + } + } else { + zeroStripeIndices[k++] = (short)i; + } + } + } + } + + private long getReadLength(int index) { + long blockLen = getBlockLen(blockGroup, index); + long remaining = blockLen - positionInBlock; + return remaining > bufferSize ? bufferSize : remaining; + } + + /** + * Read from minimum source DNs required for reconstruction in the iteration. + * First try the success list which we think they are the best DNs + * If source DN is corrupt or slow, try to read some other source DN, + * and will update the success list. + * + * Remember the updated success list and return it for following + * operations and next iteration read. + * + * @param success the initial success list of source DNs we think best + * @return updated success list of source DNs we do real read + * @throws IOException + */ + private int[] readMinimumStripedData4Recovery(final int[] success) + throws IOException { + int nsuccess = 0; + int[] newSuccess = new int[minRequiredSources]; + BitSet used = new BitSet(sources.length); + /* + * Read from minimum source DNs required, the success list contains + * source DNs which we think best. + */ + for (int i = 0; i < minRequiredSources; i++) { + StripedReader reader = stripedReaders.get(success[i]); + if (getReadLength(liveIndices[success[i]]) > 0) { + Callable<Void> readCallable = readFromBlock( + reader.blockReader, reader.buffer); + Future<Void> f = readService.submit(readCallable); + futures.put(f, success[i]); + } else { + // If the read length is 0, we don't need to do real read + reader.buffer.position(0); + newSuccess[nsuccess++] = success[i]; + } + used.set(success[i]); + } + + while (!futures.isEmpty()) { + try { + StripingChunkReadResult result = + StripedBlockUtil.getNextCompletedStripedRead( + readService, futures, STRIPED_READ_TIMEOUT_MILLIS); + int resultIndex = -1; + if (result.state == StripingChunkReadResult.SUCCESSFUL) { + resultIndex = result.index; + } else if (result.state == StripingChunkReadResult.FAILED) { + // If read failed for some source DN, we should not use it anymore + // and schedule read from another source DN. + StripedReader failedReader = stripedReaders.get(result.index); + closeBlockReader(failedReader.blockReader); + failedReader.blockReader = null; + resultIndex = scheduleNewRead(used); + } else if (result.state == StripingChunkReadResult.TIMEOUT) { + // If timeout, we also schedule a new read. + resultIndex = scheduleNewRead(used); + } + if (resultIndex >= 0) { + newSuccess[nsuccess++] = resultIndex; + if (nsuccess >= minRequiredSources) { + // cancel remaining reads if we read successfully from minimum + // number of source DNs required by reconstruction. + cancelReads(futures.keySet()); + futures.clear(); + break; + } + } + } catch (InterruptedException e) { + LOG.info("Read data interrupted.", e); + break; + } + } + + if (nsuccess < minRequiredSources) { + String error = "Can't read data from minimum number of sources " + + "required by reconstruction, block id: " + blockGroup.getBlockId(); + throw new IOException(error); + } + + return newSuccess; + } + + private void paddingBufferToLen(ByteBuffer buffer, int len) { + int toPadding = len - buffer.position(); + for (int i = 0; i < toPadding; i++) { + buffer.put((byte) 0); + } + } + + // Initialize decoder + private void initDecoderIfNecessary() { + if (decoder == null) { + decoder = newDecoder(dataBlkNum, parityBlkNum); + } + } + + private int[] getErasedIndices(boolean[] targetsStatus) { + int[] result = new int[targets.length]; + int m = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + result[m++] = convertIndex4Decode(targetIndices[i], + dataBlkNum, parityBlkNum); + } + } + return Arrays.copyOf(result, m); + } + + private void recoverTargets(int[] success, boolean[] targetsStatus, + int toRecoverLen) { + initDecoderIfNecessary(); + ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; + for (int i = 0; i < success.length; i++) { + StripedReader reader = stripedReaders.get(success[i]); + ByteBuffer buffer = reader.buffer; + paddingBufferToLen(buffer, toRecoverLen); + inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] = + (ByteBuffer)buffer.flip(); + } + if (success.length < dataBlkNum) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + ByteBuffer buffer = zeroStripeBuffers[i]; + paddingBufferToLen(buffer, toRecoverLen); + int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum, + parityBlkNum); + inputs[index] = (ByteBuffer)buffer.flip(); + } + } + int[] erasedIndices = getErasedIndices(targetsStatus); + ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length]; + int m = 0; + for (int i = 0; i < targetBuffers.length; i++) { + if (targetsStatus[i]) { + outputs[m++] = targetBuffers[i]; + outputs[i].limit(toRecoverLen); + } + } + decoder.decode(inputs, erasedIndices, outputs); + + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + long blockLen = getBlockLen(blockGroup, targetIndices[i]); + long remaining = blockLen - positionInBlock; + if (remaining < 0) { + targetBuffers[i].limit(0); + } else if (remaining < toRecoverLen) { + targetBuffers[i].limit((int)remaining); + } + } + } + } + + /** + * Schedule a read from some new source DN if some DN is corrupted + * or slow, this is called from the read iteration. + * Initially we may only have <code>minRequiredSources</code> number of + * StripedReader. + * If the position is at the end of target block, don't need to do + * real read, and return the array index of source DN, otherwise -1. + * + * @param used the used source DNs in this iteration. + * @return the array index of source DN if don't need to do real read. + */ + private int scheduleNewRead(BitSet used) { + StripedReader reader = null; + // step1: initially we may only have <code>minRequiredSources</code> + // number of StripedReader, and there may be some source DNs we never + // read before, so will try to create StripedReader for one new source DN + // and try to read from it. If found, go to step 3. + int m = stripedReaders.size(); + while (reader == null && m < sources.length) { + reader = addStripedReader(m, positionInBlock); + if (getReadLength(liveIndices[m]) > 0) { + if (reader.blockReader == null) { + reader = null; + m++; + } + } else { + used.set(m); + return m; + } + } + + // step2: if there is no new source DN we can use, try to find a source + // DN we ever read from but because some reason, e.g., slow, it + // is not in the success DN list at the begin of this iteration, so + // we have not tried it in this iteration. Now we have a chance to + // revisit it again. + for (int i = 0; reader == null && i < stripedReaders.size(); i++) { + if (!used.get(i)) { + StripedReader r = stripedReaders.get(i); + if (getReadLength(liveIndices[i]) > 0) { + closeBlockReader(r.blockReader); + r.blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), positionInBlock, + sources[i]); + if (r.blockReader != null) { + m = i; + reader = r; + } + } else { + used.set(i); + r.buffer.position(0); + return i; + } + } + } + + // step3: schedule if find a correct source DN and need to do real read. + if (reader != null) { + Callable<Void> readCallable = readFromBlock( + reader.blockReader, reader.buffer); + Future<Void> f = readService.submit(readCallable); + futures.put(f, m); + used.set(m); + } + + return -1; + } + + // cancel all reads. + private void cancelReads(Collection<Future<Void>> futures) { + for (Future<Void> future : futures) { + future.cancel(true); + } + } + + private Callable<Void> readFromBlock(final BlockReader reader, + final ByteBuffer buf) { + return new Callable<Void>() { + + @Override + public Void call() throws Exception { + try { + actualReadFromBlock(reader, buf); + return null; + } catch (IOException e) { + LOG.info(e.getMessage()); + throw e; + } + } + + }; + } + + /** + * Read bytes from block + */ + private void actualReadFromBlock(BlockReader reader, ByteBuffer buf) + throws IOException { + int len = buf.remaining(); + int n = 0; + while (n < len) { + int nread = reader.read(buf); + if (nread <= 0) { + break; + } + n += nread; + } + } + + // close block reader + private void closeBlockReader(BlockReader blockReader) { + try { + if (blockReader != null) { + blockReader.close(); + } + } catch (IOException e) { + // ignore + } + } + + private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { + return NetUtils.createSocketAddr(dnInfo.getXferAddr( + datanode.getDnConf().getConnectToDnViaHostname())); + } + + private BlockReader newBlockReader(final ExtendedBlock block, + long offsetInBlock, DatanodeInfo dnInfo) { + if (offsetInBlock >= block.getNumBytes()) { + return null; + } + try { + InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo); + Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken( + block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ)); + /* + * This can be further improved if the replica is local, then we can + * read directly from DN and need to check the replica is FINALIZED + * state, notice we should not use short-circuit local read which + * requires config for domain-socket in UNIX or legacy config in Windows. + */ + return RemoteBlockReader2.newBlockReader( + "dummy", block, blockToken, offsetInBlock, + block.getNumBytes() - offsetInBlock, true, + "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo, + null, cachingStrategy); + } catch (IOException e) { + return null; + } + } + + private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) + throws IOException { + Peer peer = null; + boolean success = false; + Socket sock = null; + final int socketTimeout = datanode.getDnConf().getSocketTimeout(); + try { + sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); + NetUtils.connect(sock, addr, socketTimeout); - peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(), ++ peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(), + sock, datanode.getDataEncryptionKeyFactoryForBlock(b), + blockToken, datanodeId); + peer.setReadTimeout(socketTimeout); + success = true; + return peer; + } finally { + if (!success) { + IOUtils.cleanup(LOG, peer); + IOUtils.closeSocket(sock); + } + } + } + + /** + * Send data to targets + */ + private int transferData2Targets(boolean[] targetsStatus) { + int nsuccess = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + boolean success = false; + try { + ByteBuffer buffer = targetBuffers[i]; + + if (buffer.remaining() == 0) { + continue; + } + + checksum.calculateChunkedSums( + buffer.array(), 0, buffer.remaining(), checksumBuf, 0); + + int ckOff = 0; + while (buffer.remaining() > 0) { + DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket, + blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false); + int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum; + int toWrite = buffer.remaining() > maxBytesToPacket ? + maxBytesToPacket : buffer.remaining(); + int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize; + packet.writeChecksum(checksumBuf, ckOff, ckLen); + ckOff += ckLen; + packet.writeData(buffer, toWrite); + + // Send packet + packet.writeTo(targetOutputStreams[i]); + + blockOffset4Targets[i] += toWrite; + nsuccess++; + success = true; + } + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + targetsStatus[i] = success; + } + } + return nsuccess; + } + + /** + * clear all buffers + */ + private void clearBuffers() { + for (StripedReader stripedReader : stripedReaders) { + if (stripedReader.buffer != null) { + stripedReader.buffer.clear(); + } + } + + if (zeroStripeBuffers != null) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + zeroStripeBuffers[i].clear(); + } + } + + for (int i = 0; i < targetBuffers.length; i++) { + if (targetBuffers[i] != null) { + cleanBuffer(targetBuffers[i]); + } + } + } + + private ByteBuffer cleanBuffer(ByteBuffer buffer) { + Arrays.fill(buffer.array(), (byte) 0); + return (ByteBuffer)buffer.clear(); + } + + // send an empty packet to mark the end of the block + private void endTargetBlocks(boolean[] targetsStatus) { + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + try { + DFSPacket packet = new DFSPacket(packetBuf, 0, + blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true); + packet.writeTo(targetOutputStreams[i]); + targetOutputStreams[i].flush(); + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + } + } + } + + /** + * Initialize output/input streams for transferring data to target + * and send create block request. + */ + private int initTargetStreams(boolean[] targetsStatus) { + int nsuccess = 0; + for (int i = 0; i < targets.length; i++) { + Socket socket = null; + DataOutputStream out = null; + DataInputStream in = null; + boolean success = false; + try { + InetSocketAddress targetAddr = + getSocketAddress4Transfer(targets[i]); + socket = datanode.newSocket(); + NetUtils.connect(socket, targetAddr, + datanode.getDnConf().getSocketTimeout()); + socket.setSoTimeout(datanode.getDnConf().getSocketTimeout()); + + ExtendedBlock block = getBlock(blockGroup, targetIndices[i]); + Token<BlockTokenIdentifier> blockToken = + datanode.getBlockAccessToken(block, + EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); + + long writeTimeout = datanode.getDnConf().getSocketWriteTimeout(); + OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(socket); + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.getSaslClient().socketSend( + socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]); + + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + DFSUtil.getSmallBufferSize(conf))); + in = new DataInputStream(unbufIn); + + DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId()); + new Sender(out).writeBlock(block, targetStorageTypes[i], + blockToken, "", new DatanodeInfo[]{targets[i]}, + new StorageType[]{targetStorageTypes[i]}, source, + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, + checksum, cachingStrategy, false, false, null); + + targetSockets[i] = socket; + targetOutputStreams[i] = out; + targetInputStreams[i] = in; + nsuccess++; + success = true; + } catch (Throwable e) { + LOG.warn(e.getMessage()); + } finally { + if (!success) { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeStream(socket); + } + } + targetsStatus[i] = success; + } + return nsuccess; + } + } + + private static class StripedReader { + private final short index; // internal block index + private BlockReader blockReader; + private ByteBuffer buffer; + + private StripedReader(short index) { + this.index = index; + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index a115138,0ae739c..34b28e4 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@@ -39,10 -39,8 +39,9 @@@ import org.apache.hadoop.fs.permission. import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; - import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; @@@ -334,21 -331,13 +333,21 @@@ public final class FSImageFormatPBINod INodeSection.INodeFile f = n.getFile(); List<BlockProto> bp = f.getBlocksList(); short replication = (short) f.getReplication(); + boolean isStriped = f.getIsStriped(); LoaderContext state = parent.getLoaderContext(); + ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy(); BlockInfo[] blocks = new BlockInfo[bp.size()]; - for (int i = 0, e = bp.size(); i < e; ++i) { - blocks[i] = - new BlockInfoContiguous(PBHelperClient.convert(bp.get(i)), replication); + for (int i = 0; i < bp.size(); ++i) { + BlockProto b = bp.get(i); + if (isStriped) { - blocks[i] = new BlockInfoStriped(PBHelper.convert(b), ecPolicy); ++ blocks[i] = new BlockInfoStriped(PBHelperClient.convert(b), ecPolicy); + } else { - blocks[i] = new BlockInfoContiguous(PBHelper.convert(b), ++ blocks[i] = new BlockInfoContiguous(PBHelperClient.convert(b), + replication); + } } + final PermissionStatus permissions = loadPermission(f.getPermission(), parent.getLoaderContext().getStringTable()); @@@ -654,11 -632,10 +653,11 @@@ private void save(OutputStream out, INodeFile n) throws IOException { INodeSection.INodeFile.Builder b = buildINodeFile(n, parent.getSaverContext()); + BlockInfo[] blocks = n.getBlocks(); - if (n.getBlocks() != null) { + if (blocks != null) { for (Block block : n.getBlocks()) { - b.addBlocks(PBHelper.convert(block)); + b.addBlocks(PBHelperClient.convert(block)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index ed52ca4,75b6be9..b6b151c --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@@ -4714,26 -4654,8 +4713,8 @@@ public class FSNamesystem implements Na && safeMode.isOn(); } - /** - * Check if replication queues are to be populated - * @return true when node is HAState.Active and not in the very first safemode - */ - @Override - public boolean isPopulatingReplQueues() { - if (!shouldPopulateReplQueues()) { - return false; - } - return initializedReplQueues; - } - - private boolean shouldPopulateReplQueues() { - if(haContext == null || haContext.getState() == null) - return false; - return haContext.getState().shouldPopulateReplQueues(); - } - @Override - public void incrementSafeBlockCount(int replication) { + public void incrementSafeBlockCount(int storageNum, BlockInfo storedBlock) { // safeMode is volatile, and may be set to null at any time SafeModeInfo safeMode = this.safeMode; if (safeMode == null) @@@ -6233,11 -6150,11 +6222,16 @@@ return cacheManager; } + /** @return the ErasureCodingPolicyManager. */ + public ErasureCodingPolicyManager getErasureCodingPolicyManager() { + return ecPolicyManager; + } + + @Override + public HAContext getHAContext() { + return haContext; + } + @Override // NameNodeMXBean public String getCorruptFiles() { List<String> list = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 6f7b702,8565522..c765edc --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@@ -664,37 -608,12 +664,24 @@@ public class INodeFile extends INodeWit return counts; } + /** + * Compute quota of striped file. Note that currently EC files do not support + * append/hflush/hsync, thus the file length recorded in snapshots should be + * the same with the current file length. + */ + public final QuotaCounts computeQuotaUsageWithStriped( + BlockStoragePolicy bsp, QuotaCounts counts) { + counts.addNameSpace(1); + counts.add(storagespaceConsumed(bsp)); + return counts; + } + @Override public final ContentSummaryComputationContext computeContentSummary( - final ContentSummaryComputationContext summary) { + int snapshotId, final ContentSummaryComputationContext summary) { final ContentCounts counts = summary.getCounts(); - FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); - final long fileLen; - if (sf == null) { - fileLen = computeFileSize(); - counts.addContent(Content.FILE, 1); - } else { - final FileDiffList diffs = sf.getDiffs(); - final int n = diffs.asList().size(); - counts.addContent(Content.FILE, n); - if (n > 0 && sf.isCurrentFileDeleted()) { - fileLen = diffs.getLast().getFileSize(); - } else { - fileLen = computeFileSize(); - } - } + counts.addContent(Content.FILE, 1); + final long fileLen = computeFileSize(snapshotId); counts.addContent(Content.LENGTH, fileLen); counts.addContent(Content.DISKSPACE, storagespaceConsumed(null) .getStorageSpace()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java index e1702d9,5bc4033..b1012c2 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java @@@ -64,4 -52,5 +65,6 @@@ public interface Namesystem extends RwL boolean isInSnapshot(BlockInfo blockUC); CacheManager getCacheManager(); ++ + HAContext getHAContext(); -} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java index 252844c,06a8219..98deed2 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java @@@ -39,15 -39,11 +39,12 @@@ public interface SafeMode */ public boolean isInStartupSafeMode(); - /** Check whether replication queues are being populated. */ - public boolean isPopulatingReplQueues(); - /** * Increment number of blocks that reached minimal replication. - * @param replication current replication + * @param replication current replication + * @param storedBlock current stored Block */ - public void incrementSafeBlockCount(int replication); + public void incrementSafeBlockCount(int replication, BlockInfo storedBlock); /** Decrement number of blocks that reached minimal replication. */ public void decrementSafeBlockCount(BlockInfo b); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java index 450d981,cf21411..ae23783 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java @@@ -243,15 -242,13 +243,15 @@@ public class FSImageFormatPBSnapshot FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null, pbf.getFileSize()); List<BlockProto> bpl = pbf.getBlocksList(); + // in file diff there can only be contiguous blocks BlockInfo[] blocks = new BlockInfo[bpl.size()]; for(int j = 0, e = bpl.size(); j < e; ++j) { - Block blk = PBHelper.convert(bpl.get(j)); + Block blk = PBHelperClient.convert(bpl.get(j)); BlockInfo storedBlock = bm.getStoredBlock(blk); if(storedBlock == null) { - storedBlock = bm.addBlockCollection( - new BlockInfoContiguous(blk, copy.getFileReplication()), file); + storedBlock = (BlockInfoContiguous) fsn.getBlockManager() + .addBlockCollectionWithCheck(new BlockInfoContiguous(blk, + copy.getFileReplication()), file); } blocks[j] = storedBlock; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java index 6c06a8d,0000000..0499a2b mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java @@@ -1,38 -1,0 +1,39 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cli; + +import org.apache.hadoop.cli.util.CLICommandErasureCodingCli; +import org.apache.hadoop.cli.util.CLICommandTypes; +import org.apache.hadoop.cli.util.CLITestCmd; +import org.apache.hadoop.cli.util.CommandExecutor; +import org.apache.hadoop.cli.util.ErasureCodingCliCmdExecutor; ++import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.tools.erasurecode.ECCli; + +public class CLITestCmdErasureCoding extends CLITestCmd { + public CLITestCmdErasureCoding(String str, CLICommandTypes type) { + super(str, type); + } + + @Override - public CommandExecutor getExecutor(String tag) throws IllegalArgumentException { ++ public CommandExecutor getExecutor(String tag, Configuration conf) throws IllegalArgumentException { + if (getType() instanceof CLICommandErasureCodingCli) + return new ErasureCodingCliCmdExecutor(tag, new ECCli()); - return super.getExecutor(tag); ++ return super.getExecutor(tag, conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java index dfefb66,0000000..29ec98e mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java @@@ -1,115 -1,0 +1,115 @@@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.cli; + +import org.apache.hadoop.cli.util.CLICommand; +import org.apache.hadoop.cli.util.CLICommandErasureCodingCli; +import org.apache.hadoop.cli.util.CommandExecutor.Result; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.xml.sax.SAXException; + +public class TestErasureCodingCLI extends CLITestHelper { + private final int NUM_OF_DATANODES = 3; + private MiniDFSCluster dfsCluster = null; + private FileSystem fs = null; + private String namenode = null; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + dfsCluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(NUM_OF_DATANODES).build(); + dfsCluster.waitClusterUp(); + namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///"); + + username = System.getProperty("user.name"); + + fs = dfsCluster.getFileSystem(); + } + + @Override + protected String getTestFile() { + return "testErasureCodingConf.xml"; + } + + @After + @Override + public void tearDown() throws Exception { + if (fs != null) { + fs.close(); + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + Thread.sleep(2000); + super.tearDown(); + } + + @Override + protected String expandCommand(final String cmd) { + String expCmd = cmd; + expCmd = expCmd.replaceAll("NAMENODE", namenode); + expCmd = expCmd.replaceAll("#LF#", System.getProperty("line.separator")); + expCmd = super.expandCommand(expCmd); + return expCmd; + } + + @Override + protected TestConfigFileParser getConfigParser() { + return new TestErasureCodingAdmin(); + } + + private class TestErasureCodingAdmin extends + CLITestHelper.TestConfigFileParser { + @Override + public void endElement(String uri, String localName, String qName) + throws SAXException { + if (qName.equals("ec-admin-command")) { + if (testCommands != null) { + testCommands.add(new CLITestCmdErasureCoding(charString, + new CLICommandErasureCodingCli())); + } else if (cleanupCommands != null) { + cleanupCommands.add(new CLITestCmdErasureCoding(charString, + new CLICommandErasureCodingCli())); + } + } else { + super.endElement(uri, localName, qName); + } + } + } + + @Override + protected Result execute(CLICommand cmd) throws Exception { - return cmd.getExecutor(namenode).executeCommand(cmd.getCmd()); ++ return cmd.getExecutor(namenode, conf).executeCommand(cmd.getCmd()); + } + + @Test + @Override + public void testAll() { + super.testAll(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 3551055,a7e80ca..12d4811 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@@ -66,14 -66,9 +66,15 @@@ import java.util.Set import java.util.UUID; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; + import org.apache.commons.lang.UnhandledException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@@ -141,10 -133,8 +142,11 @@@ import org.apache.hadoop.hdfs.server.na import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.tools.DFSAdmin; + import org.apache.hadoop.hdfs.tools.JMXGet; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.NetUtils; @@@ -1870,150 -1858,21 +1872,168 @@@ public class DFSTestUtil } } + public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock( + Block block, BlockStatus blockStatus, DatanodeStorage storage) { + ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1]; + receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null); + StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1]; + reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks); + return reports; + } + + /** + * Creates the metadata of a file in striped layout. This method only + * manipulates the NameNode state without injecting data to DataNode. + * You should disable periodical heartbeat before use this. + * @param file Path of the file to create + * @param dir Parent path of the file + * @param numBlocks Number of striped block groups to add to the file + * @param numStripesPerBlk Number of striped cells in each block + * @param toMkdir + */ + public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir, + int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception { + DistributedFileSystem dfs = cluster.getFileSystem(); + // If outer test already set EC policy, dir should be left as null + if (toMkdir) { + assert dir != null; + dfs.mkdirs(dir); + try { + dfs.getClient().setErasureCodingPolicy(dir.toString(), null); + } catch (IOException e) { + if (!e.getMessage().contains("non-empty directory")) { + throw e; + } + } + } + + FSDataOutputStream out = null; + try { + out = dfs.create(file, (short) 1); // create an empty file + + FSNamesystem ns = cluster.getNamesystem(); + FSDirectory fsdir = ns.getFSDirectory(); + INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile(); + + ExtendedBlock previous = null; + for (int i = 0; i < numBlocks; i++) { + Block newBlock = addStripedBlockToFile(cluster.getDataNodes(), dfs, ns, + file.toString(), fileNode, dfs.getClient().getClientName(), + previous, numStripesPerBlk); + previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock); + } + + dfs.getClient().namenode.complete(file.toString(), + dfs.getClient().getClientName(), previous, fileNode.getId()); + } finally { + IOUtils.cleanup(null, out); + } + } + + /** + * Adds a striped block group to a file. This method only manipulates NameNode + * states of the file and the block without injecting data to DataNode. + * It does mimic block reports. + * You should disable periodical heartbeat before use this. + * @param dataNodes List DataNodes to host the striped block group + * @param previous Previous block in the file + * @param numStripes Number of stripes in each block group + * @return The added block group + */ + public static Block addStripedBlockToFile(List<DataNode> dataNodes, + DistributedFileSystem fs, FSNamesystem ns, String file, INodeFile fileNode, + String clientName, ExtendedBlock previous, int numStripes) + throws Exception { + fs.getClient().namenode.addBlock(file, clientName, previous, null, + fileNode.getId(), null); + + final BlockInfo lastBlock = fileNode.getLastBlock(); + final int groupSize = fileNode.getPreferredBlockReplication(); + assert dataNodes.size() >= groupSize; + // 1. RECEIVING_BLOCK IBR + for (int i = 0; i < groupSize; i++) { + DataNode dn = dataNodes.get(i); + final Block block = new Block(lastBlock.getBlockId() + i, 0, + lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); + } + } + + // 2. RECEIVED_BLOCK IBR + for (int i = 0; i < groupSize; i++) { + DataNode dn = dataNodes.get(i); + final Block block = new Block(lastBlock.getBlockId() + i, + numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); + } + } + + lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS); + return lastBlock; + } + + /** + * Because currently DFSStripedOutputStream does not support hflush/hsync, + * tests can use this method to flush all the buffered data to DataNodes. + */ + public static ExtendedBlock flushInternal(DFSStripedOutputStream out) + throws IOException { + out.flushInternal(); + return out.getBlock(); + } + + /** + * Verify that blocks in striped block group are on different nodes, and every + * internal blocks exists. + */ + public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, + int groupSize) { + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + assert lb instanceof LocatedStripedBlock; + HashSet<DatanodeInfo> locs = new HashSet<>(); + for (DatanodeInfo datanodeInfo : lb.getLocations()) { + locs.add(datanodeInfo); + } + assertEquals(groupSize, lb.getLocations().length); + assertEquals(groupSize, locs.size()); + + // verify that every internal blocks exists + int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices(); + assertEquals(groupSize, blockIndices.length); + HashSet<Integer> found = new HashSet<>(); + for (int index : blockIndices) { + assert index >=0; + found.add(index); + } + assertEquals(groupSize, found.size()); + } + } ++ + public static void waitForMetric(final JMXGet jmx, final String metricName, final int expectedValue) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + try { + final int currentValue = Integer.parseInt(jmx.getValue(metricName)); + LOG.info("Waiting for " + metricName + + " to reach value " + expectedValue + + ", current value = " + currentValue); + return currentValue == expectedValue; + } catch (Exception e) { + throw new UnhandledException("Test failed due to unexpected exception", e); + } + } + }, 1000, Integer.MAX_VALUE); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java index 50f98a3,0000000..c28bff8 mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java @@@ -1,160 -1,0 +1,163 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; ++import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks; + +public class TestWriteStripedFileWithFailure { + public static final Log LOG = LogFactory + .getLog(TestWriteStripedFileWithFailure.class); + private static MiniDFSCluster cluster; + private static FileSystem fs; + private static Configuration conf = new HdfsConfiguration(); + private final int smallFileLength = blockSize * dataBlocks - 123; + private final int largeFileLength = blockSize * dataBlocks + 123; + private final int[] fileLengths = {smallFileLength, largeFileLength}; + + public void setup() throws IOException { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null); + fs = cluster.getFileSystem(); + } + + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + // Test writing file with some Datanodes failure ++ // TODO: enable this test after HDFS-8704 and HDFS-9040 ++ @Ignore + @Test(timeout = 300000) + public void testWriteStripedFileWithDNFailure() throws IOException { + for (int fileLength : fileLengths) { + for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) { + for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) { + try { + // setup a new cluster with no dead datanode + setup(); + writeFileWithDNFailure(fileLength, dataDelNum, parityDelNum); + } catch (IOException ioe) { + String fileType = fileLength < (blockSize * dataBlocks) ? + "smallFile" : "largeFile"; + LOG.error("Failed to write file with DN failure:" + + " fileType = "+ fileType + + ", dataDelNum = " + dataDelNum + + ", parityDelNum = " + parityDelNum); + throw ioe; + } finally { + // tear down the cluster + tearDown(); + } + } + } + } + } + + /** + * Test writing a file with shutting down some DNs(data DNs or parity DNs or both). + * @param fileLength file length + * @param dataDNFailureNum the shutdown number of data DNs + * @param parityDNFailureNum the shutdown number of parity DNs + * @throws IOException + */ + private void writeFileWithDNFailure(int fileLength, + int dataDNFailureNum, int parityDNFailureNum) throws IOException { + String fileType = fileLength < (blockSize * dataBlocks) ? + "smallFile" : "largeFile"; + String src = "/dnFailure_" + dataDNFailureNum + "_" + parityDNFailureNum + + "_" + fileType; + LOG.info("writeFileWithDNFailure: file = " + src + + ", fileType = " + fileType + + ", dataDNFailureNum = " + dataDNFailureNum + + ", parityDNFailureNum = " + parityDNFailureNum); + + Path srcPath = new Path(src); + final AtomicInteger pos = new AtomicInteger(); + final FSDataOutputStream out = fs.create(srcPath); + final DFSStripedOutputStream stripedOut + = (DFSStripedOutputStream)out.getWrappedStream(); + + int[] dataDNFailureIndices = StripedFileTestUtil.randomArray(0, dataBlocks, + dataDNFailureNum); + Assert.assertNotNull(dataDNFailureIndices); + int[] parityDNFailureIndices = StripedFileTestUtil.randomArray(dataBlocks, + dataBlocks + parityBlocks, parityDNFailureNum); + Assert.assertNotNull(parityDNFailureIndices); + + int[] failedDataNodes = new int[dataDNFailureNum + parityDNFailureNum]; + System.arraycopy(dataDNFailureIndices, 0, failedDataNodes, + 0, dataDNFailureIndices.length); + System.arraycopy(parityDNFailureIndices, 0, failedDataNodes, + dataDNFailureIndices.length, parityDNFailureIndices.length); + + final int killPos = fileLength/2; + for (; pos.get() < fileLength; ) { + final int i = pos.getAndIncrement(); + if (i == killPos) { + for(int failedDn : failedDataNodes) { + StripedFileTestUtil.killDatanode(cluster, stripedOut, failedDn, pos); + } + } + write(out, i); + } + out.close(); + + // make sure the expected number of Datanode have been killed + int dnFailureNum = dataDNFailureNum + parityDNFailureNum; + Assert.assertEquals(cluster.getDataNodes().size(), numDNs - dnFailureNum); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); + StripedFileTestUtil.verifyLength(fs, srcPath, fileLength); + StripedFileTestUtil.verifySeek(fs, srcPath, fileLength); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + smallBuf); + StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + // delete the file + fs.delete(srcPath, true); + } + + void write(FSDataOutputStream out, int i) throws IOException { + try { + out.write(StripedFileTestUtil.getByte(i)); + } catch (IOException e) { + throw new IOException("Failed at i=" + i, e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 0a27614,851e5b9..00a4575 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@@ -516,16 -489,16 +516,16 @@@ public class TestPBHelper @Test public void testConvertLocatedBlock() { LocatedBlock lb = createLocatedBlock(); - LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb); - LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto); - LocatedBlockProto lbProto = PBHelperClient.convert(lb); - LocatedBlock lb2 = PBHelperClient.convert(lbProto); ++ LocatedBlockProto lbProto = PBHelperClient.convertLocatedBlock(lb); ++ LocatedBlock lb2 = PBHelperClient.convertLocatedBlockProto(lbProto); compare(lb,lb2); } @Test public void testConvertLocatedBlockNoStorageMedia() { LocatedBlock lb = createLocatedBlockNoStorageMedia(); - LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb); - LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto); - LocatedBlockProto lbProto = PBHelperClient.convert(lb); - LocatedBlock lb2 = PBHelperClient.convert(lbProto); ++ LocatedBlockProto lbProto = PBHelperClient.convertLocatedBlock(lb); ++ LocatedBlock lb2 = PBHelperClient.convertLocatedBlockProto(lbProto); compare(lb,lb2); } @@@ -535,8 -508,8 +535,8 @@@ for (int i=0;i<3;i++) { lbl.add(createLocatedBlock()); } - List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlocks2(lbl); - List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlocks(lbpl); - List<LocatedBlockProto> lbpl = PBHelperClient.convertLocatedBlock2(lbl); - List<LocatedBlock> lbl2 = PBHelperClient.convertLocatedBlock(lbpl); ++ List<LocatedBlockProto> lbpl = PBHelperClient.convertLocatedBlocks2(lbl); ++ List<LocatedBlock> lbl2 = PBHelperClient.convertLocatedBlocks(lbpl); assertEquals(lbl.size(), lbl2.size()); for (int i=0;i<lbl.size();i++) { compare(lbl.get(i), lbl2.get(2)); @@@ -549,8 -522,8 +549,8 @@@ for (int i=0;i<3;i++) { lbl[i] = createLocatedBlock(); } - LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlocks(lbl); - LocatedBlock [] lbl2 = PBHelper.convertLocatedBlocks(lbpl); - LocatedBlockProto [] lbpl = PBHelperClient.convertLocatedBlock(lbl); - LocatedBlock [] lbl2 = PBHelperClient.convertLocatedBlock(lbpl); ++ LocatedBlockProto [] lbpl = PBHelperClient.convertLocatedBlocks(lbl); ++ LocatedBlock [] lbl2 = PBHelperClient.convertLocatedBlocks(lbpl); assertEquals(lbl.length, lbl2.length); for (int i=0;i<lbl.length;i++) { compare(lbl[i], lbl2[i]); @@@ -664,99 -637,6 +664,99 @@@ .setType(AclEntryType.OTHER).build(); AclStatus s = new AclStatus.Builder().owner("foo").group("bar").addEntry(e) .build(); - Assert.assertEquals(s, PBHelper.convert(PBHelper.convert(s))); + Assert.assertEquals(s, PBHelperClient.convert(PBHelperClient.convert(s))); } + + @Test + public void testBlockECRecoveryCommand() { + DatanodeInfo[] dnInfos0 = new DatanodeInfo[] { + DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() }; + DatanodeStorageInfo targetDnInfos_0 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s00")); + DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s01")); + DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] { + targetDnInfos_0, targetDnInfos_1 }; + short[] liveBlkIndices0 = new short[2]; + BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo( + new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0, + liveBlkIndices0, ErasureCodingPolicyManager.getSystemDefaultPolicy()); + DatanodeInfo[] dnInfos1 = new DatanodeInfo[] { + DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() }; + DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s02")); + DatanodeStorageInfo targetDnInfos_3 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s03")); + DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] { + targetDnInfos_2, targetDnInfos_3 }; + short[] liveBlkIndices1 = new short[2]; + BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo( + new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1, + liveBlkIndices1, ErasureCodingPolicyManager.getSystemDefaultPolicy()); + List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>(); + blkRecoveryInfosList.add(blkECRecoveryInfo0); + blkRecoveryInfosList.add(blkECRecoveryInfo1); + BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand( + DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList); + BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper + .convert(blkECRecoveryCmd); + blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto); + Iterator<BlockECRecoveryInfo> iterator = blkECRecoveryCmd.getECTasks() + .iterator(); + assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next()); + assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next()); + } + + private void assertBlockECRecoveryInfoEquals( + BlockECRecoveryInfo blkECRecoveryInfo1, + BlockECRecoveryInfo blkECRecoveryInfo2) { + assertEquals(blkECRecoveryInfo1.getExtendedBlock(), + blkECRecoveryInfo2.getExtendedBlock()); + + DatanodeInfo[] sourceDnInfos1 = blkECRecoveryInfo1.getSourceDnInfos(); + DatanodeInfo[] sourceDnInfos2 = blkECRecoveryInfo2.getSourceDnInfos(); + assertDnInfosEqual(sourceDnInfos1, sourceDnInfos2); + + DatanodeInfo[] targetDnInfos1 = blkECRecoveryInfo1.getTargetDnInfos(); + DatanodeInfo[] targetDnInfos2 = blkECRecoveryInfo2.getTargetDnInfos(); + assertDnInfosEqual(targetDnInfos1, targetDnInfos2); + + String[] targetStorageIDs1 = blkECRecoveryInfo1.getTargetStorageIDs(); + String[] targetStorageIDs2 = blkECRecoveryInfo2.getTargetStorageIDs(); + assertEquals(targetStorageIDs1.length, targetStorageIDs2.length); + for (int i = 0; i < targetStorageIDs1.length; i++) { + assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]); + } + + short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices(); + short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices(); + for (int i = 0; i < liveBlockIndices1.length; i++) { + assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]); + } + + ErasureCodingPolicy ecPolicy1 = blkECRecoveryInfo1.getErasureCodingPolicy(); + ErasureCodingPolicy ecPolicy2 = blkECRecoveryInfo2.getErasureCodingPolicy(); + // Compare ECPolicies same as default ECPolicy as we used system default + // ECPolicy used in this test + compareECPolicies(ErasureCodingPolicyManager.getSystemDefaultPolicy(), ecPolicy1); + compareECPolicies(ErasureCodingPolicyManager.getSystemDefaultPolicy(), ecPolicy2); + } + + private void compareECPolicies(ErasureCodingPolicy ecPolicy1, ErasureCodingPolicy ecPolicy2) { + assertEquals(ecPolicy1.getName(), ecPolicy2.getName()); + assertEquals(ecPolicy1.getNumDataUnits(), ecPolicy2.getNumDataUnits()); + assertEquals(ecPolicy1.getNumParityUnits(), ecPolicy2.getNumParityUnits()); + } + + private void assertDnInfosEqual(DatanodeInfo[] dnInfos1, + DatanodeInfo[] dnInfos2) { + assertEquals(dnInfos1.length, dnInfos2.length); + for (int i = 0; i < dnInfos1.length; i++) { + compare(dnInfos1[i], dnInfos2[i]); + } + } }
