This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 76a573a104 HDDS-9752. [hsync] Make Putblock performance acceptable - 
Client side (#5663)
76a573a104 is described below

commit 76a573a104b7f88a117caba1075f10dc33637c86
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Tue Feb 6 20:07:15 2024 -0800

    HDDS-9752. [hsync] Make Putblock performance acceptable - Client side 
(#5663)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |  17 ++
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   6 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 184 ++++++++++++++++++++-
 .../DatanodeStoreWithIncrementalChunkList.java     |  29 +++-
 .../keyvalue/impl/TestBlockManagerImpl.java        |   2 +
 .../hadoop/ozone/client/MockOmTransport.java       |  39 ++++-
 .../TestBlockOutputStreamIncrementalPutBlock.java  | 163 ++++++++++++++++++
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 179 +++++++++++++++++---
 8 files changed, 585 insertions(+), 34 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 44af34cb91..d1dcc654b1 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -223,6 +223,15 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";
 
+  @Config(key = "incremental.chunk.list",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      description = "Client PutBlock request can choose incremental chunk " +
+          "list rather than full chunk list to optimize performance. " +
+          "Critical to HBase.",
+      tags = ConfigTag.CLIENT)
+  private boolean incrementalChunkList = false;
+
   @PostConstruct
   private void validate() {
     Preconditions.checkState(streamBufferSize > 0);
@@ -404,4 +413,12 @@ public class OzoneClientConfig {
   public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
     this.datastreamPipelineMode = datastreamPipelineMode;
   }
+
+  public void setIncrementalChunkList(boolean enable) {
+    this.incrementalChunkList = enable;
+  }
+
+  public boolean getIncrementalChunkList() {
+    return this.incrementalChunkList;
+  }
 }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index a12f9067ce..78cef5f1b4 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -63,7 +63,7 @@ import static 
org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
  */
 public class BlockInputStream extends BlockExtendedInputStream {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(BlockInputStream.class);
 
   private final BlockID blockID;
@@ -256,8 +256,8 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
     final Pipeline pipeline = xceiverClient.getPipeline();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Initializing BlockInputStream for get key to access {}",
-          blockID.getContainerID());
+      LOG.debug("Initializing BlockInputStream for get key to access block {}",
+          blockID);
     }
 
     DatanodeBlockID.Builder blkIDBuilder =
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index ac21411ea5..1308de4f45 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hdds.scm.storage;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -80,6 +82,12 @@ public class BlockOutputStream extends OutputStream {
       LoggerFactory.getLogger(BlockOutputStream.class);
   public static final String EXCEPTION_MSG =
       "Unexpected Storage Container Exception: ";
+  public static final String INCREMENTAL_CHUNK_LIST = "incremental";
+  public static final KeyValue INCREMENTAL_CHUNK_LIST_KV =
+      KeyValue.newBuilder().setKey(INCREMENTAL_CHUNK_LIST).build();
+  public static final String FULL_CHUNK = "full";
+  public static final KeyValue FULL_CHUNK_KV =
+      KeyValue.newBuilder().setKey(FULL_CHUNK).build();
 
   private AtomicReference<BlockID> blockID;
   private final AtomicReference<ChunkInfo> previousChunkInfo
@@ -123,6 +131,10 @@ public class BlockOutputStream extends OutputStream {
   private int currentBufferRemaining;
   //current buffer allocated to write
   private ChunkBuffer currentBuffer;
+  // last chunk holds the buffer after the last complete chunk, which may be
+  // different from currentBuffer. We need this to calculate checksum.
+  private ByteBuffer lastChunkBuffer;
+  private long lastChunkOffset;
   private final Token<? extends TokenIdentifier> token;
   private final String tokenString;
   private int replicationIndex;
@@ -164,6 +176,13 @@ public class BlockOutputStream extends OutputStream {
     }
     this.containerBlockData = BlockData.newBuilder().setBlockID(
         blkIDBuilder.build()).addMetadata(keyValue);
+    // tell DataNode I will send incremental chunk list
+    if (config.getIncrementalChunkList()) {
+      this.containerBlockData.addMetadata(INCREMENTAL_CHUNK_LIST_KV);
+      this.lastChunkBuffer =
+          ByteBuffer.allocate(config.getStreamBufferSize());
+      this.lastChunkOffset = 0;
+    }
     this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
     this.bufferPool = bufferPool;
     this.token = token;
@@ -468,6 +487,14 @@ public class BlockOutputStream extends OutputStream {
         ContainerCommandResponseProto> flushFuture = null;
     try {
       BlockData blockData = containerBlockData.build();
+      LOG.debug("sending PutBlock {}", blockData);
+
+      if (config.getIncrementalChunkList()) {
+        // Remove any chunks in the containerBlockData list,
+        // since they have already been sent.
+        containerBlockData.clearChunks();
+      }
+
       XceiverClientReply asyncReply =
           putBlockAsync(xceiverClient, blockData, close, tokenString);
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
@@ -746,7 +773,12 @@ public class BlockOutputStream extends OutputStream {
             setIoException(ce);
             throw ce;
           });
-      containerBlockData.addChunks(chunkInfo);
+      if (config.getIncrementalChunkList()) {
+        updateBlockDataForWriteChunk(chunk);
+      } else {
+        containerBlockData.addChunks(chunkInfo);
+      }
+
       clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
       return validateFuture;
     } catch (IOException | ExecutionException e) {
@@ -758,6 +790,156 @@ public class BlockOutputStream extends OutputStream {
     return null;
   }
 
+  /**
+   * Update container block data, which is later sent to DataNodes via 
PutBlock,
+   * using the new chunks sent out via WriteChunk.
+   *
+   * This method is only used when incremental chunk list is enabled.
+   * @param chunk the chunk buffer to be sent out by WriteChunk.
+   * @throws OzoneChecksumException
+   */
+  private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
+      throws OzoneChecksumException {
+    // Update lastChunkBuffer using the new chunk data.
+    // This is used to calculate checksum for the last partial chunk in
+    // containerBlockData which will be used by PutBlock.
+
+    // the last partial chunk in containerBlockData will be replaced.
+    // So remove it.
+    removeLastPartialChunk();
+    chunk.rewind();
+    LOG.debug("Adding chunk pos {} limit {} remaining {}." +
+            "lastChunkBuffer pos {} limit {} remaining {} lastChunkOffset = 
{}",
+        chunk.position(), chunk.limit(), chunk.remaining(),
+        lastChunkBuffer.position(), lastChunkBuffer.limit(),
+        lastChunkBuffer.remaining(), lastChunkOffset);
+
+    // Append the chunk to the last chunk buffer.
+    // if the resulting size exceeds limit (4MB),
+    // drop the full chunk and keep the rest.
+    if (lastChunkBuffer.position() + chunk.remaining() <=
+        lastChunkBuffer.capacity()) {
+      appendLastChunkBuffer(chunk, 0, chunk.remaining());
+    } else {
+      int remainingBufferSize =
+          lastChunkBuffer.capacity() - lastChunkBuffer.position();
+      appendLastChunkBuffer(chunk, 0, remainingBufferSize);
+      updateBlockDataWithLastChunkBuffer();
+      appendLastChunkBuffer(chunk, remainingBufferSize,
+          chunk.remaining() - remainingBufferSize);
+    }
+    LOG.debug("after append, lastChunkBuffer={} lastChunkOffset={}",
+        lastChunkBuffer, lastChunkOffset);
+
+    updateBlockDataWithLastChunkBuffer();
+  }
+
+  private void updateBlockDataWithLastChunkBuffer()
+      throws OzoneChecksumException {
+    // create chunk info for lastChunkBuffer
+    ChunkInfo lastChunkInfo = createChunkInfo(lastChunkOffset);
+    LOG.debug("lastChunkInfo = {}", lastChunkInfo);
+    long lastChunkSize = lastChunkInfo.getLen();
+    addToBlockData(lastChunkInfo);
+
+    lastChunkBuffer.clear();
+    if (lastChunkSize == config.getStreamBufferSize()) {
+      lastChunkOffset += config.getStreamBufferSize();
+    } else {
+      lastChunkBuffer.position((int) lastChunkSize);
+    }
+  }
+
+  private void appendLastChunkBuffer(ChunkBuffer chunkBuffer, int offset,
+      int length) {
+    LOG.debug("copying to last chunk buffer offset={} length={}",
+        offset, length);
+    int pos = 0;
+    int uncopied = length;
+    for (ByteBuffer bb : chunkBuffer.asByteBufferList()) {
+      if (pos + bb.remaining() >= offset) {
+        int copyStart = offset < pos ? 0 : offset - pos;
+        int copyLen = Math.min(uncopied, bb.remaining());
+        try {
+          LOG.debug("put into last chunk buffer start = {} len = {}",
+              copyStart, copyLen);
+          lastChunkBuffer.put(bb.array(), copyStart, copyLen);
+        } catch (BufferOverflowException e) {
+          LOG.error("appending from " + copyStart + " for len=" + copyLen +
+              ". lastChunkBuffer remaining=" + lastChunkBuffer.remaining() +
+              " pos=" + lastChunkBuffer.position() +
+              " limit=" + lastChunkBuffer.limit() +
+              " capacity=" + lastChunkBuffer.capacity());
+          throw e;
+        }
+
+        uncopied -= copyLen;
+      }
+
+      pos += bb.remaining();
+      if (pos >= offset + length) {
+        return;
+      }
+      if (uncopied == 0) {
+        return;
+      }
+    }
+  }
+
+  private void removeLastPartialChunk() {
+    // remove the last chunk if it's partial.
+    if (containerBlockData.getChunksList().isEmpty()) {
+      return;
+    }
+    int lastChunkIndex = containerBlockData.getChunksCount() - 1;
+    ChunkInfo lastChunkInBlockData = containerBlockData.getChunks(
+        lastChunkIndex);
+    if (!isFullChunk(lastChunkInBlockData)) {
+      containerBlockData.removeChunks(lastChunkIndex);
+    }
+  }
+
+  private ChunkInfo createChunkInfo(long lastPartialChunkOffset)
+      throws OzoneChecksumException {
+    lastChunkBuffer.flip();
+    int revisedChunkSize = lastChunkBuffer.remaining();
+    // create the chunk info to be sent in PutBlock.
+    ChecksumData revisedChecksumData =
+        checksum.computeChecksum(lastChunkBuffer);
+
+    long chunkID = lastPartialChunkOffset / config.getStreamBufferSize();
+    ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder()
+        .setChunkName(blockID.get().getLocalID() + "_chunk_" + chunkID)
+        .setOffset(lastPartialChunkOffset)
+        .setLen(revisedChunkSize)
+        .setChecksumData(revisedChecksumData.getProtoBufMessage());
+    // if full chunk
+    if (revisedChunkSize == config.getStreamBufferSize()) {
+      revisedChunkInfo.addMetadata(FULL_CHUNK_KV);
+    }
+    return revisedChunkInfo.build();
+  }
+
+  private boolean isFullChunk(ChunkInfo chunkInfo) {
+    Preconditions.checkState(
+        chunkInfo.getLen() <= config.getStreamBufferSize());
+    return chunkInfo.getLen() == config.getStreamBufferSize();
+  }
+
+  private void addToBlockData(ChunkInfo revisedChunkInfo) {
+    LOG.debug("containerBlockData chunk: {}", containerBlockData);
+    if (containerBlockData.getChunksCount() > 0) {
+      ChunkInfo lastChunk = containerBlockData.getChunks(
+          containerBlockData.getChunksCount() - 1);
+      LOG.debug("revisedChunkInfo chunk: {}", revisedChunkInfo);
+      Preconditions.checkState(lastChunk.getOffset() + lastChunk.getLen() ==
+          revisedChunkInfo.getOffset(),
+            "lastChunk.getOffset() + lastChunk.getLen() " +
+                "!= revisedChunkInfo.getOffset()");
+    }
+    containerBlockData.addChunks(revisedChunkInfo);
+  }
+
   @VisibleForTesting
   public void setXceiverClient(XceiverClientSpi xceiverClient) {
     this.xceiverClient = xceiverClient;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
index 51e4533500..84000ba2fb 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
@@ -94,12 +94,16 @@ public class DatanodeStoreWithIncrementalChunkList extends 
AbstractDatanodeStore
     LOG.debug("blockData={}, lastChunk={}",
         blockData.getChunks(), lastChunk.getChunks());
     Preconditions.checkState(lastChunk.getChunks().size() == 1);
-    ContainerProtos.ChunkInfo lastChunkInBlockData =
-        blockData.getChunks().get(blockData.getChunks().size() - 1);
-    Preconditions.checkState(
-        lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen()
-            == lastChunk.getChunks().get(0).getOffset(),
-        "chunk offset does not match");
+    if (!blockData.getChunks().isEmpty()) {
+      ContainerProtos.ChunkInfo lastChunkInBlockData =
+              blockData.getChunks().get(blockData.getChunks().size() - 1);
+      if (lastChunkInBlockData != null) {
+        Preconditions.checkState(
+            lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen()
+                == lastChunk.getChunks().get(0).getOffset(),
+            "chunk offset does not match");
+      }
+    }
 
     // append last partial chunk to the block data
     List<ContainerProtos.ChunkInfo> chunkInfos =
@@ -136,7 +140,7 @@ public class DatanodeStoreWithIncrementalChunkList extends 
AbstractDatanodeStore
   public void putBlockByID(BatchOperation batch, boolean incremental,
       long localID, BlockData data, KeyValueContainerData containerData,
       boolean endOfBlock) throws IOException {
-    if (!incremental && !isPartialChunkList(data)) {
+    if (!incremental || !isPartialChunkList(data)) {
       // Case (1) old client: override chunk list.
       getBlockDataTable().putWithBatch(
           batch, containerData.getBlockKey(localID), data);
@@ -151,14 +155,21 @@ public class DatanodeStoreWithIncrementalChunkList 
extends AbstractDatanodeStore
 
   private void moveLastChunkToBlockData(BatchOperation batch, long localID,
       BlockData data, KeyValueContainerData containerData) throws IOException {
+    // if data has no chunks, fetch the last chunk info from lastChunkInfoTable
+    if (data.getChunks().isEmpty()) {
+      BlockData lastChunk = 
getLastChunkInfoTable().get(containerData.getBlockKey(localID));
+      if (lastChunk != null) {
+        reconcilePartialChunks(lastChunk, data);
+      }
+    }
     // if eob or if the last chunk is full,
     // the 'data' is full so append it to the block table's chunk info
     // and then remove from lastChunkInfo
     BlockData blockData = getBlockDataTable().get(
         containerData.getBlockKey(localID));
     if (blockData == null) {
-      // Case 2.1 if the block did not have full chunks before,
-      // the block's chunk is what received from client this time.
+      // Case 2.1: if the block did not have full chunks before,
+      // the block's chunks are what was received from the client this time,
+      // plus the chunks in lastChunkInfoTable.
       blockData = data;
     } else {
       // case 2.2 the block already has some full chunks
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
index 38a01e4690..26d959e886 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
@@ -83,6 +84,7 @@ public class TestBlockManagerImpl {
     this.schemaVersion = versionInfo.getSchemaVersion();
     this.config = new OzoneConfiguration();
     ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, config);
+    config.setBoolean(OZONE_CHUNK_LIST_INCREMENTAL, true);
     initilaze();
   }
 
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index e4a8a80a63..31f5e20bc8 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -56,6 +56,8 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Service
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -67,6 +69,8 @@ import java.util.function.Function;
  * OM transport for testing with in-memory state.
  */
 public class MockOmTransport implements OmTransport {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MockOmTransport.class);
 
   private final MockBlockAllocator blockAllocator;
   //volumename -> volumeinfo
@@ -185,11 +189,44 @@ public class MockOmTransport implements OmTransport {
         .build();
   }
 
+  private boolean isHSync(CommitKeyRequest commitKeyRequest) {
+    return commitKeyRequest.hasHsync() && commitKeyRequest.getHsync();
+  }
+
+  private boolean isRecovery(CommitKeyRequest commitKeyRequest) {
+    return commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery();
+  }
+
+  private String toOperationString(CommitKeyRequest commitKeyRequest) {
+    boolean hsync = isHSync(commitKeyRequest);
+    boolean recovery = isRecovery(commitKeyRequest);
+    if (hsync) {
+      return "hsync";
+    }
+    if (recovery) {
+      return "recover";
+    }
+    return "commit";
+  }
+
+
   private CommitKeyResponse commitKey(CommitKeyRequest commitKeyRequest) {
     final KeyArgs keyArgs = commitKeyRequest.getKeyArgs();
     final KeyInfo openKey =
         openKeys.get(keyArgs.getVolumeName()).get(keyArgs.getBucketName())
-            .remove(keyArgs.getKeyName());
+            .get(keyArgs.getKeyName());
+    LOG.debug("{} open key vol: {} bucket: {} key: {}",
+        toOperationString(commitKeyRequest),
+        keyArgs.getVolumeName(),
+        keyArgs.getBucketName(),
+        keyArgs.getKeyName());
+    boolean hsync = isHSync(commitKeyRequest);
+    if (!hsync) {
+      KeyInfo deleteKey = openKeys.get(keyArgs.getVolumeName())
+          .get(keyArgs.getBucketName())
+          .remove(keyArgs.getKeyName());
+      assert deleteKey != null;
+    }
     final KeyInfo.Builder committedKeyInfoWithLocations =
         KeyInfo.newBuilder().setVolumeName(keyArgs.getVolumeName())
             .setBucketName(keyArgs.getBucketName())
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java
new file mode 100644
index 0000000000..68549e4104
--- /dev/null
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+
+/**
+ * Verify BlockOutputStream with incremental PutBlock feature.
+ * (ozone.client.incremental.chunk.list = true)
+ */
+public class TestBlockOutputStreamIncrementalPutBlock {
+  private OzoneClient client;
+  private final String keyName = UUID.randomUUID().toString();
+  private final String volumeName = UUID.randomUUID().toString();
+  private final String bucketName = UUID.randomUUID().toString();
+  private OzoneBucket bucket;
+  private final ConfigurationSource config = new InMemoryConfiguration();
+
+  public static Iterable<Boolean> parameters() {
+    return Arrays.asList(true, false);
+  }
+
+  private void init(boolean incrementalChunkList) throws IOException {
+    OzoneClientConfig clientConfig = config.getObject(OzoneClientConfig.class);
+
+    clientConfig.setIncrementalChunkList(incrementalChunkList);
+    clientConfig.setChecksumType(ContainerProtos.ChecksumType.CRC32C);
+
+    ((InMemoryConfiguration)config).setFromObject(clientConfig);
+
+    ((InMemoryConfiguration) config).setBoolean(
+        OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
+    ((InMemoryConfiguration) config).setBoolean(
+        OZONE_CHUNK_LIST_INCREMENTAL, incrementalChunkList);
+
+    RpcClient rpcClient = new RpcClient(config, null) {
+
+      @Override
+      protected OmTransport createOmTransport(
+          String omServiceId)
+          throws IOException {
+        return new MockOmTransport();
+      }
+
+      @NotNull
+      @Override
+      protected XceiverClientFactory createXceiverClientFactory(
+          ServiceInfoEx serviceInfo) throws IOException {
+        return new MockXceiverClientFactory();
+      }
+    };
+
+    client = new OzoneClient(config, rpcClient);
+    ObjectStore store = client.getObjectStore();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    bucket = volume.getBucket(bucketName);
+  }
+
+  @AfterEach
+  public void close() throws IOException {
+    client.close();
+  }
+
+  @ParameterizedTest
+  @MethodSource("parameters")
+  public void writeSmallChunk(boolean incrementalChunkList)
+      throws IOException {
+    init(incrementalChunkList);
+
+    int size = 1024;
+    String s = RandomStringUtils.randomAlphabetic(1024);
+    ByteBuffer byteBuffer = 
ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, size,
+        ReplicationConfig.getDefault(config), new HashMap<>())) {
+      for (int i = 0; i < 4097; i++) {
+        out.write(byteBuffer);
+        out.hsync();
+      }
+    }
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      ByteBuffer readBuffer = ByteBuffer.allocate(size);
+      for (int i = 0; i < 4097; i++) {
+        is.read(readBuffer);
+        assertArrayEquals(readBuffer.array(), byteBuffer.array());
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("parameters")
+  public void writeLargeChunk(boolean incrementalChunkList)
+      throws IOException {
+    init(incrementalChunkList);
+
+    int size = 1024 * 1024 + 1;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(size);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, size,
+        ReplicationConfig.getDefault(config), new HashMap<>())) {
+      for (int i = 0; i < 4; i++) {
+        out.write(byteBuffer);
+        out.hsync();
+      }
+    }
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      ByteBuffer readBuffer = ByteBuffer.allocate(size);
+      for (int i = 0; i < 4; i++) {
+        is.read(readBuffer);
+        assertArrayEquals(readBuffer.array(), byteBuffer.array());
+      }
+    }
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 8d7439604e..cd318e88ba 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.ozone;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
@@ -28,12 +30,19 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.CipherSuite;
@@ -58,8 +67,10 @@ import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
@@ -82,11 +93,16 @@ import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
@@ -96,11 +112,13 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOU
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -120,12 +138,13 @@ public class TestHSync {
   private static OzoneClient client;
   private static final BucketLayout BUCKET_LAYOUT = 
BucketLayout.FILE_SYSTEM_OPTIMIZED;
 
+  private static final int CHUNK_SIZE = 4 << 12;
+  private static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
+  private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
+  private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
+
   @BeforeAll
   public static void init() throws Exception {
-    final int chunkSize = 4 << 10;
-    final int flushSize = 2 * chunkSize;
-    final int maxFlushSize = 2 * flushSize;
-    final int blockSize = 2 * maxFlushSize;
     final BucketLayout layout = BUCKET_LAYOUT;
 
     CONF.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false);
@@ -133,17 +152,19 @@ public class TestHSync {
     CONF.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
     // Reduce KeyDeletingService interval
     CONF.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, 
TimeUnit.MILLISECONDS);
+    CONF.setBoolean("ozone.client.incremental.chunk.list", true);
+    CONF.setBoolean(OZONE_CHUNK_LIST_INCREMENTAL, true);
     cluster = MiniOzoneCluster.newBuilder(CONF)
         .setNumDatanodes(5)
         .setTotalPipelineNumLimit(10)
-        .setBlockSize(blockSize)
-        .setChunkSize(chunkSize)
-        .setStreamBufferFlushSize(flushSize)
-        .setStreamBufferMaxSize(maxFlushSize)
-        .setDataStreamBufferFlushize(maxFlushSize)
+        .setBlockSize(BLOCK_SIZE)
+        .setChunkSize(CHUNK_SIZE)
+        .setStreamBufferFlushSize(FLUSH_SIZE)
+        .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
+        .setDataStreamBufferFlushize(MAX_FLUSH_SIZE)
         .setStreamBufferSizeUnit(StorageUnit.BYTES)
-        .setDataStreamMinPacketSize(chunkSize)
-        .setDataStreamStreamWindowSize(5 * chunkSize)
+        .setDataStreamMinPacketSize(CHUNK_SIZE)
+        .setDataStreamStreamWindowSize(5 * CHUNK_SIZE)
         .build();
     cluster.waitForClusterToBeReady();
     client = cluster.newClient();
@@ -155,6 +176,8 @@ public class TestHSync {
     GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockOutputStream.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
   }
 
   @AfterAll
@@ -287,13 +310,15 @@ public class TestHSync {
     }
   }
 
-  @Test
-  public void testO3fsHSync() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testO3fsHSync(boolean incrementalChunkList) throws Exception {
     // Set the fs.defaultFS
     final String rootPath = String.format("%s://%s.%s/",
         OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
     CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 
+    initClientConfig(incrementalChunkList);
     try (FileSystem fs = FileSystem.get(CONF)) {
       for (int i = 0; i < 10; i++) {
         final Path file = new Path("/file" + i);
@@ -302,8 +327,10 @@ public class TestHSync {
     }
   }
 
-  @Test
-  public void testOfsHSync() throws Exception {
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testOfsHSync(boolean incrementalChunkList) throws Exception {
     // Set the fs.defaultFS
     final String rootPath = String.format("%s://%s/",
         OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
@@ -312,6 +339,7 @@ public class TestHSync {
     final String dir = OZONE_ROOT + bucket.getVolumeName()
         + OZONE_URI_DELIMITER + bucket.getName();
 
+    initClientConfig(incrementalChunkList);
     try (FileSystem fs = FileSystem.get(CONF)) {
       for (int i = 0; i < 10; i++) {
         final Path file = new Path(dir, "file" + i);
@@ -429,13 +457,11 @@ public class TestHSync {
     ThreadLocalRandom.current().nextBytes(data);
 
     final Path file = new Path(dir, "file-hsync-then-close");
-    long blockSize;
     try (FileSystem fs = FileSystem.get(CONF)) {
-      blockSize = fs.getDefaultBlockSize(file);
       long fileSize = 0;
       try (FSDataOutputStream outputStream = fs.create(file, true)) {
         // make sure at least writing 2 blocks data
-        while (fileSize <= blockSize) {
+        while (fileSize <= BLOCK_SIZE) {
           outputStream.write(data, 0, data.length);
           outputStream.hsync();
           fileSize += data.length;
@@ -448,9 +474,9 @@ public class TestHSync {
     omMetrics.resetNumKeyHSyncs();
     long writtenSize = 0;
     try (OzoneOutputStream outputStream = bucket.createKey("key-" + 
RandomStringUtils.randomNumeric(5),
-        blockSize * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new 
HashMap<>())) {
+        BLOCK_SIZE * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new 
HashMap<>())) {
       // make sure at least writing 2 blocks data
-      while (writtenSize <= blockSize) {
+      while (writtenSize <= BLOCK_SIZE) {
         outputStream.write(data, 0, data.length);
         outputStream.hsync();
         writtenSize += data.length;
@@ -733,4 +759,117 @@ public class TestHSync {
       assertFalse(cofsos.hasCapability(StreamCapabilities.HFLUSH));
     }
   }
+
+  public void initClientConfig(boolean incrementalChunkList) {
+    OzoneClientConfig clientConfig = CONF.getObject(OzoneClientConfig.class);
+    clientConfig.setIncrementalChunkList(incrementalChunkList);
+    clientConfig.setChecksumType(ContainerProtos.ChecksumType.CRC32C);
+    CONF.setFromObject(clientConfig);
+  }
+
+  public static Stream<Arguments> parameters1() {
+    return Stream.of(
+        arguments(true, 512),
+        arguments(true, 511),
+        arguments(true, 513),
+        arguments(false, 512),
+        arguments(false, 511),
+        arguments(false, 513)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("parameters1")
+  public void writeWithSmallBuffer(boolean incrementalChunkList, int 
bufferSize)
+      throws IOException {
+    initClientConfig(incrementalChunkList);
+
+    final String keyName = UUID.randomUUID().toString();
+    int fileSize = 16 << 11;
+    String s = RandomStringUtils.randomAlphabetic(bufferSize);
+    ByteBuffer byteBuffer = 
ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+
+    int writtenSize = 0;
+    try (OzoneOutputStream out = bucket.createKey(keyName, fileSize,
+        ReplicationConfig.getDefault(CONF), new HashMap<>())) {
+      while (writtenSize < fileSize) {
+        int len = Math.min(bufferSize, fileSize - writtenSize);
+        out.write(byteBuffer, 0, len);
+        out.hsync();
+        writtenSize += bufferSize;
+      }
+    }
+
+    OzoneKeyDetails keyInfo = bucket.getKey(keyName);
+    assertEquals(fileSize, keyInfo.getDataSize());
+
+    int readSize = 0;
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      while (readSize < fileSize) {
+        int len = Math.min(bufferSize, fileSize - readSize);
+        ByteBuffer readBuffer = ByteBuffer.allocate(len);
+        int readLen = is.read(readBuffer);
+        assertEquals(len, readLen);
+        if (len < bufferSize) {
+          for (int i = 0; i < len; i++) {
+            assertEquals(readBuffer.array()[i], byteBuffer.array()[i]);
+          }
+        } else {
+          assertArrayEquals(readBuffer.array(), byteBuffer.array());
+        }
+        readSize += readLen;
+      }
+    }
+    bucket.deleteKey(keyName);
+  }
+
+  public static Stream<Arguments> parameters2() {
+    return Stream.of(
+        arguments(true, 1024 * 1024 + 1),
+        arguments(true, 1024 * 1024 + 1 + CHUNK_SIZE),
+        arguments(true, 1024 * 1024 - 1 + CHUNK_SIZE),
+        arguments(false, 1024 * 1024 + 1),
+        arguments(false, 1024 * 1024 + 1 + CHUNK_SIZE),
+        arguments(false, 1024 * 1024 - 1 + CHUNK_SIZE)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("parameters2")
+  public void writeWithBigBuffer(boolean incrementalChunkList, int bufferSize)
+      throws IOException {
+    initClientConfig(incrementalChunkList);
+
+    final String keyName = UUID.randomUUID().toString();
+    int count = 2;
+    int fileSize = bufferSize * count;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, fileSize,
+        ReplicationConfig.getDefault(CONF), new HashMap<>())) {
+      for (int i = 0; i < count; i++) {
+        out.write(byteBuffer);
+        out.hsync();
+      }
+    }
+
+    OzoneKeyDetails keyInfo = bucket.getKey(keyName);
+    assertEquals(fileSize, keyInfo.getDataSize());
+    int totalReadLen = 0;
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+
+      for (int i = 0; i < count; i++) {
+        ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
+        int readLen = is.read(readBuffer);
+        if (bufferSize != readLen) {
+          throw new IOException("failed to read " + bufferSize + " from offset 
" + totalReadLen +
+              ", actually read " + readLen + ", block " + totalReadLen /
+              BLOCK_SIZE);
+        }
+        assertArrayEquals(byteBuffer.array(), readBuffer.array());
+        totalReadLen += readLen;
+      }
+    }
+    bucket.deleteKey(keyName);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to