This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-10239-container-reconciliation by this push:
     new be53ed7115 HDDS-12397. Persist putBlock for closed container. (#7943)
be53ed7115 is described below

commit be53ed7115e93b0248a05c3557a7a96be304c978
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Thu Mar 6 14:17:38 2025 -0800

    HDDS-12397. Persist putBlock for closed container. (#7943)
---
 .../ozone/container/keyvalue/KeyValueHandler.java  | 21 ++++--
 .../container/keyvalue/impl/BlockManagerImpl.java  | 74 +++++++++++++++++++++
 .../keyvalue/impl/FilePerBlockStrategy.java        | 26 ++++++--
 .../keyvalue/interfaces/BlockManager.java          | 15 +++++
 .../keyvalue/impl/TestBlockManagerImpl.java        | 75 +++++++++++++++++++++-
 .../keyvalue/impl/TestFilePerBlockStrategy.java    | 63 ++++++++++++++----
 6 files changed, 247 insertions(+), 27 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index a065c4b6d6..d4bfa79142 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1099,11 +1099,17 @@ public void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID blockID,
 
   /**
   * Handle Put Block operation for closed container. Calls BlockManager to process the request.
-   *
+   * This is primarily used by the container reconciliation process to persist block data for a closed container.
+   * @param kvContainer           - Container for which the block data needs to be persisted.
+   * @param blockData             - Block data to be persisted (BlockData should have the chunks).
+   * @param blockCommitSequenceId - Block commit sequence ID for the block.
+   * @param overwriteBcsId        - Whether to overwrite the bcsId in the block data and container. On a chunk failure
+   *                              during reconciliation we do not want to overwrite the bcsId, as this block/container
+   *                              is incomplete in its current state.
    */
-  public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo> chunkInfos, KeyValueContainer kvContainer,
-                                          BlockData blockData, long blockCommitSequenceId)
-      throws IOException {
+  public void putBlockForClosedContainer(KeyValueContainer kvContainer, BlockData blockData,
+                                         long blockCommitSequenceId, boolean overwriteBcsId)
+          throws IOException {
     Preconditions.checkNotNull(kvContainer);
     Preconditions.checkNotNull(blockData);
     long startTime = Time.monotonicNowNanos();
@@ -1112,11 +1118,12 @@ public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo> chunkInfo
       throw new IOException("Container #" + kvContainer.getContainerData().getContainerID() +
           " is not in closed state, Container state is " + kvContainer.getContainerState());
     }
-    blockData.setChunks(chunkInfos);
     // To be set from the Replica's BCSId
-    blockData.setBlockCommitSequenceId(blockCommitSequenceId);
+    if (overwriteBcsId) {
+      blockData.setBlockCommitSequenceId(blockCommitSequenceId);
+    }
 
-    blockManager.putBlock(kvContainer, blockData, false);
+    blockManager.putBlockForClosedContainer(kvContainer, blockData, overwriteBcsId);
     ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
     final long numBytes = blockDataProto.getSerializedSize();
     // Increment write stats for PutBlock after write.
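
For reviewers, a minimal sketch of how the new signature is meant to be driven from the
reconciliation path (illustrative only; apart from putBlockForClosedContainer and its
parameters, the names below are hypothetical and not part of this change):

    // Hypothetical reconciliation-side caller.
    void persistReconciledBlock(KeyValueHandler handler, KeyValueContainer container,
        BlockData blockData, long peerBcsId, boolean allChunksWritten) throws IOException {
      // Pass overwriteBcsId = false when any chunk write failed, so an
      // incomplete block does not advance the container's bcsId.
      handler.putBlockForClosedContainer(container, blockData, peerBcsId, allChunksWritten);
    }
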
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 4e0e189973..ba3d404897 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -98,6 +98,80 @@ public long putBlock(Container container, BlockData data,
         data, endOfBlock);
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long putBlockForClosedContainer(Container container, BlockData data, boolean overwriteBcsId)
+          throws IOException {
+    Preconditions.checkNotNull(data, "BlockData cannot be null for put operation.");
+    Preconditions.checkState(data.getContainerID() >= 0, "Container Id cannot be negative");
+
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+
+    // We are not locking the key manager since RocksDB serializes all actions
+    // against a single DB. We rely on DB level locking to avoid conflicts.
+    try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
+
+      long blockBcsID = data.getBlockCommitSequenceId();
+      long containerBcsID = containerData.getBlockCommitSequenceId();
+
+      // Check if the block is already present in the container's DB to determine
+      // whether the blockCount has already been incremented for this block.
+      long localID = data.getLocalID();
+      boolean incrBlockCount = false;
+
+      // update the blockData as well as BlockCommitSequenceId here
+      try (BatchOperation batch = db.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        // If the block already exists in the DB, blockCount should not be incremented.
+        if (db.getStore().getBlockDataTable().get(containerData.getBlockKey(localID)) == null) {
+          incrBlockCount = true;
+        }
+
+        db.getStore().getBlockDataTable().putWithBatch(batch, containerData.getBlockKey(localID), data);
+        if (overwriteBcsId && blockBcsID > containerBcsID) {
+          db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBcsIdKey(), blockBcsID);
+        }
+
+        // Set bytes used. The in-memory value is updated on every write chunk,
+        // but it is only committed to the DB on put block. This way the
+        // datanode computes a container's disk usage from committed block
+        // lengths only, both while running and after a restart. This preserves
+        // the existing behavior and avoids a DB write on every write chunk
+        // operation.
+        db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBytesUsedKey(),
+            containerData.getBytesUsed());
+
+        // Set Block Count for a container.
+        if (incrBlockCount) {
+          db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBlockCountKey(),
+              containerData.getBlockCount() + 1);
+        }
+
+        db.getStore().getBatchHandler().commitBatchOperation(batch);
+      }
+
+      if (overwriteBcsId && blockBcsID > containerBcsID) {
+        container.updateBlockCommitSequenceId(blockBcsID);
+      }
+
+      // Increment block count in-memory after the DB update.
+      if (incrBlockCount) {
+        containerData.incrBlockCount();
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Block {} successfully persisted for closed container {} with bcsId {} chunk size {}",
+            data.getBlockID(), containerData.getContainerID(), blockBcsID, data.getChunks().size());
+      }
+      return data.getSize();
+    }
+  }
+
   public long persistPutBlock(KeyValueContainer container,
       BlockData data, boolean endOfBlock)
       throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 0621be4aae..db9e36bbd7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.container.keyvalue.impl;
 
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
 import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
 import static org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage.COMMIT_DATA;
@@ -149,7 +150,7 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
         .getContainerData();
 
     final File chunkFile = getChunkFile(container, blockID);
-    long len = info.getLen();
+    long chunkLength = info.getLen();
     long offset = info.getOffset();
 
     HddsVolume volume = containerData.getVolume();
@@ -174,10 +175,27 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
       ChunkUtils.validateChunkSize(channel, info, chunkFile.getName());
     }
 
-    ChunkUtils
-        .writeData(channel, chunkFile.getName(), data, offset, len, volume);
+    long fileLengthBeforeWrite;
+    try {
+      fileLengthBeforeWrite = channel.size();
+    } catch (IOException e) {
+      throw new StorageContainerException("Encountered an error while getting the file size for "
+          + chunkFile.getName(), CHUNK_FILE_INCONSISTENCY);
+    }
+
+    ChunkUtils.writeData(channel, chunkFile.getName(), data, offset, chunkLength, volume);
+
+    // When overwriting, increase bytes used if the new length is greater than the old length.
+    // This ensures that bytes used is updated correctly when a smaller chunk at the end of
+    // the block is overwritten with a larger one.
+    if (overwrite) {
+      long fileLengthAfterWrite = offset + chunkLength;
+      if (fileLengthAfterWrite > fileLengthBeforeWrite) {
+        containerData.incrBytesUsed(fileLengthAfterWrite - fileLengthBeforeWrite);
+      }
+    }
 
-    containerData.updateWriteStats(len, overwrite);
+    containerData.updateWriteStats(chunkLength, overwrite);
   }
 
   @Override
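
To make the overwrite accounting above concrete (the same numbers the updated test below
uses): with chunks [0, 20) and [20, 40) on disk, fileLengthBeforeWrite is 40 and bytesUsed
is 40. Overwriting the last chunk at offset 20 with a 30-byte chunk gives
fileLengthAfterWrite = 20 + 30 = 50, so incrBytesUsed(50 - 40) raises bytesUsed to 50;
overwriting with a chunk of equal or smaller length leaves bytesUsed unchanged.
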
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index 5e20e84135..10562f450b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.ozone.container.keyvalue.interfaces;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 
 /**
  * BlockManager is for performing key related operations on the container.
@@ -49,6 +52,18 @@ public interface BlockManager {
   long putBlock(Container container, BlockData data, boolean endOfBlock)
       throws IOException;
 
+  /**
+   * Persists the block data for a closed container. The block data should have all the chunks and the bcsId.
+   * Overwrites the block if it already exists. The container's used bytes should be updated by the caller with
+   * {@link ChunkManager#writeChunk(Container, BlockID, ChunkInfo, ByteBuffer, DispatcherContext)}.
+   *
+   * @param container      - Container for which the block data needs to be persisted.
+   * @param data           - Block data to be persisted (BlockData should have the chunks).
+   * @param overwriteBcsId - Whether to overwrite the bcsId of the container.
+   */
+  long putBlockForClosedContainer(Container container, BlockData data, boolean overwriteBcsId)
+          throws IOException;
+
   /**
    * Gets an existing block.
    *
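
A minimal sketch of the calling contract described in the Javadoc above, assuming the
caller already has the container, block data, a data buffer, and a dispatcher context in
scope (variable names are illustrative):

    // Write the chunk data first so the container's used bytes are updated,
    // then persist the block metadata for the closed container.
    for (ContainerProtos.ChunkInfo chunkProto : data.getChunks()) {
      ChunkInfo chunk = ChunkInfo.getFromProtoBuf(chunkProto);
      chunkManager.writeChunk(container, data.getBlockID(), chunk, buffer, dispatcherContext);
    }
    blockManager.putBlockForClosedContainer(container, data, true);
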
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
index 07414d701a..7bbfa1f7e9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
@@ -17,11 +17,13 @@
 
 package org.apache.hadoop.ozone.container.keyvalue.impl;
 
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
 import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
 import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -36,10 +38,12 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
@@ -81,10 +85,10 @@ private void initTest(ContainerTestVersionInfo versionInfo)
     this.schemaVersion = versionInfo.getSchemaVersion();
     this.config = new OzoneConfiguration();
     ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, config);
-    initilaze();
+    initialize();
   }
 
-  private void initilaze() throws Exception {
+  private void initialize() throws Exception {
     UUID datanodeId = UUID.randomUUID();
     HddsVolume hddsVolume = new HddsVolume.Builder(folder.toString())
         .conf(config)
@@ -196,6 +200,73 @@ public void testPutAndGetBlock(ContainerTestVersionInfo versionInfo)
 
   }
 
+  @ContainerTestVersionInfo.ContainerTest
+  public void testPutBlockForClosed(ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    initTest(versionInfo);
+    KeyValueContainerData containerData = keyValueContainer.getContainerData();
+    assertEquals(0, containerData.getBlockCount());
+    keyValueContainer.close();
+    assertEquals(CLOSED, keyValueContainer.getContainerState());
+
+    try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+      // 1. Put Block with bcsId = 1, Overwrite BCS ID = true
+      blockData1 = createBlockData(1L, 2L, 1, 0, 2048, 1);
+      blockManager.putBlockForClosedContainer(keyValueContainer, blockData1, true);
+
+      BlockData fromGetBlockData;
+      // Check the container's bcsId
+      fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData1.getBlockID());
+      assertEquals(1, containerData.getBlockCount());
+      assertEquals(1, containerData.getBlockCommitSequenceId());
+      assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());
+      assertEquals(1, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
+      assertEquals(1, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
+
+      // 2. Put Block with bcsId = 2, Overwrite BCS ID = false
+      BlockData blockData2 = createBlockData(1L, 3L, 1, 0, 2048, 2);
+      blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, false);
+
+      // The block should be written, but we won't be able to read it: as expected, the read
+      // fails because the block's bcsId is greater than the container's bcsId.
+      assertThrows(StorageContainerException.class, () -> blockManager
+          .getBlock(keyValueContainer, blockData2.getBlockID()));
+      fromGetBlockData = db.getStore().getBlockDataTable().get(containerData.getBlockKey(blockData2.getLocalID()));
+      assertEquals(2, containerData.getBlockCount());
+      // BcsId should still be 1, as the BcsId is not overwritten
+      assertEquals(1, containerData.getBlockCommitSequenceId());
+      assertEquals(2, fromGetBlockData.getBlockCommitSequenceId());
+      assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
+      assertEquals(1, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
+
+      // 3. Put Block with bcsId = 2, Overwrite BCS ID = true
+      // This should succeed as we are overwriting the bcsId; the container's bcsId should be updated to 2.
+      // The block count should not change.
+      blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, true);
+      fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData2.getBlockID());
+      assertEquals(2, containerData.getBlockCount());
+      assertEquals(2, containerData.getBlockCommitSequenceId());
+      assertEquals(2, fromGetBlockData.getBlockCommitSequenceId());
+      assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
+      assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
+
+      // 4. Put Block with bcsId = 1 < container bcsId, Overwrite BCS ID = true
+      // We are putting a block with a lower bcsId than the container's bcsId; the container's bcsId should not change.
+      BlockData blockData3 = createBlockData(1L, 1L, 1, 0, 2048, 1);
+      blockManager.putBlockForClosedContainer(keyValueContainer, blockData3, true);
+      fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData3.getBlockID());
+      assertEquals(3, containerData.getBlockCount());
+      assertEquals(2, containerData.getBlockCommitSequenceId());
+      assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());
+      assertEquals(3, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
+      assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
+
+      // writeChunk updates the in-memory used-bytes state; putBlock persists that state to the DB.
+      // Since we only did putBlock without writeChunk, the used bytes should be 0.
+      assertEquals(0, db.getStore().getMetadataTable().get(containerData.getBytesUsedKey()));
+    }
+  }
+
   @ContainerTestVersionInfo.ContainerTest
   public void testListBlock(ContainerTestVersionInfo versionInfo)
       throws Exception {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
index 7bfce86fb6..16e9968b38 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
@@ -164,8 +164,8 @@ public void testWriteChunkAndPutBlockFailureForNonClosedContainer(
     ChunkBuffer.wrap(getData());
     Assertions.assertThrows(IOException.class, () -> keyValueHandler.writeChunkForClosedContainer(
         getChunkInfo(), getBlockID(), ChunkBuffer.wrap(getData()), keyValueContainer));
-    Assertions.assertThrows(IOException.class, () -> keyValueHandler.putBlockForClosedContainer(
-        null, keyValueContainer, new BlockData(getBlockID()), 0L));
+    Assertions.assertThrows(IOException.class, () -> keyValueHandler.putBlockForClosedContainer(keyValueContainer,
+            new BlockData(getBlockID()), 0L, true));
   }
 
   @Test
@@ -225,37 +225,72 @@ public void testPutBlockForClosedContainer() throws IOException {
     containerSet.addContainer(kvContainer);
     KeyValueHandler keyValueHandler = createKeyValueHandler(containerSet);
     List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
-    chunkInfoList.add(getChunkInfo().getProtoBufMessage());
+    ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", getBlockID().getLocalID(), 0), 0L, 20L);
+
+    chunkInfoList.add(info.getProtoBufMessage());
     BlockData putBlockData = new BlockData(getBlockID());
-    keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, putBlockData, 1L);
-    Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 1L);
-    Assertions.assertEquals(containerData.getBlockCount(), 1L);
+    putBlockData.setChunks(chunkInfoList);
+
+    ChunkBuffer chunkData = ContainerTestHelper.getData(20);
+    keyValueHandler.writeChunkForClosedContainer(info, getBlockID(), chunkData, kvContainer);
+    keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 1L, true);
+    assertEquals(1L, containerData.getBlockCommitSequenceId());
+    assertEquals(1L, containerData.getBlockCount());
+    assertEquals(20L, containerData.getBytesUsed());
 
     try (DBHandle dbHandle = BlockUtils.getDB(containerData, new OzoneConfiguration())) {
       long localID = putBlockData.getLocalID();
       BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
           .get(containerData.getBlockKey(localID));
       Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
+      assertEquals(20L, dbHandle.getStore().getMetadataTable().get(containerData.getBytesUsedKey()));
     }
 
     // Add another chunk and check the put block data
-    ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d", getBlockID()
-        .getLocalID(), 1L), 0, 20L);
+    ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d", getBlockID().getLocalID(), 1L), 20L, 20L);
+    chunkInfoList.add(newChunkInfo.getProtoBufMessage());
+    putBlockData.setChunks(chunkInfoList);
+
+    chunkData = ContainerTestHelper.getData(20);
+    keyValueHandler.writeChunkForClosedContainer(newChunkInfo, getBlockID(), chunkData, kvContainer);
+    keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 2L, true);
+    assertEquals(2L, containerData.getBlockCommitSequenceId());
+    assertEquals(1L, containerData.getBlockCount());
+    assertEquals(40L, containerData.getBytesUsed());
+
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, new OzoneConfiguration())) {
+      long localID = putBlockData.getLocalID();
+      BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
+          .get(containerData.getBlockKey(localID));
+      Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
+      assertEquals(40L, dbHandle.getStore().getMetadataTable().get(containerData.getBytesUsedKey()));
+    }
+
+    // Replace the last chunk with a larger one. This should only increase bytesUsed by the
+    // difference in length between the old last chunk and the new last chunk.
+    newChunkInfo = new ChunkInfo(String.format("%d.data.%d", getBlockID().getLocalID(), 1L), 20L, 30L);
+    chunkInfoList.remove(chunkInfoList.size() - 1);
     chunkInfoList.add(newChunkInfo.getProtoBufMessage());
-    keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, putBlockData, 2L);
-    Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
-    Assertions.assertEquals(containerData.getBlockCount(), 1L);
+    putBlockData.setChunks(chunkInfoList);
+
+    chunkData = ContainerTestHelper.getData(30);
+    keyValueHandler.writeChunkForClosedContainer(newChunkInfo, getBlockID(), chunkData, kvContainer);
+    keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 2L, true);
+    assertEquals(2L, containerData.getBlockCommitSequenceId());
+    assertEquals(1L, containerData.getBlockCount());
+    // Old chunk size 20, new chunk size 30, difference 10. So bytesUsed should be 40 + 10 = 50.
+    assertEquals(50L, containerData.getBytesUsed());
 
     try (DBHandle dbHandle = BlockUtils.getDB(containerData, new OzoneConfiguration())) {
       long localID = putBlockData.getLocalID();
       BlockData getBlockData = dbHandle.getStore().getBlockDataTable()
           .get(containerData.getBlockKey(localID));
       Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
+      assertEquals(50L, dbHandle.getStore().getMetadataTable().get(containerData.getBytesUsedKey()));
     }
 
-    // Put block on bcsId <= containerBcsId should be a no-op
-    keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, putBlockData, 2L);
-    Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
+    keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 2L, true);
+    assertEquals(2L, containerData.getBlockCommitSequenceId());
   }
 
  private boolean blockDataEquals(BlockData putBlockData, BlockData getBlockData) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
