This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 305a176b7d HDDS-9751. [hsync] Make Putblock performance acceptable - 
DataNode side (#5662)
305a176b7d is described below

commit 305a176b7d479a3ef40171a21ffc7361454f682e
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Tue Jan 23 10:20:48 2024 -0800

    HDDS-9751. [hsync] Make Putblock performance acceptable - DataNode side 
(#5662)
---
 .../container/keyvalue/impl/BlockManagerImpl.java  |   3 +-
 .../metadata/AbstractDatanodeDBDefinition.java     |   1 +
 .../ozone/container/metadata/DatanodeStore.java    |   3 +-
 .../metadata/DatanodeStoreSchemaThreeImpl.java     |   2 +-
 .../metadata/DatanodeStoreSchemaTwoImpl.java       |   2 +-
 .../DatanodeStoreWithIncrementalChunkList.java     | 226 +++++++++++++++++++++
 .../keyvalue/impl/TestBlockManagerImpl.java        | 182 +++++++++++++++++
 .../hadoop/ozone/client/MockDatanodeStorage.java   | 132 ++++++++++--
 .../hadoop/ozone/client/MockXceiverClientSpi.java  |   3 +-
 9 files changed, 525 insertions(+), 29 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 1977b11531..e40434f508 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -364,6 +364,7 @@ public class BlockManagerImpl implements BlockManager {
 
   private BlockData getBlockByID(DBHandle db, BlockID blockID,
       KeyValueContainerData containerData) throws IOException {
-    return db.getStore().getBlockByID(blockID, containerData);
+    String blockKey = containerData.getBlockKey(blockID.getLocalID());
+    return db.getStore().getBlockByID(blockID, blockKey);
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
index fec2a3f7d2..b2c62dfcbd 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
@@ -77,6 +77,7 @@ public abstract class AbstractDatanodeDBDefinition implements 
DBDefinition {
   public DBColumnFamilyDefinition<String, Long> 
getFinalizeBlocksColumnFamily() {
     return null;
   }
+
   public abstract DBColumnFamilyDefinition<String, BlockData>
       getLastChunkInfoColumnFamily();
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
index 4abfb60c4f..35f8bf8322 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
@@ -130,8 +130,7 @@ public interface DatanodeStore extends Closeable {
   }
 
   default BlockData getBlockByID(BlockID blockID,
-      KeyValueContainerData containerData) throws IOException {
-    String blockKey = containerData.getBlockKey(blockID.getLocalID());
+      String blockKey) throws IOException {
 
     // check block data table
     BlockData blockData = getBlockDataTable().get(blockKey);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
index 7f37c9ae51..25479a7a9c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
@@ -53,7 +53,7 @@ import static 
org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDe
  * - All keys have containerID as prefix.
  * - The table 3 has String as key instead of Long since we want to use prefix.
  */
-public class DatanodeStoreSchemaThreeImpl extends AbstractDatanodeStore
+public class DatanodeStoreSchemaThreeImpl extends 
DatanodeStoreWithIncrementalChunkList
     implements DeleteTransactionStore<String> {
 
   public static final String DUMP_FILE_SUFFIX = ".data";
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
index f09d30e45a..c9ea52b47c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
@@ -31,7 +31,7 @@ import java.io.IOException;
  * 2. A metadata table.
  * 3. A Delete Transaction Table.
  */
-public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore
+public class DatanodeStoreSchemaTwoImpl extends 
DatanodeStoreWithIncrementalChunkList
     implements DeleteTransactionStore<Long> {
 
   private final Table<Long, DeletedBlocksTransaction>
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
new file mode 100644
index 0000000000..51e4533500
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.metadata;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
+import static 
org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
+import static 
org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST;
+
+/**
+ * A datanode store supporting incremental chunk lists (shared by schema
+ * versions two and three). The partial last chunk of a block being
+ * hsync'ed is kept in a separate last-chunk-info table; it is reconciled
+ * with the block data table on read, and merged back into the block data
+ * table once the chunk becomes full or the block reaches end-of-block.
+ */
+public class DatanodeStoreWithIncrementalChunkList extends 
AbstractDatanodeStore {
+ /**
+  * Constructs the metadata store and starts the DB services.
+  *
+  * @param config - Ozone Configuration.
+  * @throws IOException - on Failure.
+  */
+  public DatanodeStoreWithIncrementalChunkList(ConfigurationSource config,
+      AbstractDatanodeDBDefinition dbDef, boolean openReadOnly) throws 
IOException {
+    super(config, dbDef, openReadOnly);
+  }
+
+
+  @Override
+  public BlockData getBlockByID(BlockID blockID,
+      String blockKey) throws IOException {
+    BlockData lastChunk = null;
+    // check block data table
+    BlockData blockData = getBlockDataTable().get(blockKey);
+    if (blockData == null || isPartialChunkList(blockData)) {
+      // check last chunk table
+      lastChunk = getLastChunkInfoTable().get(blockKey);
+    }
+
+    if (blockData == null) {
+      if (lastChunk == null) {
+        throw new StorageContainerException(
+            NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("blockData=(null), lastChunk={}", lastChunk.getChunks());
+        }
+        return lastChunk;
+      }
+    } else {
+      if (lastChunk != null) {
+        reconcilePartialChunks(lastChunk, blockData);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("blockData={}, lastChunk=(null)", blockData.getChunks());
+        }
+      }
+    }
+
+    return blockData;
+  }
+
+  private void reconcilePartialChunks(
+      BlockData lastChunk, BlockData blockData) {
+    LOG.debug("blockData={}, lastChunk={}",
+        blockData.getChunks(), lastChunk.getChunks());
+    Preconditions.checkState(lastChunk.getChunks().size() == 1);
+    ContainerProtos.ChunkInfo lastChunkInBlockData =
+        blockData.getChunks().get(blockData.getChunks().size() - 1);
+    Preconditions.checkState(
+        lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen()
+            == lastChunk.getChunks().get(0).getOffset(),
+        "chunk offset does not match");
+
+    // append last partial chunk to the block data
+    List<ContainerProtos.ChunkInfo> chunkInfos =
+        new ArrayList<>(blockData.getChunks());
+    chunkInfos.add(lastChunk.getChunks().get(0));
+    blockData.setChunks(chunkInfos);
+
+    blockData.setBlockCommitSequenceId(
+        lastChunk.getBlockCommitSequenceId());
+  }
+
+  private static boolean isPartialChunkList(BlockData data) {
+    return data.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST);
+  }
+
+  private static boolean isFullChunk(ContainerProtos.ChunkInfo chunkInfo) {
+    for (ContainerProtos.KeyValue kv: chunkInfo.getMetadataList()) {
+      if (kv.getKey().equals(FULL_CHUNK)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // if eob or if the last chunk is full,
+  private static boolean shouldAppendLastChunk(boolean endOfBlock,
+      BlockData data) {
+    if (endOfBlock || data.getChunks().isEmpty()) {
+      return true;
+    }
+    return isFullChunk(data.getChunks().get(data.getChunks().size() - 1));
+  }
+
+  public void putBlockByID(BatchOperation batch, boolean incremental,
+      long localID, BlockData data, KeyValueContainerData containerData,
+      boolean endOfBlock) throws IOException {
+    if (!incremental && !isPartialChunkList(data)) {
+      // Case (1) old client: override chunk list.
+      getBlockDataTable().putWithBatch(
+          batch, containerData.getBlockKey(localID), data);
+    } else if (shouldAppendLastChunk(endOfBlock, data)) {
+      moveLastChunkToBlockData(batch, localID, data, containerData);
+    } else {
+      // incremental chunk list,
+      // not end of block, has partial chunks
+      putBlockWithPartialChunks(batch, localID, data, containerData);
+    }
+  }
+
+  private void moveLastChunkToBlockData(BatchOperation batch, long localID,
+      BlockData data, KeyValueContainerData containerData) throws IOException {
+    // if eob or if the last chunk is full,
+    // the 'data' is full so append it to the block table's chunk info
+    // and then remove from lastChunkInfo
+    BlockData blockData = getBlockDataTable().get(
+        containerData.getBlockKey(localID));
+    if (blockData == null) {
+      // Case 2.1 if the block did not have full chunks before,
+      // the block's chunk is what received from client this time.
+      blockData = data;
+    } else {
+      // case 2.2 the block already has some full chunks
+      List<ContainerProtos.ChunkInfo> chunkInfoList = blockData.getChunks();
+      blockData.setChunks(new ArrayList<>(chunkInfoList));
+      for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
+        blockData.addChunk(chunk);
+      }
+      blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
+    }
+    // delete the entry from last chunk info table
+    getLastChunkInfoTable().deleteWithBatch(
+        batch, containerData.getBlockKey(localID));
+    // update block data table
+    getBlockDataTable().putWithBatch(batch,
+        containerData.getBlockKey(localID), blockData);
+  }
+
+  private void putBlockWithPartialChunks(BatchOperation batch, long localID,
+      BlockData data, KeyValueContainerData containerData) throws IOException {
+    if (data.getChunks().size() == 1) {
+      // Case (3.1) replace/update the last chunk info table
+      getLastChunkInfoTable().putWithBatch(
+          batch, containerData.getBlockKey(localID), data);
+    } else {
+      int lastChunkIndex = data.getChunks().size() - 1;
+      // received more than one chunk this time
+      List<ContainerProtos.ChunkInfo> lastChunkInfo =
+          Collections.singletonList(
+              data.getChunks().get(lastChunkIndex));
+      BlockData blockData = getBlockDataTable().get(
+          containerData.getBlockKey(localID));
+      if (blockData == null) {
+        // Case 3.2: if the block does not exist in the block data table
+        List<ContainerProtos.ChunkInfo> chunkInfos =
+            new ArrayList<>(data.getChunks());
+        chunkInfos.remove(lastChunkIndex);
+        data.setChunks(chunkInfos);
+        blockData = data;
+        LOG.debug("block {} does not have full chunks yet. Adding the " +
+            "chunks to it {}", localID, blockData);
+      } else {
+        // Case 3.3: if the block exists in the block data table,
+        // append chunks till except the last one (supposedly partial)
+        List<ContainerProtos.ChunkInfo> chunkInfos =
+            new ArrayList<>(blockData.getChunks());
+
+        LOG.debug("blockData.getChunks()={}", chunkInfos);
+        LOG.debug("data.getChunks()={}", data.getChunks());
+
+        for (int i = 0; i < lastChunkIndex; i++) {
+          chunkInfos.add(data.getChunks().get(i));
+        }
+        blockData.setChunks(chunkInfos);
+        blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
+      }
+      getBlockDataTable().putWithBatch(batch,
+          containerData.getBlockKey(localID), blockData);
+      // update the last partial chunk
+      data.setChunks(lastChunkInfo);
+      getLastChunkInfoTable().putWithBatch(
+          batch, containerData.getBlockKey(localID), data);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
index 57ea7234e9..38a01e4690 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
@@ -36,8 +36,10 @@ import 
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
@@ -45,6 +47,9 @@ import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
+import static 
org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
+import static 
org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -222,4 +227,181 @@ public class TestBlockManagerImpl {
     assertNotNull(listBlockData);
     assertEquals(10, listBlockData.size());
   }
+
+  private BlockData createBlockData(long containerID, long blockNo,
+      int chunkID, long offset, long len, long bcsID)
+      throws IOException {
+    blockID1 = new BlockID(containerID, blockNo);
+    blockData = new BlockData(blockID1);
+    List<ContainerProtos.ChunkInfo> chunkList1 = new ArrayList<>();
+    ChunkInfo info1 = new ChunkInfo(String.format("%d_chunk_%d", blockID1
+        .getLocalID(), chunkID), offset, len);
+    chunkList1.add(info1.getProtoBufMessage());
+    blockData.setChunks(chunkList1);
+    blockData.setBlockCommitSequenceId(bcsID);
+    blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
+
+    return blockData;
+  }
+
+  private BlockData createBlockDataWithOneFullChunk(long containerID,
+      long blockNo, int chunkID, long offset, long len, long bcsID)
+      throws IOException {
+    blockID1 = new BlockID(containerID, blockNo);
+    blockData = new BlockData(blockID1);
+    List<ContainerProtos.ChunkInfo> chunkList1 = new ArrayList<>();
+    ChunkInfo info1 = new ChunkInfo(String.format("%d_chunk_%d", blockID1
+        .getLocalID(), 1), 0, 4 * 1024 * 1024);
+    info1.addMetadata(FULL_CHUNK, "");
+
+    ChunkInfo info2 = new ChunkInfo(String.format("%d_chunk_%d", blockID1
+        .getLocalID(), chunkID), offset, len);
+    chunkList1.add(info1.getProtoBufMessage());
+    chunkList1.add(info2.getProtoBufMessage());
+    blockData.setChunks(chunkList1);
+    blockData.setBlockCommitSequenceId(bcsID);
+    blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
+
+    return blockData;
+  }
+
+  private BlockData createBlockDataWithThreeFullChunks(long containerID,
+      long blockNo, long bcsID) throws IOException {
+    blockID1 = new BlockID(containerID, blockNo);
+    blockData = new BlockData(blockID1);
+    List<ContainerProtos.ChunkInfo> chunkList1 = new ArrayList<>();
+    long chunkLimit = 4 * 1024 * 1024;
+    for (int i = 1; i < 4; i++) {
+      ChunkInfo info1 = new ChunkInfo(
+          String.format("%d_chunk_%d", blockID1.getLocalID(), i),
+          chunkLimit * i, chunkLimit);
+      info1.addMetadata(FULL_CHUNK, "");
+      chunkList1.add(info1.getProtoBufMessage());
+    }
+    blockData.setChunks(chunkList1);
+    blockData.setBlockCommitSequenceId(bcsID);
+    blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
+
+    return blockData;
+  }
+
+  @ContainerTestVersionInfo.ContainerTest
+  public void testFlush1(ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    initTest(versionInfo);
+    Assumptions.assumeFalse(
+        isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V1));
+    // simulates writing 1024 bytes, hsync,
+    // write another 1024 bytes, hsync
+    // write another 1024 bytes, hsync
+    long containerID = 1;
+    long blockNo = 2;
+    // put 1st chunk
+    blockData1 = createBlockData(containerID, blockNo, 1, 0, 1024,
+        1);
+    blockManager.putBlock(keyValueContainer, blockData1, false);
+    // put 2nd chunk
+    BlockData blockData2 = createBlockData(containerID, blockNo, 1, 0, 2048,
+        2);
+    blockManager.putBlock(keyValueContainer, blockData2, false);
+    assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+
+    BlockData getBlockData = blockManager.getBlock(keyValueContainer,
+        new BlockID(containerID, blockNo));
+    assertEquals(2048, getBlockData.getSize());
+    assertEquals(2, getBlockData.getBlockCommitSequenceId());
+    List<ContainerProtos.ChunkInfo> chunkInfos = getBlockData.getChunks();
+    assertEquals(1, chunkInfos.size());
+    assertEquals(2048, chunkInfos.get(0).getLen());
+    assertEquals(0, chunkInfos.get(0).getOffset());
+
+    // put 3rd chunk, end-of-block
+    BlockData blockData3 = createBlockData(containerID, blockNo, 1, 0, 3072,
+        3);
+    blockManager.putBlock(keyValueContainer, blockData3, true);
+    assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+
+    getBlockData = blockManager.getBlock(keyValueContainer,
+        new BlockID(containerID, blockNo));
+    assertEquals(3072, getBlockData.getSize());
+    assertEquals(3, getBlockData.getBlockCommitSequenceId());
+    chunkInfos = getBlockData.getChunks();
+    assertEquals(1, chunkInfos.size());
+    assertEquals(3072, chunkInfos.get(0).getLen());
+    assertEquals(0, chunkInfos.get(0).getOffset());
+  }
+
+  @ContainerTestVersionInfo.ContainerTest
+  public void testFlush2(ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    initTest(versionInfo);
+    Assumptions.assumeFalse(
+        isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V1));
+    // simulates writing a full chunk + 1024 bytes, hsync,
+    // write another 1024 bytes, hsync
+    // write another 1024 bytes, hsync
+    long containerID = 1;
+    long blockNo = 2;
+    long chunkLimit = 4 * 1024 * 1024;
+    // first hsync (a full chunk + 1024 bytes)
+    blockData1 = createBlockDataWithOneFullChunk(containerID,
+        blockNo, 2, chunkLimit, 1024, 1);
+    blockManager.putBlock(keyValueContainer, blockData1, false);
+    // second hsync (1024 bytes)
+    BlockData blockData2 = createBlockData(containerID, blockNo, 2,
+        chunkLimit, 2048, 2);
+    blockManager.putBlock(keyValueContainer, blockData2, false);
+    assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+    // third hsync (1024 bytes)
+    BlockData blockData3 = createBlockData(containerID, blockNo, 2,
+        chunkLimit, 3072, 3);
+    blockManager.putBlock(keyValueContainer, blockData3, false);
+    assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+
+    // verify that first chunk is full, second chunk is 3072 bytes
+    BlockData getBlockData = blockManager.getBlock(keyValueContainer,
+        new BlockID(containerID, blockNo));
+    assertEquals(3072 + chunkLimit, getBlockData.getSize());
+    assertEquals(3, getBlockData.getBlockCommitSequenceId());
+    List<ContainerProtos.ChunkInfo> chunkInfos = getBlockData.getChunks();
+    assertEquals(2, chunkInfos.size());
+    assertEquals(chunkLimit, chunkInfos.get(0).getLen());
+    assertEquals(0, chunkInfos.get(0).getOffset());
+    assertEquals(3072, chunkInfos.get(1).getLen());
+    assertEquals(chunkLimit, chunkInfos.get(1).getOffset());
+  }
+
+  @ContainerTestVersionInfo.ContainerTest
+  public void testFlush3(ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    initTest(versionInfo);
+    Assumptions.assumeFalse(
+        isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V1));
+    // simulates writing 1024 bytes, hsync,
+    // and then write till 4 chunks are full
+    long containerID = 1;
+    long blockNo = 2;
+    long chunkLimit = 4 * 1024 * 1024;
+    // first hsync (1024 bytes)
+    blockData1 = createBlockDataWithOneFullChunk(containerID, blockNo, 2,
+        chunkLimit, 1024, 1);
+    blockManager.putBlock(keyValueContainer, blockData1, false);
+    // full flush (4 chunks)
+    BlockData blockData2 = createBlockDataWithThreeFullChunks(
+        containerID, blockNo, 2);
+    blockManager.putBlock(keyValueContainer, blockData2, false);
+    assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+
+    // verify that the four chunks are full
+    BlockData getBlockData = blockManager.getBlock(keyValueContainer,
+        new BlockID(containerID, blockNo));
+    assertEquals(chunkLimit * 4, getBlockData.getSize());
+    assertEquals(2, getBlockData.getBlockCommitSequenceId());
+    List<ContainerProtos.ChunkInfo> chunkInfos = getBlockData.getChunks();
+    assertEquals(4, chunkInfos.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(chunkLimit, chunkInfos.get(i).getLen());
+      assertEquals(chunkLimit * i, chunkInfos.get(i).getOffset());
+    }
+  }
 }
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
index ef2f1fa118..393e8cdb31 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client;
 
 import org.apache.commons.collections.map.HashedMap;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
@@ -30,18 +31,25 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * State represents persisted data of one specific datanode.
  */
 public class MockDatanodeStorage {
-
-  private final Map<DatanodeBlockID, BlockData> blocks = new HashedMap();
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MockDatanodeStorage.class);
+  public static final String INCREMENTAL_CHUNK_LIST = "incremental";
+  public static final String FULL_CHUNK = "full";
+  public static final ContainerProtos.KeyValue FULL_CHUNK_KV =
+      ContainerProtos.KeyValue.newBuilder().setKey(FULL_CHUNK).build();
+
+  private final Map<BlockID, BlockData> blocks = new HashedMap();
   private final Map<Long, List<DatanodeBlockID>>
       containerBlocks = new HashedMap();
   private final Map<BlockID, String> fullBlockData = new HashMap<>();
 
-  private final Map<String, ChunkInfo> chunks = new HashMap<>();
-
   private final Map<String, ByteString> data = new HashMap<>();
 
   private IOException exception = null;
@@ -50,8 +58,70 @@ public class MockDatanodeStorage {
     this.exception = reason;
   }
 
+  private boolean isIncrementalChunkList(BlockData blockData) {
+    for (ContainerProtos.KeyValue kv : blockData.getMetadataList()) {
+      if (kv.getKey().equals(INCREMENTAL_CHUNK_LIST)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private BlockID toBlockID(DatanodeBlockID datanodeBlockID) {
+    return new BlockID(datanodeBlockID.getContainerID(),
+        datanodeBlockID.getLocalID());
+  }
+
   public void putBlock(DatanodeBlockID blockID, BlockData blockData) {
-    blocks.put(blockID, blockData);
+    if (isIncrementalChunkList(blockData)) {
+      LOG.debug("incremental chunk list");
+      putBlockIncremental(blockID, blockData);
+    } else {
+      LOG.debug("full chunk list");
+      putBlockFull(blockID, blockData);
+    }
+  }
+
+  private boolean isFullChunk(ChunkInfo chunkInfo) {
+    return (chunkInfo.getMetadataList().contains(FULL_CHUNK_KV));
+  }
+
+  public void putBlockIncremental(
+      DatanodeBlockID blockID, BlockData blockData) {
+    BlockID id = toBlockID(blockID);
+    if (blocks.containsKey(id)) {
+      // block already exists. let's append the chunk list to it.
+      BlockData existing = blocks.get(id);
+      if (existing.getChunksCount() == 0) {
+        // empty chunk list. override it.
+        putBlockFull(blockID, blockData);
+      } else {
+        BlockData.Builder blockDataBuilder = pruneLastPartialChunks(existing);
+        blockDataBuilder.addAllChunks(blockData.getChunksList());
+        blocks.put(id, blockDataBuilder.build());
+      }
+      // TODO: verify the chunk list beginning/offset/len is sane
+    } else {
+      // the block does not exist yet, simply add it
+      putBlockFull(blockID, blockData);
+    }
+  }
+
+  private BlockData.Builder pruneLastPartialChunks(BlockData existing) {
+    BlockData.Builder blockDataBuilder = BlockData.newBuilder(existing);
+    int lastChunkIndex = existing.getChunksCount() - 1;
+    // if the last chunk in the existing block is full, append after it.
+    ChunkInfo chunkInfo = existing.getChunks(lastChunkIndex);
+    if (!isFullChunk(chunkInfo)) {
+      // otherwise, remove it and append
+      blockDataBuilder.removeChunks(lastChunkIndex);
+    }
+    return blockDataBuilder;
+  }
+
+  public void putBlockFull(DatanodeBlockID blockID, BlockData blockData) {
+    BlockID id = toBlockID(blockID);
+    blocks.put(id, blockData);
     List<DatanodeBlockID> dnBlocks = containerBlocks
         .getOrDefault(blockID.getContainerID(), new ArrayList<>());
     dnBlocks.add(blockID);
@@ -59,14 +129,24 @@ public class MockDatanodeStorage {
   }
 
   public BlockData getBlock(DatanodeBlockID blockID) {
-    return blocks.get(blockID);
+    BlockID id = toBlockID(blockID);
+    //assert blocks.containsKey(blockID);
+    if (!blocks.containsKey(id)) {
+      StringBuilder sb = new StringBuilder();
+      for (BlockID bid : blocks.keySet()) {
+        sb.append(bid).append("\n");
+      }
+      throw new AssertionError("blockID " + id +
+          " not found in blocks. Available block ID: \n" + sb);
+    }
+    return blocks.get(id);
   }
 
   public List<BlockData> listBlock(long containerID) {
     List<DatanodeBlockID> datanodeBlockIDS = containerBlocks.get(containerID);
     List<BlockData> listBlocksData = new ArrayList<>();
     for (DatanodeBlockID dBlock : datanodeBlockIDS) {
-      listBlocksData.add(blocks.get(dBlock));
+      listBlocksData.add(blocks.get(toBlockID(dBlock)));
     }
     return listBlocksData;
   }
@@ -77,31 +157,39 @@ public class MockDatanodeStorage {
     if (exception != null) {
       throw exception;
     }
-    data.put(createKey(blockID, chunkInfo),
-        ByteString.copyFrom(bytes.toByteArray()));
-    chunks.put(createKey(blockID, chunkInfo), chunkInfo);
+    String blockKey = createKey(blockID);
+    ByteString block;
+    if (data.containsKey(blockKey)) {
+      block = data.get(blockKey);
+      assert block.size() == chunkInfo.getOffset();
+      data.put(blockKey, block.concat(bytes));
+    } else {
+      assert chunkInfo.getOffset() == 0;
+      data.put(blockKey, bytes);
+    }
+
     fullBlockData
         .put(new BlockID(blockID.getContainerID(), blockID.getLocalID()),
-            fullBlockData.getOrDefault(blockID, "")
+            fullBlockData.getOrDefault(toBlockID(blockID), "")
                 .concat(bytes.toStringUtf8()));
   }
 
-  public ChunkInfo readChunkInfo(
-      DatanodeBlockID blockID,
-      ChunkInfo chunkInfo) {
-    return chunks.get(createKey(blockID, chunkInfo));
-  }
-
   public ByteString readChunkData(
       DatanodeBlockID blockID,
       ChunkInfo chunkInfo) {
-    return data.get(createKey(blockID, chunkInfo));
-
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "readChunkData: blockID={}, offset={}, len={}",
+          createKey(blockID), chunkInfo.getOffset(), chunkInfo.getLen());
+    }
+    ByteString str = data.get(createKey(blockID)).substring(
+        (int)chunkInfo.getOffset(),
+        (int)chunkInfo.getOffset() + (int)chunkInfo.getLen());
+    return str;
   }
 
-  private String createKey(DatanodeBlockID blockId, ChunkInfo chunkInfo) {
-    return blockId.getContainerID() + "_" + blockId.getLocalID() + "_"
-        + chunkInfo.getChunkName() + "_" + chunkInfo.getOffset();
+  private String createKey(DatanodeBlockID blockId) {
+    return blockId.getContainerID() + "_" + blockId.getLocalID();
   }
 
   public Map<String, ByteString> getAllBlockData() {
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 59eb49e555..7e5de329d1 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -110,8 +110,7 @@ public class MockXceiverClientSpi extends XceiverClientSpi {
 
   private ReadChunkResponseProto readChunk(ReadChunkRequestProto readChunk) {
     return ReadChunkResponseProto.newBuilder()
-        .setChunkData(datanodeStorage
-            .readChunkInfo(readChunk.getBlockID(), readChunk.getChunkData()))
+        .setChunkData(readChunk.getChunkData())
         .setData(datanodeStorage
             .readChunkData(readChunk.getBlockID(), readChunk.getChunkData()))
         .setBlockID(readChunk.getBlockID())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to