HDFS-8668. Erasure Coding: revisit buffer used for encoding and decoding. Contributed by Sammi Chen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5af9be7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5af9be7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5af9be7

Branch: refs/heads/YARN-2915
Commit: b5af9be72c72734d668f817c99d889031922a951
Parents: 4d3ea92
Author: Kai Zheng <kai.zh...@intel.com>
Authored: Sat Aug 13 13:52:37 2016 +0800
Committer: Kai Zheng <kai.zh...@intel.com>
Committed: Sat Aug 13 13:52:37 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/io/ElasticByteBufferPool.java |  1 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 52 +++++++++++++++-----
 .../hadoop/hdfs/DFSStripedInputStream.java      | 18 ++++---
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 29 +++++++++--
 .../StripedBlockChecksumReconstructor.java      |  3 +-
 .../erasurecode/StripedBlockReader.java         |  4 ++
 .../erasurecode/StripedBlockReconstructor.java  |  4 +-
 .../erasurecode/StripedBlockWriter.java         | 22 +++++++--
 .../datanode/erasurecode/StripedReader.java     | 11 ++++-
 .../erasurecode/StripedReconstructor.java       | 13 ++++-
 .../datanode/erasurecode/StripedWriter.java     |  8 +++
 .../hadoop/hdfs/TestDFSStripedInputStream.java  |  8 +++
 .../hadoop/hdfs/TestDFSStripedOutputStream.java |  8 +++
 .../TestDFSStripedOutputStreamWithFailure.java  | 11 ++++-
 .../hadoop/hdfs/TestReconstructStripedFile.java | 11 ++++-
 15 files changed, 169 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
index 694fcbe..c35d608 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
@@ -101,6 +101,7 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
 
   @Override
   public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
     TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
     while (true) {
       Key key = new Key(buffer.capacity(), System.nanoTime());
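Note: this one-line change is what makes the shared pool safe for the new callers below. ElasticByteBufferPool hands a returned buffer straight to the next getBuffer() caller, so clearing on return resets position and limit and no stale state leaks between borrowers. A minimal sketch of the pool's get/put contract (the pool class is the real Hadoop API; the demo class itself is illustrative only):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class PoolClearDemo {
      public static void main(String[] args) {
        ByteBufferPool pool = new ElasticByteBufferPool();
        // Ask for a direct buffer of at least 64 KB; the pool may return a
        // larger buffer that was pooled earlier.
        ByteBuffer buf = pool.getBuffer(true, 64 * 1024);
        buf.putLong(42L);          // position is now 8
        pool.putBuffer(buf);       // cleared on return by this patch
        ByteBuffer recycled = pool.getBuffer(true, 64 * 1024);
        // position == 0 and limit == capacity, whichever buffer came back
        System.out.println(recycled.position() + "/" + recycled.limit());
      }
    }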
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index cc919da..93aee0e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.EnumSet;
 import java.util.concurrent.atomic.AtomicReference;
@@ -393,11 +394,47 @@ public class DFSOutputStream extends FSOutputSummer
   @Override
   protected synchronized void writeChunk(byte[] b, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
+    writeChunkPrepare(len, ckoff, cklen);
+
+    currentPacket.writeChecksum(checksum, ckoff, cklen);
+    currentPacket.writeData(b, offset, len);
+    currentPacket.incNumChunks();
+    getStreamer().incBytesCurBlock(len);
+
+    // If packet is full, enqueue it for transmission
+    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
+        getStreamer().getBytesCurBlock() == blockSize) {
+      enqueueCurrentPacketFull();
+    }
+  }
+
+  /* write the data chunk in <code>buffer</code> starting at
+   * <code>buffer.position</code> with
+   * a length of <code>len > 0</code>, and its checksum
+   */
+  protected synchronized void writeChunk(ByteBuffer buffer, int len,
+      byte[] checksum, int ckoff, int cklen) throws IOException {
+    writeChunkPrepare(len, ckoff, cklen);
+
+    currentPacket.writeChecksum(checksum, ckoff, cklen);
+    currentPacket.writeData(buffer, len);
+    currentPacket.incNumChunks();
+    getStreamer().incBytesCurBlock(len);
+
+    // If packet is full, enqueue it for transmission
+    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
+        getStreamer().getBytesCurBlock() == blockSize) {
+      enqueueCurrentPacketFull();
+    }
+  }
+
+  private synchronized void writeChunkPrepare(int buflen,
+      int ckoff, int cklen) throws IOException {
     dfsClient.checkOpen();
     checkClosed();
-    if (len > bytesPerChecksum) {
-      throw new IOException("writeChunk() buffer size is " + len +
+    if (buflen > bytesPerChecksum) {
+      throw new IOException("writeChunk() buffer size is " + buflen +
           " is larger than supported bytesPerChecksum " +
           bytesPerChecksum);
     }
@@ -414,17 +451,6 @@ public class DFSOutputStream extends FSOutputSummer
           currentPacket.getSeqno(), src, packetSize, chunksPerPacket,
           getStreamer().getBytesCurBlock() + ", " + this);
     }
-
-    currentPacket.writeChecksum(checksum, ckoff, cklen);
-    currentPacket.writeData(b, offset, len);
-    currentPacket.incNumChunks();
-    getStreamer().incBytesCurBlock(len);
-
-    // If packet is full, enqueue it for transmission
-    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
-        getStreamer().getBytesCurBlock() == blockSize) {
-      enqueueCurrentPacketFull();
-    }
   }
 
   void enqueueCurrentPacket() throws IOException {
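Note: the refactoring above keeps a single copy of the open/size checks in writeChunkPrepare() and adds a ByteBuffer overload of writeChunk(). The overload exists because the striped cell buffers below may now be direct, and a direct buffer exposes no backing array, so the old buffer.array()-based path cannot serve it. A pure-JDK illustration of that constraint:

    import java.nio.ByteBuffer;

    public class DirectArrayDemo {
      public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(16);
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        System.out.println(heap.hasArray());   // true: byte[] writeChunk path works
        System.out.println(direct.hasArray()); // false: array() would throw, hence
                                               // the ByteBuffer overload above
      }
    }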
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 1bdbc32..d93863c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -35,12 +35,12 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.util.DirectBufferPool;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -139,7 +139,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
   }
 
-  private static final DirectBufferPool bufferPool = new DirectBufferPool();
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
 
   private final BlockReaderInfo[] blockReaders;
   private final int cellSize;
@@ -194,9 +194,14 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
   }
 
+  private boolean useDirectBuffer() {
+    return decoder.preferDirectBuffer();
+  }
+
   private void resetCurStripeBuffer() {
     if (curStripeBuf == null) {
-      curStripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum);
+      curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
+          cellSize * dataBlkNum);
     }
     curStripeBuf.clear();
     curStripeRange = new StripeRange(0, 0);
@@ -204,7 +209,8 @@ public class DFSStripedInputStream extends DFSInputStream {
 
   private ByteBuffer getParityBuffer() {
     if (parityBuf == null) {
-      parityBuf = bufferPool.getBuffer(cellSize * parityBlkNum);
+      parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
+          cellSize * parityBlkNum);
     }
     parityBuf.clear();
     return parityBuf;
@@ -235,11 +241,11 @@ public class DFSStripedInputStream extends DFSInputStream {
   public synchronized void close() throws IOException {
     super.close();
     if (curStripeBuf != null) {
-      bufferPool.returnBuffer(curStripeBuf);
+      BUFFER_POOL.putBuffer(curStripeBuf);
      curStripeBuf = null;
     }
     if (parityBuf != null) {
-      bufferPool.returnBuffer(parityBuf);
+      BUFFER_POOL.putBuffer(parityBuf);
       parityBuf = null;
     }
   }
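Note: buffer placement now follows the codec. preferDirectBuffer() is true for coders that work off-heap (e.g. the native ISA-L coder behind JNI), where direct buffers avoid an extra copy, and false for the pure-Java coder, where heap buffers do. A self-contained sketch of that decision; the Coder interface here is a stand-in for RawErasureDecoder:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class BufferPlacementDemo {
      interface Coder {
        boolean preferDirectBuffer(); // stand-in for the RawErasureDecoder method
      }

      // Mirrors resetCurStripeBuffer() above: heap vs. direct follows the coder.
      static ByteBuffer allocateStripe(Coder decoder, int cellSize,
          int dataBlkNum, ByteBufferPool pool) {
        return pool.getBuffer(decoder.preferDirectBuffer(),
            cellSize * dataBlkNum);
      }

      public static void main(String[] args) {
        ByteBufferPool pool = new ElasticByteBufferPool();
        ByteBuffer stripe = allocateStripe(() -> true, 64 * 1024, 6, pool);
        System.out.println(stripe.isDirect()); // true for a native-style coder
      }
    }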
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 85dc749..502e0a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -75,6 +77,9 @@ import org.apache.htrace.core.TraceScope;
  */
 @InterfaceAudience.Private
 public class DFSStripedOutputStream extends DFSOutputStream {
+
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+
   static class MultipleBlockingQueue<T> {
     private final List<BlockingQueue<T>> queues;
 
@@ -208,7 +213,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
 
       buffers = new ByteBuffer[numAllBlocks];
       for (int i = 0; i < buffers.length; i++) {
-        buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+        buffers[i] = BUFFER_POOL.getBuffer(useDirectBuffer(), cellSize);
       }
     }
 
@@ -236,7 +241,10 @@ public class DFSStripedOutputStream extends DFSOutputStream {
 
     private void release() {
      for (int i = 0; i < numAllBlocks; i++) {
-        byteArrayManager.release(buffers[i].array());
+        if (buffers[i] != null) {
+          BUFFER_POOL.putBuffer(buffers[i]);
+          buffers[i] = null;
+        }
       }
     }
 
@@ -311,6 +319,10 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     setCurrentStreamer(0);
   }
 
+  private boolean useDirectBuffer() {
+    return encoder.preferDirectBuffer();
+  }
+
   StripedDataStreamer getStripedDataStreamer(int i) {
     return streamers.get(i);
   }
 
@@ -907,11 +919,20 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       if (current.isHealthy()) {
         try {
           DataChecksum sum = getDataChecksum();
-          sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
+          if (buffer.isDirect()) {
+            ByteBuffer directCheckSumBuf =
+                BUFFER_POOL.getBuffer(true, checksumBuf.length);
+            sum.calculateChunkedSums(buffer, directCheckSumBuf);
+            directCheckSumBuf.get(checksumBuf);
+            BUFFER_POOL.putBuffer(directCheckSumBuf);
+          } else {
+            sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
+          }
+
           for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
             int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
             int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
-            super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
+            super.writeChunk(buffer, chunkLen, checksumBuf, ckOffset,
                 getChecksumSize());
           }
         } catch(Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
index c7294c7..944ed9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -57,6 +57,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
   }
 
   private void init() throws IOException {
+    initDecoderIfNecessary();
     getStripedReader().init();
     // allocate buffer to keep the reconstructed block data
     targetBuffer = allocateBuffer(getBufferSize());
@@ -150,8 +151,6 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
   }
 
   private void reconstructTargets(int toReconstructLen) {
-    initDecoderIfNecessary();
-
     ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
 
     ByteBuffer[] outputs = new ByteBuffer[1];
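Note: the checksum branch above (and its twin in StripedBlockWriter further down) works around the same direct-buffer limitation: DataChecksum's byte[] overload needs buffer.array(), so for direct buffers the sums are computed into a temporary direct buffer and copied out into the caller's byte[]. A sketch of the pattern in isolation; CRC32C with 512-byte chunks is an arbitrary choice for the demo:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.io.ElasticByteBufferPool;
    import org.apache.hadoop.util.DataChecksum;

    public class DirectChecksumDemo {
      private static final ByteBufferPool POOL = new ElasticByteBufferPool();

      static void chunkedSums(ByteBuffer data, byte[] checksumBuf) {
        DataChecksum sum = DataChecksum.newDataChecksum(
            DataChecksum.Type.CRC32C, 512);
        if (data.isDirect()) {
          // No backing array: compute into a direct scratch buffer, copy out.
          ByteBuffer scratch = POOL.getBuffer(true, checksumBuf.length);
          sum.calculateChunkedSums(data, scratch);
          scratch.get(checksumBuf);
          POOL.putBuffer(scratch);
        } else {
          sum.calculateChunkedSums(data.array(), 0, data.remaining(),
              checksumBuf, 0);
        }
      }
    }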
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 140c658..8f976c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -90,6 +90,10 @@ class StripedBlockReader {
     return buffer;
   }
 
+  void freeReadBuffer() {
+    buffer = null;
+  }
+
   void resetBlockReader(long offsetInBlock) {
     this.blockReader = createBlockReader(offsetInBlock);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index b800bef..9f9f15d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -49,6 +49,8 @@ class StripedBlockReconstructor extends StripedReconstructor
   public void run() {
     getDatanode().incrementXmitsInProgress();
     try {
+      initDecoderIfNecessary();
+
       getStripedReader().init();
 
       stripedWriter.init();
@@ -96,8 +98,6 @@ class StripedBlockReconstructor extends StripedReconstructor
   }
 
   private void reconstructTargets(int toReconstructLen) {
-    initDecoderIfNecessary();
-
     ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
 
     int[] erasedIndices = stripedWriter.getRealTargetIndices();
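Note: hoisting initDecoderIfNecessary() ahead of the reader/writer init() calls is not just tidying. Buffer allocation now asks the decoder whether to use direct or heap memory (useDirectBuffer() below), so the decoder must exist before any pooled buffer is handed out. The resulting startup order in run(), annotated:

      initDecoderIfNecessary();   // decide heap vs. direct placement first
      getStripedReader().init();  // read buffers now come from the shared pool
      stripedWriter.init();       // target buffers likewise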
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index 32e8843..11551e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -65,6 +67,7 @@ class StripedBlockWriter {
   private ByteBuffer targetBuffer;
   private long blockOffset4Target = 0;
   private long seqNo4Target = 0;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
 
   StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode,
                      Configuration conf, ExtendedBlock block,
@@ -87,6 +90,10 @@ class StripedBlockWriter {
     return targetBuffer;
   }
 
+  void freeTargetBuffer() {
+    targetBuffer = null;
+  }
+
   /**
    * Initialize output/input streams for transferring data to target
    * and send create block request.
@@ -154,9 +161,18 @@ class StripedBlockWriter {
       return;
     }
 
-    stripedWriter.getChecksum().calculateChunkedSums(
-        targetBuffer.array(), 0, targetBuffer.remaining(),
-        stripedWriter.getChecksumBuf(), 0);
+    if (targetBuffer.isDirect()) {
+      ByteBuffer directCheckSumBuf =
+          BUFFER_POOL.getBuffer(true, stripedWriter.getChecksumBuf().length);
+      stripedWriter.getChecksum().calculateChunkedSums(
+          targetBuffer, directCheckSumBuf);
+      directCheckSumBuf.get(stripedWriter.getChecksumBuf());
+      BUFFER_POOL.putBuffer(directCheckSumBuf);
+    } else {
+      stripedWriter.getChecksum().calculateChunkedSums(
+          targetBuffer.array(), 0, targetBuffer.remaining(),
+          stripedWriter.getChecksumBuf(), 0);
+    }
 
     int ckOff = 0;
     while (targetBuffer.remaining() > 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index e6d4ceb..238c628 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -180,7 +180,7 @@ class StripedReader {
   }
 
   protected ByteBuffer allocateReadBuffer() {
-    return ByteBuffer.allocate(getBufferSize());
+    return reconstructor.allocateBuffer(getBufferSize());
   }
 
   private void initZeroStrip() {
@@ -421,7 +421,16 @@ class StripedReader {
   }
 
   void close() {
+    if (zeroStripeBuffers != null) {
+      for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
+        reconstructor.freeBuffer(zeroStripeBuffer);
+      }
+    }
+    zeroStripeBuffers = null;
+
     for (StripedBlockReader reader : readers) {
+      reconstructor.freeBuffer(reader.getReadBuffer());
+      reader.freeReadBuffer();
       reader.closeBlockReader();
     }
   }
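Note: pooled buffers are only useful if they come back. The close() paths above pair each pool return with nulling the owner's reference (freeReadBuffer() here, freeTargetBuffer() for writers) so a returned buffer cannot be touched afterwards. A minimal, self-contained sketch of that discipline with a hypothetical holder class:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class PooledHolder implements AutoCloseable {
      private static final ByteBufferPool POOL = new ElasticByteBufferPool();
      private ByteBuffer buffer = POOL.getBuffer(false, 4096);

      ByteBuffer buffer() {
        return buffer;
      }

      @Override
      public void close() {
        if (buffer != null) {
          POOL.putBuffer(buffer); // hand the buffer back for reuse
          buffer = null;          // drop the reference; never reuse after return
        }
      }
    }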
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 782d091..5641c35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
@@ -102,6 +104,7 @@ abstract class StripedReconstructor {
   private final ErasureCodingPolicy ecPolicy;
   private RawErasureDecoder decoder;
   private final ExtendedBlock blockGroup;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
 
   // position in striped internal block
   private long positionInBlock;
@@ -139,8 +142,16 @@ abstract class StripedReconstructor {
    */
   abstract void reconstruct() throws IOException;
 
+  boolean useDirectBuffer() {
+    return decoder.preferDirectBuffer();
+  }
+
   ByteBuffer allocateBuffer(int length) {
-    return ByteBuffer.allocate(length);
+    return BUFFER_POOL.getBuffer(useDirectBuffer(), length);
+  }
+
+  void freeBuffer(ByteBuffer buffer) {
+    BUFFER_POOL.putBuffer(buffer);
   }
 
   ExtendedBlock getBlock(int i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index ca7a3a8..c099bc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -297,6 +297,14 @@ class StripedWriter {
   }
 
   void close() {
+    for (StripedBlockWriter writer : writers) {
+      ByteBuffer targetBuffer = writer.getTargetBuffer();
+      if (targetBuffer != null) {
+        reconstructor.freeBuffer(targetBuffer);
+        writer.freeTargetBuffer();
+      }
+    }
+
     for (int i = 0; i < targets.length; i++) {
       writers[i].close();
     }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index a02a8d6..18c2de9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -32,7 +33,9 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.junit.After;
 import org.junit.Assert;
@@ -77,6 +80,11 @@ public class TestDFSStripedInputStream {
   public void setup() throws IOException {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
+          NativeRSRawErasureCoderFactory.class.getCanonicalName());
+    }
     SimulatedFSDataset.setFactory(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
         DATA_BLK_NUM + PARITY_BLK_NUM).build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index a12a8ce..8d54f08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -25,8 +25,11 @@ import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -65,6 +68,11 @@ public class TestDFSStripedOutputStream {
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
         false);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
+          NativeRSRawErasureCoderFactory.class.getCanonicalName());
+    }
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
     fs = cluster.getFileSystem();
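Note: the same guard is added to each test's setup: only when the native ISA-L library is actually loaded do the tests swap the default RS raw coder for the native implementation, so the direct-buffer paths above get exercised wherever the environment allows while the tests still pass without the native build. The guard in isolation (the wrapper class is illustrative; the key and factory names come from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
    import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;

    public class NativeRsTestConf {
      static Configuration withNativeRsIfLoaded(Configuration conf) {
        if (ErasureCodeNative.isNativeCodeLoaded()) {
          conf.set(
              CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
              NativeRSRawErasureCoderFactory.class.getCanonicalName());
        }
        return conf;
      }
    }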
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 83b6c58..11036a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -183,6 +186,11 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   private void setup(Configuration conf) throws IOException {
     final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
+          NativeRSRawErasureCoderFactory.class.getCanonicalName());
+    }
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
     dfs = cluster.getFileSystem();
@@ -229,7 +237,8 @@ public class TestDFSStripedOutputStreamWithFailure {
     final HdfsConfiguration conf = newHdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
-    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
     // Set short retry timeouts so this test runs faster
     conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
     for (int dn = 0; dn < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; dn += 2) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af9be7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 36d2dbd..59e9f87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -33,6 +33,7 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -47,6 +48,8 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -86,11 +89,17 @@ public class TestReconstructStripedFile {
   public void setup() throws IOException {
     final Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+    conf.setInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
         cellSize - 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
         false);
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
+          NativeRSRawErasureCoderFactory.class.getCanonicalName());
+    }
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
     cluster.waitActive();