This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 91e5d2eb28 HDDS-9130. [hsync] Combine WriteData and PutBlock requests 
into one (#5980)
91e5d2eb28 is described below

commit 91e5d2eb289fcbe8ab1e7c4a40d4f0c839598597
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Mon Apr 1 01:20:15 2024 -0700

    HDDS-9130. [hsync] Combine WriteData and PutBlock requests into one (#5980)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |  16 ++
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 168 +++++++++++++++------
 .../hdds/scm/storage/ECBlockOutputStream.java      |   5 +-
 .../org/apache/hadoop/hdds/DatanodeVersion.java    |   2 +
 .../ContainerCommandResponseBuilders.java          |  23 +++
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  10 +-
 .../server/ratis/ContainerStateMachine.java        |   5 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  27 +++-
 .../container/keyvalue/impl/BlockManagerImpl.java  |   3 +-
 .../container/metadata/AbstractDatanodeStore.java  |   2 +-
 .../src/main/proto/DatanodeClientProtocol.proto    |   2 +
 .../hadoop/ozone/client/MockXceiverClientSpi.java  |  34 +++--
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java |  26 ++--
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   2 +-
 .../ozone/client/rpc/TestBlockOutputStream.java    |  21 ++-
 .../ozone/om/TestOmContainerLocationCache.java     |  36 ++++-
 16 files changed, 288 insertions(+), 94 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 4af9009e16..d1992ac931 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -256,6 +256,14 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private boolean incrementalChunkList = true;
 
+  @Config(key = "stream.putblock.piggybacking",
+          defaultValue = "false",
+          type = ConfigType.BOOLEAN,
+          description = "Allow PutBlock to be piggybacked in WriteChunk " +
+                  "requests if the chunk is small.",
+          tags = ConfigTag.CLIENT)
+  private boolean enablePutblockPiggybacking = false;
+
   @PostConstruct
   public void validate() {
     Preconditions.checkState(streamBufferSize > 0);
@@ -454,6 +462,14 @@ public class OzoneClientConfig {
     return fsDefaultBucketLayout;
   }
 
+  public void setEnablePutblockPiggybacking(boolean 
enablePutblockPiggybacking) {
+    this.enablePutblockPiggybacking = enablePutblockPiggybacking;
+  }
+
+  public boolean getEnablePutblockPiggybacking() {
+    return enablePutblockPiggybacking;
+  }
+
   public boolean isDatastreamPipelineMode() {
     return datastreamPipelineMode;
   }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 239800746c..f29bf49038 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
+import static 
org.apache.hadoop.hdds.DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC;
 import static 
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
 import static 
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -140,6 +142,7 @@ public class BlockOutputStream extends OutputStream {
   private int replicationIndex;
   private Pipeline pipeline;
   private final ContainerClientMetrics clientMetrics;
+  private boolean allowPutBlockPiggybacking;
 
   /**
    * Creates a new BlockOutputStream.
@@ -211,6 +214,20 @@ public class BlockOutputStream extends OutputStream {
     this.clientMetrics = clientMetrics;
     this.pipeline = pipeline;
     this.streamBufferArgs = streamBufferArgs;
+    this.allowPutBlockPiggybacking = config.getEnablePutblockPiggybacking() &&
+            allDataNodesSupportPiggybacking();
+  }
+
+  private boolean allDataNodesSupportPiggybacking() {
+    // return true only if all DataNodes in the pipeline are on a version
+    // that supports PutBlock piggybacking.
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      if (dn.getCurrentVersion() <
+              COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue()) {
+        return false;
+      }
+    }
+    return true;
   }
 
   void refreshCurrentBuffer() {
@@ -499,22 +516,8 @@ public class BlockOutputStream extends OutputStream {
         }
         // if the ioException is not set, putBlock is successful
         if (getIoException() == null && !force) {
-          BlockID responseBlockID = BlockID.getFromProtobuf(
-              e.getPutBlock().getCommittedBlockLength().getBlockID());
-          Preconditions.checkState(blockID.get().getContainerBlockID()
-              .equals(responseBlockID.getContainerBlockID()));
-          // updates the bcsId of the block
-          blockID.set(responseBlockID);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Adding index " + asyncReply.getLogIndex() + " flushLength "
-                    + flushPos + " numBuffers " + byteBufferList.size()
-                    + " blockID " + blockID + " bufferPool size" + bufferPool
-                    .getSize() + " currentBufferIndex " + bufferPool
-                    .getCurrentBufferIndex());
-          }
-          // for standalone protocol, logIndex will always be 0.
-          updateCommitInfo(asyncReply, byteBufferList);
+          handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(),
+              asyncReply, flushPos, byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -551,7 +554,7 @@ public class BlockOutputStream extends OutputStream {
     }
   }
 
-  private void writeChunk(ChunkBuffer buffer)
+  private void writeChunkCommon(ChunkBuffer buffer)
       throws IOException {
     // This data in the buffer will be pushed to datanode and a reference will
     // be added to the bufferList. Once putBlock gets executed, this list will
@@ -562,7 +565,18 @@ public class BlockOutputStream extends OutputStream {
       bufferList = new ArrayList<>();
     }
     bufferList.add(buffer);
-    writeChunkToContainer(buffer.duplicate(0, buffer.position()));
+  }
+
+  private void writeChunk(ChunkBuffer buffer)
+      throws IOException {
+    writeChunkCommon(buffer);
+    writeChunkToContainer(buffer.duplicate(0, buffer.position()), false);
+  }
+
+  private void writeChunkAndPutBlock(ChunkBuffer buffer)
+      throws IOException {
+    writeChunkCommon(buffer);
+    writeChunkToContainer(buffer.duplicate(0, buffer.position()), true);
   }
 
   /**
@@ -594,14 +608,23 @@ public class BlockOutputStream extends OutputStream {
     if (totalDataFlushedLength < writtenDataLength) {
       refreshCurrentBuffer();
       Preconditions.checkArgument(currentBuffer.position() > 0);
-      if (currentBuffer.hasRemaining()) {
-        writeChunk(currentBuffer);
-      }
+
       // This can be a partially filled chunk. Since we are flushing the buffer
       // here, we just limit this buffer to the current position. So that next
       // write will happen in new buffer
-      updateFlushLength();
-      executePutBlock(close, false);
+      if (currentBuffer.hasRemaining()) {
+        if (allowPutBlockPiggybacking) {
+          updateFlushLength();
+          writeChunkAndPutBlock(currentBuffer);
+        } else {
+          writeChunk(currentBuffer);
+          updateFlushLength();
+          executePutBlock(close, false);
+        }
+      } else {
+        updateFlushLength();
+        executePutBlock(close, false);
+      }
     } else if (close) {
       // forcing an "empty" putBlock if stream is being closed without new
       // data since latest flush - we need to send the "EOF" flag
@@ -713,7 +736,7 @@ public class BlockOutputStream extends OutputStream {
    * @return
    */
   CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
-      ChunkBuffer chunk) throws IOException {
+      ChunkBuffer chunk, boolean putBlockPiggybacking) throws IOException {
     int effectiveChunkSize = chunk.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
     final ByteString data = chunk.toByteString(
@@ -726,6 +749,8 @@ public class BlockOutputStream extends OutputStream {
         .setChecksumData(checksumData.getProtoBufMessage())
         .build();
 
+    long flushPos = totalDataFlushedLength;
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Writing chunk {} length {} at offset {}",
           chunkInfo.getChunkName(), effectiveChunkSize, offset);
@@ -743,42 +768,93 @@ public class BlockOutputStream extends OutputStream {
           + ", previous = " + previous);
     }
 
+    final List<ChunkBuffer> byteBufferList;
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+        validateFuture = null;
     try {
-      XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
-          blockID.get(), data, tokenString, replicationIndex);
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-          respFuture = asyncReply.getResponse();
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-          validateFuture = respFuture.thenApplyAsync(e -> {
-            try {
-              validateResponse(e);
-            } catch (IOException sce) {
-              respFuture.completeExceptionally(sce);
-            }
-            return e;
-          }, responseExecutor).exceptionally(e -> {
-            String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
-                " into block " + blockID;
-            LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
-            CompletionException ce = new CompletionException(msg, e);
-            setIoException(ce);
-            throw ce;
-          });
+      BlockData blockData = null;
+
       if (config.getIncrementalChunkList()) {
         updateBlockDataForWriteChunk(chunk);
       } else {
         containerBlockData.addChunks(chunkInfo);
       }
+      if (putBlockPiggybacking) {
+        Preconditions.checkNotNull(bufferList);
+        byteBufferList = bufferList;
+        bufferList = null;
+        Preconditions.checkNotNull(byteBufferList);
+
+        blockData = containerBlockData.build();
+        LOG.debug("piggyback chunk list {}", blockData);
+
+        if (config.getIncrementalChunkList()) {
+          // remove any chunks in the containerBlockData list.
+          // since they are sent.
+          containerBlockData.clearChunks();
+        }
+      } else {
+        byteBufferList = null;
+      }
 
+      XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
+          blockID.get(), data, tokenString, replicationIndex, blockData);
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+          respFuture = asyncReply.getResponse();
+      validateFuture = respFuture.thenApplyAsync(e -> {
+        try {
+          validateResponse(e);
+        } catch (IOException sce) {
+          respFuture.completeExceptionally(sce);
+        }
+        // if the ioException is not set, putBlock is successful
+        if (getIoException() == null && putBlockPiggybacking) {
+          handleSuccessfulPutBlock(e.getWriteChunk().getCommittedBlockLength(),
+              asyncReply, flushPos, byteBufferList);
+        }
+        return e;
+      }, responseExecutor).exceptionally(e -> {
+        String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
+            " into block " + blockID;
+        LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+        CompletionException ce = new CompletionException(msg, e);
+        setIoException(ce);
+        throw ce;
+      });
       clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
-      return validateFuture;
+
     } catch (IOException | ExecutionException e) {
       throw new IOException(EXCEPTION_MSG + e.toString(), e);
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
     }
-    return null;
+    if (putBlockPiggybacking) {
+      putFlushFuture(flushPos, validateFuture);
+    }
+    return validateFuture;
+  }
+
+  private void handleSuccessfulPutBlock(
+      ContainerProtos.GetCommittedBlockLengthResponseProto e,
+      XceiverClientReply asyncReply, long flushPos,
+      List<ChunkBuffer> byteBufferList) {
+    BlockID responseBlockID = BlockID.getFromProtobuf(
+        e.getBlockID());
+    Preconditions.checkState(blockID.get().getContainerBlockID()
+        .equals(responseBlockID.getContainerBlockID()));
+    // updates the bcsId of the block
+    blockID.set(responseBlockID);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Adding index " + asyncReply.getLogIndex() + " flushLength "
+              + flushPos + " numBuffers " + byteBufferList.size()
+              + " blockID " + blockID + " bufferPool size" + bufferPool
+              .getSize() + " currentBufferIndex " + bufferPool
+              .getCurrentBufferIndex());
+    }
+    // for standalone protocol, logIndex will always be 0.
+    updateCommitInfo(asyncReply, byteBufferList);
   }
 
   /**
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index adecc3e4c1..c8bfaf3e1b 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -89,13 +89,14 @@ public class ECBlockOutputStream extends BlockOutputStream {
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     this.currentChunkRspFuture =
-        writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
+        writeChunkToContainer(
+            ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false);
     updateWrittenDataLength(len);
   }
 
   public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> 
write(
       ByteBuffer buff) throws IOException {
-    return writeChunkToContainer(ChunkBuffer.wrap(buff));
+    return writeChunkToContainer(ChunkBuffer.wrap(buff), false);
   }
 
   public CompletableFuture<ContainerProtos.
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
index 30f9df597b..e35d20d53e 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
@@ -31,6 +31,8 @@ public enum DatanodeVersion implements ComponentVersion {
   DEFAULT_VERSION(0, "Initial version"),
 
   SEPARATE_RATIS_PORTS_AVAILABLE(1, "Version with separated Ratis port."),
+  COMBINED_PUTBLOCK_WRITECHUNK_RPC(2, "WriteChunk can optionally support " +
+          "a PutBlock request"),
 
   FUTURE_VERSION(-1, "Used internally in the client when the server side is "
       + " newer and an unknown server version has arrived to the client.");
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 86336e9bc7..d3f39c023b 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFi
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockResponseProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -213,6 +214,28 @@ public final class ContainerCommandResponseBuilders {
         .build();
   }
 
+  /**
+   * Gets a response for the WriteChunk RPC.
+   * @param msg - ContainerCommandRequestProto
+   * @return - ContainerCommandResponseProto
+   */
+  public static ContainerCommandResponseProto getWriteChunkResponseSuccess(
+      ContainerCommandRequestProto msg, BlockData blockData) {
+
+    WriteChunkResponseProto.Builder writeChunk =
+        WriteChunkResponseProto.newBuilder();
+    if (blockData != null) {
+      writeChunk.setCommittedBlockLength(
+          getCommittedBlockLengthResponseBuilder(
+              blockData.getSize(), blockData.getBlockID()));
+
+    }
+    return getSuccessResponseBuilder(msg)
+        .setCmdType(Type.WriteChunk)
+        .setWriteChunk(writeChunk)
+        .build();
+  }
+
   /**
    * Gets a response to the read small file call.
    * @param request - Msg
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 1453ae56b4..702710474a 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -406,8 +406,10 @@ public final class ContainerProtocolCalls  {
    */
   public static XceiverClientReply writeChunkAsync(
       XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
-      ByteString data, String tokenString, int replicationIndex)
+      ByteString data, String tokenString,
+      int replicationIndex, BlockData blockData)
       throws IOException, ExecutionException, InterruptedException {
+
     WriteChunkRequestProto.Builder writeChunkRequest =
         WriteChunkRequestProto.newBuilder()
             .setBlockID(DatanodeBlockID.newBuilder()
@@ -418,6 +420,12 @@ public final class ContainerProtocolCalls  {
                 .build())
             .setChunkData(chunk)
             .setData(data);
+    if (blockData != null) {
+      PutBlockRequestProto.Builder createBlockRequest =
+          PutBlockRequestProto.newBuilder()
+              .setBlockData(blockData);
+      writeChunkRequest.setBlock(createBlockRequest);
+    }
     String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
     ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder()
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index f4ffc4ef27..9eb5b909cc 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -432,11 +432,10 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       if (!blockAlreadyFinalized) {
         // create the log entry proto
         final WriteChunkRequestProto commitWriteChunkProto =
-            WriteChunkRequestProto.newBuilder()
-                .setBlockID(write.getBlockID())
-                .setChunkData(write.getChunkData())
+            WriteChunkRequestProto.newBuilder(write)
                 // skipping the data field as it is
                 // already set in statemachine data proto
+                .clearData()
                 .build();
         ContainerCommandRequestProto commitContainerCommandProto =
             ContainerCommandRequestProto
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 594500b77b..8a2a1bc56e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -112,6 +112,7 @@ import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder;
+import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getWriteChunkResponseSuccess;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
@@ -780,6 +781,7 @@ public class KeyValueHandler extends Handler {
 
       data = chunkManager.readChunk(kvContainer, blockID, chunkInfo,
           dispatcherContext);
+      LOG.debug("read chunk from block {} chunk {}", blockID, chunkInfo);
       // Validate data only if the read chunk is issued by Ratis for its
       // internal logic.
       //  For client reads, the client is expected to validate.
@@ -841,6 +843,7 @@ public class KeyValueHandler extends Handler {
       return malformedRequest(request);
     }
 
+    ContainerProtos.BlockData blockDataProto = null;
     try {
       checkContainerOpen(kvContainer);
 
@@ -864,6 +867,28 @@ public class KeyValueHandler extends Handler {
       chunkManager
           .writeChunk(kvContainer, blockID, chunkInfo, data, 
dispatcherContext);
 
+      final boolean isCommit = dispatcherContext.getStage().isCommit();
+      if (isCommit && writeChunk.hasBlock()) {
+        metrics.incContainerOpsMetrics(Type.PutBlock);
+        BlockData blockData = BlockData.getFromProtoBuf(
+            writeChunk.getBlock().getBlockData());
+        // optimization for hsync when WriteChunk is in commit phase:
+        //
+        // block metadata is piggybacked in the same message.
+        // there will not be an additional PutBlock request.
+        //
+        // End of block will always be sent as a standalone PutBlock.
+        // the PutBlock piggybacked in WriteChunk is never end of block.
+        //
+        // do not do this in WRITE_DATA phase otherwise PutBlock will be out
+        // of order.
+        blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex());
+        blockManager.putBlock(kvContainer, blockData, false);
+        blockDataProto = blockData.getProtoBufMessage();
+        final long numBytes = blockDataProto.getSerializedSize();
+        metrics.incContainerBytesStats(Type.PutBlock, numBytes);
+      }
+
       // We should increment stats after writeChunk
       if (isWrite) {
         metrics.incContainerBytesStats(Type.WriteChunk, writeChunk
@@ -877,7 +902,7 @@ public class KeyValueHandler extends Handler {
           request);
     }
 
-    return getSuccessResponse(request);
+    return getWriteChunkResponseSuccess(request, blockDataProto);
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 4690565b0b..413f36a761 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -54,7 +54,8 @@ import org.slf4j.LoggerFactory;
  */
 public class BlockManagerImpl implements BlockManager {
 
-  static final Logger LOG = LoggerFactory.getLogger(BlockManagerImpl.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockManagerImpl.class);
 
   private ConfigurationSource config;
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
index 8d55bfad78..26719d7f03 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
@@ -66,7 +66,7 @@ public abstract class AbstractDatanodeStore implements 
DatanodeStore {
 
   private Table<String, Long> finalizeBlocksTableWithIterator;
 
-  static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(AbstractDatanodeStore.class);
   private volatile DBStore store;
   private final AbstractDatanodeDBDefinition dbDef;
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index ccde261de0..f5cac29923 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -436,9 +436,11 @@ message  WriteChunkRequestProto  {
   required DatanodeBlockID blockID = 1;
   optional ChunkInfo chunkData = 2;
   optional bytes data = 3;
+  optional PutBlockRequestProto block = 4;
 }
 
 message  WriteChunkResponseProto {
+  optional GetCommittedBlockLengthResponseProto committedBlockLength = 1;
 }
 
 enum ReadChunkVersion {
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 7e5de329d1..0d82f0f8bb 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -129,21 +129,26 @@ public class MockXceiverClientSpi extends 
XceiverClientSpi {
   }
 
   private PutBlockResponseProto putBlock(PutBlockRequestProto putBlock) {
+    return PutBlockResponseProto.newBuilder()
+        .setCommittedBlockLength(
+            doPutBlock(putBlock.getBlockData()))
+        .build();
+  }
+
+  private GetCommittedBlockLengthResponseProto doPutBlock(
+      ContainerProtos.BlockData blockData) {
     long length = 0;
-    for (ChunkInfo chunk : putBlock.getBlockData().getChunksList()) {
+    for (ChunkInfo chunk : blockData.getChunksList()) {
       length += chunk.getLen();
     }
 
-    datanodeStorage.putBlock(putBlock.getBlockData().getBlockID(),
-        putBlock.getBlockData());
+    datanodeStorage.putBlock(blockData.getBlockID(),
+        blockData);
 
-    return PutBlockResponseProto.newBuilder()
-        .setCommittedBlockLength(
-            GetCommittedBlockLengthResponseProto.newBuilder()
-                .setBlockID(putBlock.getBlockData().getBlockID())
+    return GetCommittedBlockLengthResponseProto.newBuilder()
+                .setBlockID(blockData.getBlockID())
                 .setBlockLength(length)
-                .build())
-        .build();
+                .build();
   }
 
   private XceiverClientReply result(
@@ -166,8 +171,15 @@ public class MockXceiverClientSpi extends XceiverClientSpi 
{
     datanodeStorage
         .writeChunk(writeChunk.getBlockID(), writeChunk.getChunkData(),
             writeChunk.getData());
-    return WriteChunkResponseProto.newBuilder()
-        .build();
+
+    WriteChunkResponseProto.Builder builder =
+        WriteChunkResponseProto.newBuilder();
+    if (writeChunk.hasBlock()) {
+      ContainerProtos.BlockData
+          blockData = writeChunk.getBlock().getBlockData();
+      builder.setCommittedBlockLength(doPutBlock(blockData));
+    }
+    return builder.build();
   }
 
   @Override
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 015eaa2916..daa433f68f 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -33,27 +33,27 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.StorageType;
-import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -73,6 +73,9 @@ import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
+import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -81,9 +84,6 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO;
-import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
@@ -156,6 +156,7 @@ public class TestHSync {
     // Reduce KeyDeletingService interval
     CONF.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
     CONF.setBoolean("ozone.client.incremental.chunk.list", true);
+    CONF.setBoolean("ozone.client.stream.putblock.piggybacking", true);
     CONF.setBoolean(OZONE_CHUNK_LIST_INCREMENTAL, true);
     ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
         .setBlockSize(BLOCK_SIZE)
@@ -177,11 +178,11 @@ public class TestHSync {
     bucket = TestDataUtil.createVolumeAndBucket(client, layout);
 
     // Enable DEBUG level logging for relevant classes
-    GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockManagerImpl.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(AbstractDatanodeStore.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(BlockOutputStream.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(KeyValueHandler.LOG, Level.DEBUG);
   }
 
   @AfterAll
@@ -549,7 +550,8 @@ public class TestHSync {
           break;
         }
         for (int i = 0; i < n; i++) {
-          assertEquals(data[offset + i], buffer[i]);
+          assertEquals(data[offset + i], buffer[i],
+              "expected at offset " + offset + " i=" + i);
         }
         offset += n;
       }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 161bf3c3b9..9c76c0ec0c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -275,7 +275,7 @@ public interface MiniOzoneCluster extends AutoCloseable {
     protected boolean includeRecon = false;
 
     protected int dnInitialVersion = DatanodeVersion.FUTURE_VERSION.toProtoValue();
-    protected int dnCurrentVersion = DatanodeVersion.FUTURE_VERSION.toProtoValue();
+    protected int dnCurrentVersion = DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue();
 
     protected int numOfDatanodes = 3;
     protected boolean  startDataNodes = true;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index e15e1e4d63..ce5432739c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -80,8 +80,13 @@ class TestBlockOutputStream {
 
   static MiniOzoneCluster createCluster() throws IOException,
       InterruptedException, TimeoutException {
-
     OzoneConfiguration conf = new OzoneConfiguration();
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setChecksumType(ChecksumType.NONE);
+    clientConfig.setStreamBufferFlushDelay(false);
+    clientConfig.setEnablePutblockPiggybacking(true);
+    conf.setFromObject(clientConfig);
+
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
     conf.setQuietMode(false);
@@ -397,7 +402,7 @@ class TestBlockOutputStream {
       key.flush();
       assertEquals(writeChunkCount + 2,
           metrics.getContainerOpCountMetrics(WriteChunk));
-      assertEquals(putBlockCount + 1,
+      assertEquals(putBlockCount,
           metrics.getContainerOpCountMetrics(PutBlock));
       assertEquals(pendingWriteChunkCount,
           metrics.getPendingContainerOpCountMetrics(WriteChunk));
@@ -426,9 +431,9 @@ class TestBlockOutputStream {
           metrics.getPendingContainerOpCountMetrics(PutBlock));
       assertEquals(writeChunkCount + 2,
           metrics.getContainerOpCountMetrics(WriteChunk));
-      assertEquals(putBlockCount + 2,
+      assertEquals(putBlockCount + 1,
           metrics.getContainerOpCountMetrics(PutBlock));
-      assertEquals(totalOpCount + 4, metrics.getTotalOpCount());
+      assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
       assertEquals(0, keyOutputStream.getStreamEntries().size());
 
       validateData(keyName, data1, client.getObjectStore(), VOLUME, BUCKET);
@@ -493,9 +498,9 @@ class TestBlockOutputStream {
           metrics.getPendingContainerOpCountMetrics(PutBlock));
       assertEquals(writeChunkCount + 3,
           metrics.getContainerOpCountMetrics(WriteChunk));
-      assertEquals(putBlockCount + 2,
+      assertEquals(putBlockCount + 1,
           metrics.getContainerOpCountMetrics(PutBlock));
-      assertEquals(totalOpCount + 5, metrics.getTotalOpCount());
+      assertEquals(totalOpCount + 4, metrics.getTotalOpCount());
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       // make sure the bufferPool is empty
       assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -686,9 +691,9 @@ class TestBlockOutputStream {
       assertEquals(writeChunkCount + 5,
           metrics.getContainerOpCountMetrics(WriteChunk));
       // The previous flush did not trigger any action with flushDelay enabled
-      assertEquals(putBlockCount + (flushDelay ? 3 : 4),
+      assertEquals(putBlockCount + (flushDelay ? 2 : 3),
           metrics.getContainerOpCountMetrics(PutBlock));
-      assertEquals(totalOpCount + (flushDelay ? 8 : 9),
+      assertEquals(totalOpCount + (flushDelay ? 7 : 8),
           metrics.getTotalOpCount());
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       assertEquals(0, 
blockOutputStream.getCommitIndex2flushedDataMap().size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index 2ae69dc3c9..e773bf7ed7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -607,18 +607,40 @@ public class TestOmContainerLocationCache {
 
   private void mockWriteChunkResponse(XceiverClientSpi mockDnProtocol)
       throws IOException, ExecutionException, InterruptedException {
-    ContainerCommandResponseProto writeResponse =
-        ContainerCommandResponseProto.newBuilder()
-            .setWriteChunk(WriteChunkResponseProto.newBuilder().build())
-            .setResult(Result.SUCCESS)
-            .setCmdType(Type.WriteChunk)
-            .build();
     doAnswer(invocation ->
-        new XceiverClientReply(completedFuture(writeResponse)))
+        new XceiverClientReply(
+            completedFuture(
+                createWriteChunkResponse(
+                    (ContainerCommandRequestProto)invocation.getArgument(0)))))
         .when(mockDnProtocol)
         .sendCommandAsync(argThat(matchCmd(Type.WriteChunk)));
   }
 
+  ContainerCommandResponseProto createWriteChunkResponse(
+      ContainerCommandRequestProto request) {
+    ContainerProtos.WriteChunkRequestProto writeChunk = request.getWriteChunk();
+
+    WriteChunkResponseProto.Builder builder =
+        WriteChunkResponseProto.newBuilder();
+    if (writeChunk.hasBlock()) {
+      ContainerProtos.BlockData
+          blockData = writeChunk.getBlock().getBlockData();
+
+      GetCommittedBlockLengthResponseProto response =
+          GetCommittedBlockLengthResponseProto.newBuilder()
+          .setBlockID(blockData.getBlockID())
+          .setBlockLength(blockData.getSize())
+          .build();
+
+      builder.setCommittedBlockLength(response);
+    }
+    return ContainerCommandResponseProto.newBuilder()
+        .setWriteChunk(builder.build())
+        .setResult(Result.SUCCESS)
+        .setCmdType(Type.WriteChunk)
+        .build();
+  }
+
   private ArgumentMatcher<ContainerCommandRequestProto> matchCmd(Type type) {
     return argument -> argument != null && argument.getCmdType() == type;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to