This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 1f56a45e7578fdd495e852f0e449e62b1079c640 Author: Sadanand Shenoy <[email protected]> AuthorDate: Tue Dec 21 12:22:30 2021 +0530 HDDS-6039. Define a minimum packet size during streaming writes. (#2883) --- .../apache/hadoop/hdds/scm/OzoneClientConfig.java | 16 ++++++ .../hdds/scm/storage/BlockDataStreamOutput.java | 57 ++++++++++++++++------ .../hadoop/hdds/scm/storage/StreamBuffer.java | 15 +++++- .../hdds/scm/storage/StreamCommitWatcher.java | 2 +- .../client/io/BlockDataStreamOutputEntryPool.java | 2 +- .../org/apache/hadoop/ozone/MiniOzoneCluster.java | 6 +++ .../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 6 +++ .../client/rpc/TestBlockDataStreamOutput.java | 32 ++++++++++-- .../rpc/TestContainerStateMachineStream.java | 1 + .../client/rpc/TestOzoneRpcClientAbstract.java | 1 + 10 files changed, 118 insertions(+), 20 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 715a19212b..86a725220a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -81,6 +81,14 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private long dataStreamBufferFlushSize = 16 * 1024 * 1024; + @Config(key = "datastream.min.packet.size", + defaultValue = "1MB", + type = ConfigType.SIZE, + description = "The maximum size of the ByteBuffer " + + "(used via ratis streaming)", + tags = ConfigTag.CLIENT) + private int dataStreamMinPacketSize = 1024 * 1024; + @Config(key = "stream.buffer.increment", defaultValue = "0B", type = ConfigType.SIZE, @@ -267,6 +275,14 @@ public class OzoneClientConfig { this.streamBufferMaxSize = streamBufferMaxSize; } + public int getDataStreamMinPacketSize() { + return dataStreamMinPacketSize; + } + + public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) { + this.dataStreamMinPacketSize = dataStreamMinPacketSize; + } + public int getMaxRetryCount() { return maxRetryCount; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 6f5a54354a..9fb1340527 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -125,7 +127,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>(); private final long syncSize = 0; // TODO: disk sync is disabled for now private long syncPosition = 0; - + private StreamBuffer currentBuffer; + private XceiverClientMetrics metrics; /** * Creates a new BlockDataStreamOutput. * @@ -172,6 +175,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { ioException = new AtomicReference<>(null); checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); + metrics = XceiverClientManager.getXceiverClientMetrics(); } private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { @@ -257,27 +261,47 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { if (len == 0) { return; } - int curLen = len; - // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold - int maxBufferLen = config.getDataStreamMaxBufferSize(); - while (curLen > 0) { - int writeLen = Math.min(curLen, maxBufferLen); + while (len > 0) { + allocateNewBufferIfNeeded(); + int writeLen = Math.min(len, currentBuffer.length()); final StreamBuffer buf = new StreamBuffer(b, off, writeLen); + currentBuffer.put(buf); + writeChunkIfNeeded(); off += writeLen; - bufferList.add(buf); - writeChunkToContainer(buf.duplicate()); - curLen -= writeLen; writtenDataLength += writeLen; + len -= writeLen; doFlushIfNeeded(); } } + private void writeChunkIfNeeded() throws IOException { + if (currentBuffer.length()==0) { + writeChunk(currentBuffer); + currentBuffer = null; + } + } + + private void writeChunk(StreamBuffer sb) throws IOException { + bufferList.add(sb); + ByteBuffer dup = sb.duplicate(); + dup.position(0); + dup.limit(sb.position()); + writeChunkToContainer(dup); + } + + private void allocateNewBufferIfNeeded() { + if (currentBuffer==null) { + currentBuffer = + StreamBuffer.allocate(config.getDataStreamMinPacketSize()); + } + } + private void doFlushIfNeeded() throws IOException { Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config .getDataStreamMaxBufferSize()); long boundary = config.getDataStreamBufferFlushSize() / config .getDataStreamMaxBufferSize(); - if (bufferList.size() % boundary == 0) { + if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) { updateFlushLength(); executePutBlock(false, false); } @@ -308,11 +332,10 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { int count = 0; while (len > 0) { final StreamBuffer buf = bufferList.get(count); - final long writeLen = Math.min(buf.length(), len); + final long writeLen = Math.min(buf.position(), len); final ByteBuffer duplicated = buf.duplicate(); - if (writeLen != buf.length()) { - duplicated.limit(Math.toIntExact(len)); - } + duplicated.position(0); + duplicated.limit(buf.position()); writeChunkToContainer(duplicated); len -= writeLen; count++; @@ -449,6 +472,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { // This can be a partially filled chunk. Since we are flushing the buffer // here, we just limit this buffer to the current position. So that next // write will happen in new buffer + + if (currentBuffer!=null) { + writeChunk(currentBuffer); + currentBuffer = null; + } updateFlushLength(); executePutBlock(close, false); } else if (close) { @@ -584,6 +612,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { .setLen(effectiveChunkSize) .setChecksumData(checksumData.getProtoBufMessage()) .build(); + metrics.incrPendingContainerOpsMetrics(ContainerProtos.Type.WriteChunk); if (LOG.isDebugEnabled()) { LOG.debug("Writing chunk {} length {} at offset {}", diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java index f36019e2ae..5118ea5ead 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java @@ -27,7 +27,7 @@ public class StreamBuffer { private final ByteBuffer buffer; public StreamBuffer(ByteBuffer buffer) { - this.buffer = buffer.asReadOnlyBuffer(); + this.buffer = buffer; } public StreamBuffer(ByteBuffer buffer, int offset, int length) { @@ -43,4 +43,17 @@ public class StreamBuffer { return buffer.limit() - buffer.position(); } + public int position() { + return buffer.position(); + } + + + public void put(StreamBuffer sb){ + buffer.put(sb.buffer); + } + + public static StreamBuffer allocate(int size){ + return new StreamBuffer(ByteBuffer.allocate(size)); + } + } \ No newline at end of file diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java index 3a59d07571..9ae604e951 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java @@ -178,7 +178,7 @@ public class StreamCommitWatcher { Preconditions.checkState(commitIndexMap.containsKey(index)); final List<StreamBuffer> buffers = commitIndexMap.remove(index); final long length = - buffers.stream().mapToLong(StreamBuffer::length).sum(); + buffers.stream().mapToLong(StreamBuffer::position).sum(); totalAckDataLength += length; // clear the future object from the future Map final CompletableFuture<ContainerCommandResponseProto> remove = diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index e49b0b79ad..24a046f623 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -309,7 +309,7 @@ public class BlockDataStreamOutputEntryPool { long computeBufferData() { long totalDataLen =0; for (StreamBuffer b : bufferList){ - totalDataLen += b.length(); + totalDataLen += b.position(); } return totalDataLen; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index a97ea318e2..a1783a6cb5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -328,6 +328,7 @@ public interface MiniOzoneCluster { protected Optional<Long> dataStreamBufferFlushSize= Optional.empty(); protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty(); protected Optional<Long> streamBufferMaxSize = Optional.empty(); + protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty(); protected Optional<Long> blockSize = Optional.empty(); protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty(); protected boolean includeRecon = false; @@ -578,6 +579,11 @@ public interface MiniOzoneCluster { return this; } + public Builder setDataStreamMinPacketSize(int size) { + dataStreamMinPacketSize = OptionalInt.of(size); + return this; + } + /** * Sets the block size for stream buffer. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 85572b19b3..44e870104f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -671,6 +671,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { if (!dataStreamMaxBufferSize.isPresent()) { dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get()); } + if (!dataStreamMinPacketSize.isPresent()) { + dataStreamMinPacketSize = OptionalInt.of(chunkSize.get()/4); + } if (!blockSize.isPresent()) { blockSize = Optional.of(2 * streamBufferMaxSize.get()); } @@ -692,6 +695,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { clientConfig.setDataStreamMaxBufferSize((int) Math.round( streamBufferSizeUnit.get() .toBytes(dataStreamMaxBufferSize.getAsInt()))); + clientConfig.setDataStreamMinPacketSize((int) Math.round( + streamBufferSizeUnit.get() + .toBytes(dataStreamMinPacketSize.getAsInt()))); conf.setFromObject(clientConfig); conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index 5eb38a00de..c9242df8b1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -107,6 +107,7 @@ public class TestBlockDataStreamOutput { .setDataStreamBufferFlushize(maxFlushSize) .setDataStreamBufferMaxSize(chunkSize) .setStreamBufferSizeUnit(StorageUnit.BYTES) + .setDataStreamMinPacketSize(2*chunkSize/5) .build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key @@ -193,7 +194,7 @@ public class TestBlockDataStreamOutput { @Test public void testPutBlockAtBoundary() throws Exception { - int dataLength = 500; + int dataLength = 200; XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long putBlockCount = metrics.getContainerOpCountMetrics( @@ -211,8 +212,8 @@ public class TestBlockDataStreamOutput { metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock) <= pendingPutBlockCount + 1); key.close(); - // Since data length is 500 , first putBlock will be at 400(flush boundary) - // and the other at 500 + // Since data length is 200 , first putBlock will be at 160(flush boundary) + // and the other at 200 Assert.assertTrue( metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock) == putBlockCount + 2); @@ -230,4 +231,29 @@ public class TestBlockDataStreamOutput { .validateData(keyName, data, objectStore, volumeName, bucketName); } + + @Test + public void testMinPacketSize() throws Exception { + String keyName = getKeyName(); + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5) + .getBytes(UTF_8); + key.write(ByteBuffer.wrap(data)); + // minPacketSize= 40, so first write of 20 wont trigger a writeChunk + Assert.assertEquals(writeChunkCount, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + key.write(ByteBuffer.wrap(data)); + Assert.assertEquals(writeChunkCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + // now close the stream, It will update the key length. + key.close(); + String dataString = new String(data, UTF_8); + validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); + } + } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java index 3b17450376..f4c756bccd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java @@ -119,6 +119,7 @@ public class TestContainerStateMachineStream { conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200) + .setDataStreamMinPacketSize(1024) .build(); cluster.waitForClusterToBeReady(); cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index b8fc543f19..8d4b02eead 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -200,6 +200,7 @@ public abstract class TestOzoneRpcClientAbstract { .setTotalPipelineNumLimit(10) .setScmId(scmId) .setClusterId(clusterId) + .setDataStreamMinPacketSize(1024) .build(); cluster.waitForClusterToBeReady(); ozClient = OzoneClientFactory.getRpcClient(conf); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
