ashishkumar50 commented on code in PR #5663:
URL: https://github.com/apache/ozone/pull/5663#discussion_r1473794446


##########
hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+
+/**
+ * Verify BlockOutputStream with incremental PutBlock feature.
+ * (ozone.client.incremental.chunk.list = true)
+ */
+public class TestBlockOutputStreamIncrementalPutBlock {
+  private OzoneClient client;
+  private final String keyName = UUID.randomUUID().toString();
+  private final String volumeName = UUID.randomUUID().toString();
+  private final String bucketName = UUID.randomUUID().toString();
+  private OzoneBucket bucket;
+  private final ConfigurationSource config = new InMemoryConfiguration();
+
+  public static Iterable<Boolean> parameters() {
+    return Arrays.asList(true, false);
+  }
+
+  public void init(boolean incrementalChunkList) throws IOException {
+    OzoneClientConfig clientConfig = config.getObject(OzoneClientConfig.class);
+
+    clientConfig.setIncrementalChunkList(incrementalChunkList);
+    clientConfig.setChecksumType(ContainerProtos.ChecksumType.CRC32C);
+
+    ((InMemoryConfiguration)config).setFromObject(clientConfig);
+
+    ((InMemoryConfiguration) config).setBoolean(
+        OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
+    ((InMemoryConfiguration) config).setBoolean(
+        OZONE_CHUNK_LIST_INCREMENTAL, true);

Review Comment:
   Should we pass the parameterized `incrementalChunkList` value here instead of 
hard-coding `true`?



##########
hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+
+/**
+ * Verify BlockOutputStream with incremental PutBlock feature.
+ * (ozone.client.incremental.chunk.list = true)
+ */
+public class TestBlockOutputStreamIncrementalPutBlock {
+  private OzoneClient client;
+  private final String keyName = UUID.randomUUID().toString();
+  private final String volumeName = UUID.randomUUID().toString();
+  private final String bucketName = UUID.randomUUID().toString();
+  private OzoneBucket bucket;
+  private final ConfigurationSource config = new InMemoryConfiguration();
+
+  public static Iterable<Boolean> parameters() {
+    return Arrays.asList(true, false);
+  }
+
+  public void init(boolean incrementalChunkList) throws IOException {

Review Comment:
   This method can be made `private`.



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -758,6 +790,156 @@ CompletableFuture<ContainerCommandResponseProto> 
writeChunkToContainer(
     return null;
   }
 
+  /**
+   * Update container block data, which is later sent to DataNodes via 
PutBlock,
+   * using the new chunks sent out via WriteChunk.
+   *
+   * This method is only used when incremental chunk list is enabled.
+   * @param chunk the chunk buffer to be sent out by WriteChunk.
+   * @throws OzoneChecksumException
+   */
+  private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
+      throws OzoneChecksumException {
+    // Update lastChunkBuffer using the new chunk data.
+    // This is used to calculate checksum for the last partial chunk in
+    // containerBlockData which will used by PutBlock.
+
+    // the last partial chunk in containerBlockData will be replaced.
+    // So remove it.
+    removeLastPartialChunk();
+    chunk.rewind();
+    LOG.debug("Adding chunk pos {} limit {} remaining {}." +
+            "lastChunkBuffer pos {} limit {} remaining {} lastChunkOffset = 
{}",
+        chunk.position(), chunk.limit(), chunk.remaining(),
+        lastChunkBuffer.position(), lastChunkBuffer.limit(),
+        lastChunkBuffer.remaining(), lastChunkOffset);
+
+    // Append the chunk to the last chunk buffer.
+    // if the resulting size exceeds limit (4MB),
+    // drop the full chunk and keep the rest.
+    if (lastChunkBuffer.position() + chunk.remaining() <=
+        lastChunkBuffer.capacity()) {
+      appendLastChunkBuffer(chunk, 0, chunk.remaining());
+    } else {
+      int remainingBufferSize =
+          lastChunkBuffer.capacity() - lastChunkBuffer.position();
+      appendLastChunkBuffer(chunk, 0, remainingBufferSize);
+      updateBlockDataWithLastChunkBuffer();
+      appendLastChunkBuffer(chunk, remainingBufferSize,
+          chunk.remaining() - remainingBufferSize);
+    }
+    LOG.debug("after append, lastChunkBuffer={} lastChunkOffset={}",
+        lastChunkBuffer, lastChunkOffset);
+
+    updateBlockDataWithLastChunkBuffer();
+  }
+
+  private void updateBlockDataWithLastChunkBuffer()
+      throws OzoneChecksumException {
+    // create chunk info for lastChunkBuffer
+    ChunkInfo lastChunkInfo = createChunkInfo(lastChunkOffset);
+    LOG.debug("lastChunkInfo = {}", lastChunkInfo);
+    long lastChunkSize = lastChunkInfo.getLen();
+    addToBlockData(lastChunkInfo);
+
+    lastChunkBuffer.clear();
+    if (lastChunkSize == config.getStreamBufferSize()) {
+      lastChunkOffset += config.getStreamBufferSize();
+    } else {
+      lastChunkBuffer.position((int) lastChunkSize);
+    }
+  }
+
+  private void appendLastChunkBuffer(ChunkBuffer chunkBuffer, int offset,
+      int length) {
+    LOG.debug("copying to last chunk buffer offset={} length={}",
+        offset, length);
+    int pos = 0;
+    int uncopied = length;
+    for (ByteBuffer bb : chunkBuffer.asByteBufferList()) {
+      if (pos + bb.remaining() > offset) {
+        int copyStart = offset < pos ? 0 : offset - pos;
+        int copyLen = Math.min(uncopied, bb.remaining());
+        try {
+          LOG.debug("put into last chunk buffer start = {} len = {}",
+              copyStart, copyLen);
+          lastChunkBuffer.put(bb.array(), copyStart, copyLen);
+        } catch (BufferOverflowException e) {
+          LOG.error("appending from " + copyStart + " for len=" + copyLen +
+              ". lastChunkBuffer remaining=" + lastChunkBuffer.remaining() +
+              " pos=" + lastChunkBuffer.position() +
+              " limit=" + lastChunkBuffer.limit() +
+              " capacity=" + lastChunkBuffer.capacity());
+          throw e;
+        }
+
+        uncopied -= copyLen;
+      }
+
+      pos += bb.remaining();
+      if (pos > offset + length) {

Review Comment:
   Should this condition be `>=` instead of `>`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to