This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 93ee77d423 HDDS-11340. Avoid extra PubBlock call when a full block is
closed (#7094)
93ee77d423 is described below
commit 93ee77d423b5359c196b6183a7e0b0d6a55b93e0
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Aug 20 14:14:56 2024 +0800
HDDS-11340. Avoid extra PubBlock call when a full block is closed (#7094)
---
.../apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 15 ++++++++++++---
.../hadoop/hdds/scm/storage/ECBlockOutputStream.java | 2 +-
.../hadoop/hdds/scm/storage/RatisBlockOutputStream.java | 3 ++-
.../scm/storage/TestBlockOutputStreamCorrectness.java | 1 +
.../hadoop/ozone/client/io/BlockOutputStreamEntry.java | 2 +-
5 files changed, 17 insertions(+), 6 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index aab70a692e..220e5481cb 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -94,6 +95,9 @@ public class BlockOutputStream extends OutputStream {
KeyValue.newBuilder().setKey(FULL_CHUNK).build();
private AtomicReference<BlockID> blockID;
+ // planned block full size
+ private long blockSize;
+ private AtomicBoolean eofSent = new AtomicBoolean(false);
private final AtomicReference<ChunkInfo> previousChunkInfo
= new AtomicReference<>();
@@ -164,6 +168,7 @@ public class BlockOutputStream extends OutputStream {
@SuppressWarnings("checkstyle:ParameterNumber")
public BlockOutputStream(
BlockID blockID,
+ long blockSize,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
@@ -175,6 +180,7 @@ public class BlockOutputStream extends OutputStream {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
this.blockID = new AtomicReference<>(blockID);
+ this.blockSize = blockSize;
replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
@@ -530,7 +536,7 @@ public class BlockOutputStream extends OutputStream {
final XceiverClientReply asyncReply;
try {
BlockData blockData = containerBlockData.build();
- LOG.debug("sending PutBlock {}", blockData);
+ LOG.debug("sending PutBlock {} flushPos {}", blockData, flushPos);
if (config.getIncrementalChunkList()) {
// remove any chunks in the containerBlockData list.
@@ -538,7 +544,9 @@ public class BlockOutputStream extends OutputStream {
containerBlockData.clearChunks();
}
- asyncReply = putBlockAsync(xceiverClient, blockData, close, tokenString);
+ // if block is full, send the eof
+ boolean isBlockFull = (blockSize != -1 && flushPos == blockSize);
+ asyncReply = putBlockAsync(xceiverClient, blockData, close ||
isBlockFull, tokenString);
CompletableFuture<ContainerCommandResponseProto> future =
asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
try {
@@ -550,6 +558,7 @@ public class BlockOutputStream extends OutputStream {
if (getIoException() == null && !force) {
handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(),
asyncReply, flushPos, byteBufferList);
+ eofSent.set(close || isBlockFull);
}
return e;
}, responseExecutor).exceptionally(e -> {
@@ -690,7 +699,7 @@ public class BlockOutputStream extends OutputStream {
// There're no pending written data, but there're uncommitted data.
updatePutBlockLength();
putBlockResultFuture = executePutBlock(close, false);
- } else if (close) {
+ } else if (close && !eofSent.get()) {
// forcing an "empty" putBlock if stream is being closed without new
// data since latest flush - we need to send the "EOF" flag
updatePutBlockLength();
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index bbb3f30687..12ca9978c6 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -80,7 +80,7 @@ public class ECBlockOutputStream extends BlockOutputStream {
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> executorServiceSupplier
) throws IOException {
- super(blockID, xceiverClientManager,
+ super(blockID, -1, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs,
executorServiceSupplier);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index c0e99a5b4a..d32c37eba6 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -72,6 +72,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
@SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
BlockID blockID,
+ long blockSize,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
@@ -80,7 +81,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
- super(blockID, xceiverClientManager, pipeline,
+ super(blockID, blockSize, xceiverClientManager, pipeline,
bufferPool, config, token, clientMetrics, streamBufferArgs,
blockOutputStreamResourceProvider);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index df4d1cb3f8..d3425b7d2b 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -174,6 +174,7 @@ class TestBlockOutputStreamCorrectness {
return new RatisBlockOutputStream(
new BlockID(1L, 1L),
+ -1,
xcm,
pipeline,
bufferPool,
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 18a9231c66..5e6ecceefa 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -108,7 +108,7 @@ public class BlockOutputStreamEntry extends OutputStream {
* @throws IOException
*/
void createOutputStream() throws IOException {
- outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
+ outputStream = new RatisBlockOutputStream(blockID, length,
xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs,
executorServiceSupplier);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]