This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 30f8311b0e2c92e2c3b665ac98993fc23a0b6709 Author: Sadanand Shenoy <[email protected]> AuthorDate: Thu Feb 10 14:36:03 2022 +0530 HDDS-6138.[Ozone-Streaming] Define a limit on the size of the retry bufferList. (#2946) --- .../apache/hadoop/hdds/scm/OzoneClientConfig.java | 33 +++++++++--------- .../apache/hadoop/hdds/scm/XceiverClientRatis.java | 2 +- .../hdds/scm/storage/BlockDataStreamOutput.java | 40 +++++++++++++++++----- .../org/apache/hadoop/ozone/MiniOzoneCluster.java | 12 +++---- .../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 11 +++--- .../client/rpc/TestBlockDataStreamOutput.java | 14 ++++---- 6 files changed, 67 insertions(+), 45 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 86a725220a..80cb14f03e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -66,14 +66,6 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private int streamBufferSize = 4 * 1024 * 1024; - @Config(key = "datastream.max.buffer.size", - defaultValue = "4MB", - type = ConfigType.SIZE, - description = "The maximum size of the ByteBuffer " - + "(used via ratis streaming)", - tags = ConfigTag.CLIENT) - private int dataStreamMaxBufferSize = 4 * 1024 * 1024; - @Config(key = "datastream.buffer.flush.size", defaultValue = "16MB", type = ConfigType.SIZE, @@ -89,6 +81,15 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private int dataStreamMinPacketSize = 1024 * 1024; + @Config(key = "datastream.window.size", + defaultValue = "64MB", + type = ConfigType.SIZE, + description = "Maximum size of BufferList(used for retry) size per " + + "BlockDataStreamOutput instance", + tags = ConfigTag.CLIENT) + private long streamWindowSize = 64 * 1024 * 1024; + + @Config(key = 
"stream.buffer.increment", defaultValue = "0B", type = ConfigType.SIZE, @@ -251,14 +252,6 @@ public class OzoneClientConfig { this.streamBufferSize = streamBufferSize; } - public int getDataStreamMaxBufferSize() { - return dataStreamMaxBufferSize; - } - - public void setDataStreamMaxBufferSize(int dataStreamMaxBufferSize) { - this.dataStreamMaxBufferSize = dataStreamMaxBufferSize; - } - public boolean isStreamBufferFlushDelay() { return streamBufferFlushDelay; } @@ -283,6 +276,14 @@ public class OzoneClientConfig { this.dataStreamMinPacketSize = dataStreamMinPacketSize; } + public long getStreamWindowSize() { + return streamWindowSize; + } + + public void setStreamWindowSize(long streamWindowSize) { + this.streamWindowSize = streamWindowSize; + } + public int getMaxRetryCount() { return maxRetryCount; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 8e4ab16ee8..3ea269b08b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -136,7 +136,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { .orElse(0L); } - private long updateCommitInfosMap( + public long updateCommitInfosMap( Collection<RaftProtos.CommitInfoProto> commitInfoProtos) { // if the commitInfo map is empty, just update the commit indexes for each // of the servers diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index ec925d1e6a..a3fe1c2479 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -52,7 +52,9 @@ 
import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -116,9 +118,9 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { // Also, corresponding to the logIndex, the corresponding list of buffers will // be released from the buffer pool. private final StreamCommitWatcher commitWatcher; - private final AtomicReference<CompletableFuture< - ContainerCommandResponseProto>> putBlockFuture - = new AtomicReference<>(CompletableFuture.completedFuture(null)); + + private Queue<CompletableFuture<ContainerCommandResponseProto>> + putBlockFutures = new LinkedList<>(); private final List<DatanodeDetails> failedServers; private final Checksum checksum; @@ -307,14 +309,33 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { } private void doFlushIfNeeded() throws IOException { - Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config - .getDataStreamMaxBufferSize()); long boundary = config.getDataStreamBufferFlushSize() / config - .getDataStreamMaxBufferSize(); + .getDataStreamMinPacketSize(); + // streamWindow is the maximum number of buffers that + // are allowed to exist in the bufferList. If buffers in + // the list reach this limit, the client will wait till it gets + // one putBlockResponse (first index). This is similar to + // the bufferFull condition in async write path.
+ long streamWindow = config.getStreamWindowSize() / config + .getDataStreamMinPacketSize(); if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) { updateFlushLength(); executePutBlock(false, false); } + if (bufferList.size()==streamWindow){ + try { + checkOpen(); + if (!putBlockFutures.isEmpty()) { + putBlockFutures.remove().get(); + } + } catch (ExecutionException e) { + handleExecutionException(e); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + handleInterruptedException(ex, true); + } + watchForCommit(true); + } } private void updateFlushLength() { @@ -453,8 +474,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { setIoException(ce); throw ce; }); - putBlockFuture.updateAndGet(f -> f.thenCombine(flushFuture, - (previous, current) -> current)); + putBlockFutures.add(flushFuture); } catch (IOException | ExecutionException e) { throw new IOException(EXCEPTION_MSG + e.toString(), e); } catch (InterruptedException ex) { @@ -496,7 +516,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { // data since latest flush - we need to send the "EOF" flag executePutBlock(true, true); } - putBlockFuture.get().get(); + CompletableFuture.allOf(putBlockFutures.toArray(EMPTY_FUTURE_ARRAY)).get(); watchForCommit(false); // just check again if the exception is hit while waiting for the // futures to ensure flush has indeed succeeded @@ -638,6 +658,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { CompletionException ce = new CompletionException(msg, e); setIoException(ce); throw ce; + } else if (r.isSuccess()) { + xceiverClient.updateCommitInfosMap(r.getCommitInfos()); } }, responseExecutor); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index a1783a6cb5..4809f0c357 100644 --- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -326,7 +326,7 @@ public interface MiniOzoneCluster { protected OptionalInt streamBufferSize = OptionalInt.empty(); protected Optional<Long> streamBufferFlushSize = Optional.empty(); protected Optional<Long> dataStreamBufferFlushSize= Optional.empty(); - protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty(); + protected Optional<Long> datastreamWindowSize= Optional.empty(); protected Optional<Long> streamBufferMaxSize = Optional.empty(); protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty(); protected Optional<Long> blockSize = Optional.empty(); @@ -569,11 +569,6 @@ public interface MiniOzoneCluster { return this; } - public Builder setDataStreamBufferMaxSize(int size) { - dataStreamMaxBufferSize = OptionalInt.of(size); - return this; - } - public Builder setDataStreamBufferFlushize(long size) { dataStreamBufferFlushSize = Optional.of(size); return this; @@ -584,6 +579,11 @@ public interface MiniOzoneCluster { return this; } + public Builder setDataStreamStreamWindowSize(long size) { + datastreamWindowSize = Optional.of(size); + return this; + } + /** * Sets the block size for stream buffer. 
* diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 44e870104f..866f4188ea 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -668,12 +668,12 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { if (!dataStreamBufferFlushSize.isPresent()) { dataStreamBufferFlushSize = Optional.of((long) 4 * chunkSize.get()); } - if (!dataStreamMaxBufferSize.isPresent()) { - dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get()); - } if (!dataStreamMinPacketSize.isPresent()) { dataStreamMinPacketSize = OptionalInt.of(chunkSize.get()/4); } + if (!datastreamWindowSize.isPresent()) { + datastreamWindowSize = Optional.of((long) 8 * chunkSize.get()); + } if (!blockSize.isPresent()) { blockSize = Optional.of(2 * streamBufferMaxSize.get()); } @@ -692,12 +692,11 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get()))); clientConfig.setDataStreamBufferFlushSize(Math.round( streamBufferSizeUnit.get().toBytes(dataStreamBufferFlushSize.get()))); - clientConfig.setDataStreamMaxBufferSize((int) Math.round( - streamBufferSizeUnit.get() - .toBytes(dataStreamMaxBufferSize.getAsInt()))); clientConfig.setDataStreamMinPacketSize((int) Math.round( streamBufferSizeUnit.get() .toBytes(dataStreamMinPacketSize.getAsInt()))); + clientConfig.setStreamWindowSize(Math.round( + streamBufferSizeUnit.get().toBytes(datastreamWindowSize.get()))); conf.setFromObject(clientConfig); conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index c6a3c32d2b..696ab92ab7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -106,9 +106,9 @@ public class TestBlockDataStreamOutput { .setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) .setDataStreamBufferFlushize(maxFlushSize) - .setDataStreamBufferMaxSize(chunkSize) .setStreamBufferSizeUnit(StorageUnit.BYTES) - .setDataStreamMinPacketSize(2*chunkSize/5) + .setDataStreamMinPacketSize(chunkSize) + .setDataStreamStreamWindowSize(5*chunkSize) .build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key @@ -195,7 +195,7 @@ public class TestBlockDataStreamOutput { @Test public void testPutBlockAtBoundary() throws Exception { - int dataLength = 200; + int dataLength = 500; XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long putBlockCount = metrics.getContainerOpCountMetrics( @@ -213,8 +213,8 @@ public class TestBlockDataStreamOutput { metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock) <= pendingPutBlockCount + 1); key.close(); - // Since data length is 200 , first putBlock will be at 160(flush boundary) - // and the other at 200 + // Since data length is 500 , first putBlock will be at 400(flush boundary) + // and the other at 500 Assert.assertTrue( metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock) == putBlockCount + 2); @@ -242,10 +242,10 @@ public class TestBlockDataStreamOutput { long writeChunkCount = metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); byte[] data = - ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5) + ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2) 
.getBytes(UTF_8); key.write(ByteBuffer.wrap(data)); - // minPacketSize= 40, so first write of 20 wont trigger a writeChunk + // minPacketSize= 100, so first write of 50 won't trigger a writeChunk Assert.assertEquals(writeChunkCount, metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); key.write(ByteBuffer.wrap(data)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
