This is an automated email from the ASF dual-hosted git repository.
captainzmc pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-4454 by this push:
new 629096f HDDS-5487. [Ozone-Streaming] BlockDataStreamOutput support FlushDelay. (#3002)
629096f is described below
commit 629096f0d27ebe91aec6d2c9a819bf6fb2d4b1f9
Author: micah zhao <[email protected]>
AuthorDate: Mon Feb 14 20:58:11 2022 +0800
HDDS-5487. [Ozone-Streaming] BlockDataStreamOutput support FlushDelay. (#3002)
---
.../apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java | 10 +++++++++-
.../hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java | 1 -
2 files changed, 9 insertions(+), 2 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index a3fe1c2..9ac4330 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -429,7 +429,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
} else {
byteBufferList = null;
}
- flush();
+ waitFuturesComplete();
if (close) {
dataStreamCloseReply = out.closeAsync();
}
@@ -485,8 +485,16 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
@Override
public void flush() throws IOException {
+ if (xceiverClientFactory != null && xceiverClient != null
+ && !config.isStreamBufferFlushDelay()) {
+ waitFuturesComplete();
+ }
+ }
+
+ public void waitFuturesComplete() throws IOException {
try {
CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
+ futures.clear();
} catch (Exception e) {
LOG.warn("Failed to write all chunks through stream: " + e);
throw new IOException(e);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 696ab92..2100337 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -89,7 +89,6 @@ public class TestBlockDataStreamOutput {
blockSize = 2 * maxFlushSize;
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
- clientConfig.setStreamBufferFlushDelay(false);
conf.setFromObject(clientConfig);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000,
TimeUnit.MILLISECONDS);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]