This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 7a6ea80e59bc3795a3cdcc571c238674967cdd86 Author: Sadanand Shenoy <[email protected]> AuthorDate: Wed Dec 1 11:00:46 2021 +0530 HDDS-5851. [Ozone-Streaming] Define a PutBlock/maxBuffer fixed boundary for streaming writes. (#2866) --- .../apache/hadoop/hdds/scm/OzoneClientConfig.java | 31 ++++++++++++++++++++ .../hdds/scm/storage/BlockDataStreamOutput.java | 29 ++++++++++++++---- .../org/apache/hadoop/ozone/MiniOzoneCluster.java | 12 ++++++++ .../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 11 +++++++ .../client/rpc/TestBlockDataStreamOutput.java | 34 ++++++++++++++++++++++ 5 files changed, 111 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 14a8aacbc0..715a19212b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -66,6 +66,21 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private int streamBufferSize = 4 * 1024 * 1024; + @Config(key = "datastream.max.buffer.size", + defaultValue = "4MB", + type = ConfigType.SIZE, + description = "The maximum size of the ByteBuffer " + + "(used via ratis streaming)", + tags = ConfigTag.CLIENT) + private int dataStreamMaxBufferSize = 4 * 1024 * 1024; + + @Config(key = "datastream.buffer.flush.size", + defaultValue = "16MB", + type = ConfigType.SIZE, + description = "The boundary at which putBlock is executed", + tags = ConfigTag.CLIENT) + private long dataStreamBufferFlushSize = 16 * 1024 * 1024; + @Config(key = "stream.buffer.increment", defaultValue = "0B", type = ConfigType.SIZE, @@ -228,6 +243,14 @@ public class OzoneClientConfig { this.streamBufferSize = streamBufferSize; } + public int getDataStreamMaxBufferSize() { + return dataStreamMaxBufferSize; + } + + public void setDataStreamMaxBufferSize(int dataStreamMaxBufferSize) { + this.dataStreamMaxBufferSize = dataStreamMaxBufferSize; + } + public boolean isStreamBufferFlushDelay() { return streamBufferFlushDelay; } @@ -296,6 +319,14 @@ public class OzoneClientConfig { return bufferIncrement; } + public long getDataStreamBufferFlushSize() { + return dataStreamBufferFlushSize; + } + + public void setDataStreamBufferFlushSize(long dataStreamBufferFlushSize) { + this.dataStreamBufferFlushSize = dataStreamBufferFlushSize; + } + public ChecksumCombineMode getChecksumCombineMode() { try { return ChecksumCombineMode.valueOf(checksumCombineMode); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index aada48e2f5..6f5a54354a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -257,13 +257,30 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { if (len == 0) { return; } + int curLen = len; + // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold + int maxBufferLen = config.getDataStreamMaxBufferSize(); + while (curLen > 0) { + int writeLen = Math.min(curLen, maxBufferLen); + final StreamBuffer buf = new StreamBuffer(b, off, writeLen); + off += writeLen; + bufferList.add(buf); + writeChunkToContainer(buf.duplicate()); + curLen -= writeLen; + writtenDataLength += writeLen; + doFlushIfNeeded(); + } + } - final StreamBuffer buf = new StreamBuffer(b, off, len); - bufferList.add(buf); - - writeChunkToContainer(buf.duplicate()); - - writtenDataLength += len; + private void doFlushIfNeeded() throws IOException { + Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config + .getDataStreamMaxBufferSize()); + long boundary = config.getDataStreamBufferFlushSize() / config + .getDataStreamMaxBufferSize(); + if (bufferList.size() % boundary == 0) { + updateFlushLength(); + executePutBlock(false, false); + } } private void updateFlushLength() { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index fa2d192058..a97ea318e2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -325,6 +325,8 @@ public interface MiniOzoneCluster { protected Optional<Integer> chunkSize = Optional.empty(); protected OptionalInt streamBufferSize = OptionalInt.empty(); protected Optional<Long> streamBufferFlushSize = Optional.empty(); + protected Optional<Long> dataStreamBufferFlushSize= Optional.empty(); + protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty(); protected Optional<Long> streamBufferMaxSize = Optional.empty(); protected Optional<Long> blockSize = Optional.empty(); protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty(); @@ -566,6 +568,16 @@ public interface MiniOzoneCluster { return this; } + public Builder setDataStreamBufferMaxSize(int size) { + dataStreamMaxBufferSize = OptionalInt.of(size); + return this; + } + + public Builder setDataStreamBufferFlushize(long size) { + dataStreamBufferFlushSize = Optional.of(size); + return this; + } + /** * Sets the block size for stream buffer. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 16736636f1..85572b19b3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -665,6 +665,12 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { if (!streamBufferMaxSize.isPresent()) { streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get()); } + if (!dataStreamBufferFlushSize.isPresent()) { + dataStreamBufferFlushSize = Optional.of((long) 4 * chunkSize.get()); + } + if (!dataStreamMaxBufferSize.isPresent()) { + dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get()); + } if (!blockSize.isPresent()) { blockSize = Optional.of(2 * streamBufferMaxSize.get()); } @@ -681,6 +687,11 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { streamBufferSizeUnit.get().toBytes(streamBufferMaxSize.get()))); clientConfig.setStreamBufferFlushSize(Math.round( streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get()))); + clientConfig.setDataStreamBufferFlushSize(Math.round( + streamBufferSizeUnit.get().toBytes(dataStreamBufferFlushSize.get()))); + clientConfig.setDataStreamMaxBufferSize((int) Math.round( + streamBufferSizeUnit.get() + .toBytes(dataStreamMaxBufferSize.getAsInt()))); conf.setFromObject(clientConfig); conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index 05a101951b..5eb38a00de 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -20,7 +20,10 @@ package org.apache.hadoop.ozone.client.rpc; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -101,6 +104,8 @@ public class TestBlockDataStreamOutput { .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) + .setDataStreamBufferFlushize(maxFlushSize) + .setDataStreamBufferMaxSize(chunkSize) .setStreamBufferSizeUnit(StorageUnit.BYTES) .build(); cluster.waitForClusterToBeReady(); @@ -186,6 +191,35 @@ public class TestBlockDataStreamOutput { validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); } + @Test + public void testPutBlockAtBoundary() throws Exception { + int dataLength = 500; + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + String keyName = getKeyName(); + OzoneDataStreamOutput key = createKey( + keyName, ReplicationType.RATIS, 0); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(ByteBuffer.wrap(data)); + Assert.assertTrue( + metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + key.close(); + // Since data length is 500 , first putBlock will be at 400(flush boundary) + // and the other at 500 + Assert.assertTrue( + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock) + == putBlockCount + 2); + validateData(keyName, data); + } + + private OzoneDataStreamOutput createKey(String keyName, ReplicationType type, long size) throws Exception { return TestHelper.createStreamKey( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
