This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new a86732890d HDDS-11014. [hsync] Block finalization should also merge last chunk to blockDataTable. (#6847)
a86732890d is described below

commit a86732890d0721f529bc805c5d59ae1eb78a410f
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Tue Jun 25 10:41:14 2024 -0700

    HDDS-11014. [hsync] Block finalization should also merge last chunk to blockDataTable. (#6847)
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java       | 12 ++++++------
 .../hadoop/hdds/scm/storage/ECBlockOutputStream.java     |  4 ++--
 .../hadoop/hdds/scm/storage/ContainerProtocolCalls.java  |  6 ++++--
 .../hadoop/ozone/container/keyvalue/KeyValueHandler.java |  6 ++----
 .../ozone/container/keyvalue/impl/BlockManagerImpl.java  | 16 ++++++++++++++++
 5 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index c1f92c8337..2d40439c33 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -573,13 +573,13 @@ public class BlockOutputStream extends OutputStream {
   private void writeChunk(ChunkBuffer buffer)
       throws IOException {
     writeChunkCommon(buffer);
-    writeChunkToContainer(buffer.duplicate(0, buffer.position()), false);
+    writeChunkToContainer(buffer.duplicate(0, buffer.position()), false, false);
   }
 
-  private void writeChunkAndPutBlock(ChunkBuffer buffer)
+  private void writeChunkAndPutBlock(ChunkBuffer buffer, boolean close)
       throws IOException {
     writeChunkCommon(buffer);
-    writeChunkToContainer(buffer.duplicate(0, buffer.position()), true);
+    writeChunkToContainer(buffer.duplicate(0, buffer.position()), true, close);
   }
 
   /**
@@ -618,7 +618,7 @@ public class BlockOutputStream extends OutputStream {
       if (currentBuffer.hasRemaining()) {
         if (allowPutBlockPiggybacking) {
           updateFlushLength();
-          writeChunkAndPutBlock(currentBuffer);
+          writeChunkAndPutBlock(currentBuffer, close);
         } else {
           writeChunk(currentBuffer);
           updateFlushLength();
@@ -743,7 +743,7 @@ public class BlockOutputStream extends OutputStream {
    * @return
    */
   CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
-      ChunkBuffer chunk, boolean putBlockPiggybacking) throws IOException {
+      ChunkBuffer chunk, boolean putBlockPiggybacking, boolean close) throws IOException {
     int effectiveChunkSize = chunk.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
     final ByteString data = chunk.toByteString(
@@ -805,7 +805,7 @@ public class BlockOutputStream extends OutputStream {
       }
 
       XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
-          blockID.get(), data, tokenString, replicationIndex, blockData);
+          blockID.get(), data, tokenString, replicationIndex, blockData, close);
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
           respFuture = asyncReply.getResponse();
       validateFuture = respFuture.thenApplyAsync(e -> {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index a57ae74f99..843727cef2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -90,13 +90,13 @@ public class ECBlockOutputStream extends BlockOutputStream {
   public void write(byte[] b, int off, int len) throws IOException {
     this.currentChunkRspFuture =
         writeChunkToContainer(
-            ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false);
+            ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false, false);
     updateWrittenDataLength(len);
   }
 
   public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> 
write(
       ByteBuffer buff) throws IOException {
-    return writeChunkToContainer(ChunkBuffer.wrap(buff), false);
+    return writeChunkToContainer(ChunkBuffer.wrap(buff), false, false);
   }
 
   public CompletableFuture<ContainerProtos.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 37cc075f21..41d255ef07 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -436,10 +436,11 @@ public final class ContainerProtocolCalls  {
    * @param tokenString serialized block token
    * @throws IOException if there is an I/O error while performing the call
    */
+  @SuppressWarnings("parameternumber")
   public static XceiverClientReply writeChunkAsync(
       XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
       ByteString data, String tokenString,
-      int replicationIndex, BlockData blockData)
+      int replicationIndex, BlockData blockData, boolean close)
       throws IOException, ExecutionException, InterruptedException {
 
     WriteChunkRequestProto.Builder writeChunkRequest =
@@ -455,7 +456,8 @@ public final class ContainerProtocolCalls  {
     if (blockData != null) {
       PutBlockRequestProto.Builder createBlockRequest =
           PutBlockRequestProto.newBuilder()
-              .setBlockData(blockData);
+              .setBlockData(blockData)
+              .setEof(close);
       writeChunkRequest.setBlock(createBlockRequest);
     }
     String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 05979d85fa..440e7f4385 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -877,13 +877,11 @@ public class KeyValueHandler extends Handler {
         // block metadata is piggybacked in the same message.
         // there will not be an additional PutBlock request.
         //
-        // End of block will always be sent as a standalone PutBlock.
-        // the PutBlock piggybacked in WriteChunk is never end of block.
-        //
         // do not do this in WRITE_DATA phase otherwise PutBlock will be out
         // of order.
         blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex());
-        blockManager.putBlock(kvContainer, blockData, false);
+        boolean eob = writeChunk.getBlock().getEof();
+        blockManager.putBlock(kvContainer, blockData, eob);
         blockDataProto = blockData.getProtoBufMessage();
         final long numBytes = blockDataProto.getSerializedSize();
         metrics.incContainerBytesStats(Type.PutBlock, numBytes);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 27a40400b7..ceedcb94cf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -244,10 +244,26 @@ public class BlockManagerImpl implements BlockManager {
         db.getStore().getFinalizeBlocksTable().putWithBatch(batch,
             kvContainer.getContainerData().getBlockKey(localID), localID);
         db.getStore().getBatchHandler().commitBatchOperation(batch);
+
+        mergeLastChunkForBlockFinalization(blockId, db, kvContainer, batch, localID);
       }
     }
   }
 
+  private void mergeLastChunkForBlockFinalization(BlockID blockId, DBHandle db,
+                         KeyValueContainer kvContainer, BatchOperation batch,
+                         long localID) throws IOException {
+    // if the chunk list of the block to be finalized was written incremental,
+    // merge the last chunk into block data.
+    BlockData blockData = getBlockByID(db, blockId, kvContainer.getContainerData());
+    if (blockData.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST)) {
+      BlockData emptyBlockData = new BlockData(blockId);
+      emptyBlockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
+      db.getStore().putBlockByID(batch, incrementalEnabled, localID,
+          emptyBlockData, kvContainer.getContainerData(), true);
+    }
+  }
+
   @Override
   public BlockData getBlock(Container container, BlockID blockID)
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to