This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 93ee77d423 HDDS-11340. Avoid extra PubBlock call when a full block is 
closed (#7094)
93ee77d423 is described below

commit 93ee77d423b5359c196b6183a7e0b0d6a55b93e0
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Aug 20 14:14:56 2024 +0800

    HDDS-11340. Avoid extra PubBlock call when a full block is closed (#7094)
---
 .../apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 15 ++++++++++++---
 .../hadoop/hdds/scm/storage/ECBlockOutputStream.java      |  2 +-
 .../hadoop/hdds/scm/storage/RatisBlockOutputStream.java   |  3 ++-
 .../scm/storage/TestBlockOutputStreamCorrectness.java     |  1 +
 .../hadoop/ozone/client/io/BlockOutputStreamEntry.java    |  2 +-
 5 files changed, 17 insertions(+), 6 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index aab70a692e..220e5481cb 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -94,6 +95,9 @@ public class BlockOutputStream extends OutputStream {
       KeyValue.newBuilder().setKey(FULL_CHUNK).build();
 
   private AtomicReference<BlockID> blockID;
+  // planned block full size
+  private long blockSize;
+  private AtomicBoolean eofSent = new AtomicBoolean(false);
   private final AtomicReference<ChunkInfo> previousChunkInfo
       = new AtomicReference<>();
 
@@ -164,6 +168,7 @@ public class BlockOutputStream extends OutputStream {
   @SuppressWarnings("checkstyle:ParameterNumber")
   public BlockOutputStream(
       BlockID blockID,
+      long blockSize,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
       BufferPool bufferPool,
@@ -175,6 +180,7 @@ public class BlockOutputStream extends OutputStream {
     this.xceiverClientFactory = xceiverClientManager;
     this.config = config;
     this.blockID = new AtomicReference<>(blockID);
+    this.blockSize = blockSize;
     replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
     KeyValue keyValue =
         KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
@@ -530,7 +536,7 @@ public class BlockOutputStream extends OutputStream {
     final XceiverClientReply asyncReply;
     try {
       BlockData blockData = containerBlockData.build();
-      LOG.debug("sending PutBlock {}", blockData);
+      LOG.debug("sending PutBlock {} flushPos {}", blockData, flushPos);
 
       if (config.getIncrementalChunkList()) {
         // remove any chunks in the containerBlockData list.
@@ -538,7 +544,9 @@ public class BlockOutputStream extends OutputStream {
         containerBlockData.clearChunks();
       }
 
-      asyncReply = putBlockAsync(xceiverClient, blockData, close, tokenString);
+      // if block is full, send the eof
+      boolean isBlockFull = (blockSize != -1 && flushPos == blockSize);
+      asyncReply = putBlockAsync(xceiverClient, blockData, close || 
isBlockFull, tokenString);
       CompletableFuture<ContainerCommandResponseProto> future = 
asyncReply.getResponse();
       flushFuture = future.thenApplyAsync(e -> {
         try {
@@ -550,6 +558,7 @@ public class BlockOutputStream extends OutputStream {
         if (getIoException() == null && !force) {
           handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(),
               asyncReply, flushPos, byteBufferList);
+          eofSent.set(close || isBlockFull);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -690,7 +699,7 @@ public class BlockOutputStream extends OutputStream {
       // There're no pending written data, but there're uncommitted data.
       updatePutBlockLength();
       putBlockResultFuture = executePutBlock(close, false);
-    } else if (close) {
+    } else if (close && !eofSent.get()) {
       // forcing an "empty" putBlock if stream is being closed without new
       // data since latest flush - we need to send the "EOF" flag
       updatePutBlockLength();
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index bbb3f30687..12ca9978c6 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -80,7 +80,7 @@ public class ECBlockOutputStream extends BlockOutputStream {
       ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
       Supplier<ExecutorService> executorServiceSupplier
   ) throws IOException {
-    super(blockID, xceiverClientManager,
+    super(blockID, -1, xceiverClientManager,
         pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, 
executorServiceSupplier);
     // In EC stream, there will be only one node in pipeline.
     this.datanodeDetails = pipeline.getClosestNode();
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index c0e99a5b4a..d32c37eba6 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -72,6 +72,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
   @SuppressWarnings("checkstyle:ParameterNumber")
   public RatisBlockOutputStream(
       BlockID blockID,
+      long blockSize,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
       BufferPool bufferPool,
@@ -80,7 +81,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
       ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
       Supplier<ExecutorService> blockOutputStreamResourceProvider
   ) throws IOException {
-    super(blockID, xceiverClientManager, pipeline,
+    super(blockID, blockSize, xceiverClientManager, pipeline,
         bufferPool, config, token, clientMetrics, streamBufferArgs, 
blockOutputStreamResourceProvider);
     this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
   }
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index df4d1cb3f8..d3425b7d2b 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -174,6 +174,7 @@ class TestBlockOutputStreamCorrectness {
 
     return new RatisBlockOutputStream(
         new BlockID(1L, 1L),
+        -1,
         xcm,
         pipeline,
         bufferPool,
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 18a9231c66..5e6ecceefa 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -108,7 +108,7 @@ public class BlockOutputStreamEntry extends OutputStream {
    * @throws IOException
    */
   void createOutputStream() throws IOException {
-    outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
+    outputStream = new RatisBlockOutputStream(blockID, length, 
xceiverClientManager,
         pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs,
         executorServiceSupplier);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to