This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 840900f98a HDDS-9915. [hsync] Interface to retrieve block info and finalize block in DN through ratis. (#5783)
840900f98a is described below

commit 840900f98aea8def503403df963cf1698f33f261
Author: ashishkumar50 <[email protected]>
AuthorDate: Wed Dec 27 12:32:54 2023 +0530

    HDDS-9915. [hsync] Interface to retrieve block info and finalize block in DN through ratis. (#5783)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   1 +
 .../ContainerCommandResponseBuilders.java          |  12 +
 .../ozone/container/ContainerTestHelper.java       |   6 +
 .../org/apache/hadoop/ozone/audit/DNAction.java    |   3 +-
 .../container/common/impl/HddsDispatcher.java      |   7 +
 .../ozone/container/common/interfaces/Handler.java |   4 +
 .../server/ratis/ContainerStateMachine.java        |  41 ++-
 .../container/keyvalue/KeyValueContainer.java      |   7 +
 .../container/keyvalue/KeyValueContainerData.java  |  34 +++
 .../ozone/container/keyvalue/KeyValueHandler.java  |  53 ++++
 .../keyvalue/helpers/KeyValueContainerUtil.java    |  23 ++
 .../container/keyvalue/impl/BlockManagerImpl.java  |  29 ++
 .../keyvalue/impl/ChunkManagerDispatcher.java      |   6 +
 .../keyvalue/impl/FilePerBlockStrategy.java        |  22 ++
 .../keyvalue/interfaces/BlockManager.java          |   3 +
 .../keyvalue/interfaces/ChunkManager.java          |   5 +
 .../metadata/AbstractDatanodeDBDefinition.java     |   4 +
 .../container/metadata/AbstractDatanodeStore.java  | 123 +++++++-
 .../metadata/DatanodeSchemaThreeDBDefinition.java  |  20 +-
 .../metadata/DatanodeSchemaTwoDBDefinition.java    |  17 +-
 .../ozone/container/metadata/DatanodeStore.java    |  10 +
 .../metadata/DatanodeStoreSchemaThreeImpl.java     |   7 +
 .../container/ozoneimpl/ContainerController.java   |  23 ++
 .../container/keyvalue/TestKeyValueHandler.java    |   8 +
 .../TestKeyValueHandlerWithUnhealthyContainer.java |  14 +
 .../src/main/proto/DatanodeClientProtocol.proto    |  18 +-
 .../commandhandler/TestFinalizeBlock.java          | 336 +++++++++++++++++++++
 27 files changed, 824 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 985b1b80ee..857cbfb6ee 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -448,6 +448,7 @@ public final class HddsUtils {
     case PutSmallFile:
     case StreamInit:
     case StreamWrite:
+    case FinalizeBlock:
     default:
       return false;
     }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 7ef2196c99..e0458f0347 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -318,6 +318,18 @@ public final class ContainerCommandResponseBuilders {
         .build();
   }
 
+  public static ContainerCommandResponseProto getFinalizeBlockResponse(
+      ContainerCommandRequestProto msg, BlockData data) {
+
+    ContainerProtos.FinalizeBlockResponseProto.Builder blockData =
+        ContainerProtos.FinalizeBlockResponseProto.newBuilder()
+        .setBlockData(data);
+
+    return getSuccessResponseBuilder(msg)
+        .setFinalizeBlock(blockData)
+        .build();
+  }
+
   private ContainerCommandResponseBuilders() {
     throw new UnsupportedOperationException("no instances");
   }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index d992fe9f9d..ee742d5f9a 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -624,6 +624,12 @@ public final class ContainerTestHelper {
                   .build())
               .build());
       break;
+    case FinalizeBlock:
+      builder
+          .setFinalizeBlock(ContainerProtos
+            .FinalizeBlockRequestProto.newBuilder()
+            .setBlockID(fakeBlockId).build());
+      break;
 
     default:
       Assert.fail("Unhandled request type " + cmdType + " in unit test");
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index 73aff9ac83..d271e7d5d4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -39,7 +39,8 @@ public enum DNAction implements AuditAction {
   GET_SMALL_FILE,
   CLOSE_CONTAINER,
   GET_COMMITTED_BLOCK_LENGTH,
-  STREAM_INIT;
+  STREAM_INIT,
+  FINALIZE_BLOCK;
 
   @Override
   public String getAction() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 1b0ef29c77..4f4cf0a41d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -771,6 +771,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     case CloseContainer   : return DNAction.CLOSE_CONTAINER;
     case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
     case StreamInit       : return DNAction.STREAM_INIT;
+    case FinalizeBlock    : return DNAction.FINALIZE_BLOCK;
     default :
       LOG.debug("Invalid command type - {}", cmdType);
       return null;
@@ -897,6 +898,12 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
               .toString());
       return auditParams;
 
+    case FinalizeBlock:
+      auditParams.put("blockData",
+          BlockID.getFromProtobuf(msg.getFinalizeBlock().getBlockID())
+              .toString());
+      return auditParams;
+
     default :
       LOG.debug("Invalid command type - {}", cmdType);
       return null;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 2ffb9d30d1..bfdff69be4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -213,6 +213,10 @@ public abstract class Handler {
   public abstract void deleteUnreferenced(Container container, long localID)
       throws IOException;
 
+  public abstract void addFinalizedBlock(Container container, long localID);
+
+  public abstract boolean isFinalizedBlockExist(Container container, long localID);
+
   public void setClusterID(String clusterID) {
     this.clusterId = clusterID;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 626b548a5a..348ded49f2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -94,6 +94,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.ratis.util.TaskQueue;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.apache.ratis.util.JavaUtils;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -376,8 +377,20 @@ public class ContainerStateMachine extends BaseStateMachine {
       ctxt.setException(ioe);
       return ctxt;
     }
-    if (proto.getCmdType() == Type.WriteChunk) {
+    if (proto.getCmdType() == Type.PutBlock) {
+      TransactionContext ctxt = rejectRequest(request,
+          proto.getContainerID(), proto.getPutBlock().getBlockData()
+          .getBlockID().getLocalID());
+      if (ctxt != null) {
+        return ctxt;
+      }
+    } else if (proto.getCmdType() == Type.WriteChunk) {
       final WriteChunkRequestProto write = proto.getWriteChunk();
+      TransactionContext ctxt = rejectRequest(request,
+          proto.getContainerID(), write.getBlockID().getLocalID());
+      if (ctxt != null) {
+        return ctxt;
+      }
       // create the log entry proto
       final WriteChunkRequestProto commitWriteChunkProto =
           WriteChunkRequestProto.newBuilder()
@@ -403,16 +416,32 @@ public class ContainerStateMachine extends BaseStateMachine {
           .setStateMachineData(write.getData())
           .setLogData(commitContainerCommandProto.toByteString())
           .build();
-    } else {
-      return TransactionContext.newBuilder()
+    } else if (proto.getCmdType() == Type.FinalizeBlock) {
+      containerController.addFinalizedBlock(proto.getContainerID(),
+          proto.getFinalizeBlock().getBlockID().getLocalID());
+    }
+    return TransactionContext.newBuilder()
+        .setClientRequest(request)
+        .setStateMachine(this)
+        .setServerRole(RaftPeerRole.LEADER)
+        .setStateMachineContext(startTime)
+        .setLogData(proto.toByteString())
+        .build();
+  }
+
+  @Nullable
+  private TransactionContext rejectRequest(RaftClientRequest request,
+              long containerId, long localId) {
+    if (containerController.isFinalizedBlockExist(containerId, localId)) {
+      TransactionContext ctxt = TransactionContext.newBuilder()
           .setClientRequest(request)
           .setStateMachine(this)
           .setServerRole(RaftPeerRole.LEADER)
-          .setStateMachineContext(startTime)
-          .setLogData(proto.toByteString())
           .build();
+      ctxt.setException(new IOException("Block already finalized"));
+      return ctxt;
     }
-
+    return null;
   }
 
   private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 8388182667..98d81c15d0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -76,6 +76,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.ERROR_IN_COMPACT_DB;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.ERROR_IN_DB_SYNC;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
 import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 
@@ -433,6 +434,12 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
 
   @Override
   public void close() throws StorageContainerException {
+    try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+      containerData.clearFinalizedBlock(db);
+    } catch (IOException ex) {
+      LOG.error("Error in deleting entry from Finalize Block table", ex);
+      throw new StorageContainerException(ex, IO_EXCEPTION);
+    }
     closeAndFlushIfNeeded(containerData::closeContainer);
     LOG.info("Container {} is closed with bcsId {}.",
         containerData.getContainerID(),
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 7fce70f8e1..47d4f3f9e7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -41,6 +41,8 @@ import org.yaml.snakeyaml.nodes.Tag;
 import java.io.File;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static java.lang.Math.max;
@@ -92,6 +94,8 @@ public class KeyValueContainerData extends ContainerData {
 
   private long blockCommitSequenceId;
 
+  private final Set<Long> finalizedBlockSet;
+
   static {
     // Initialize YAML fields
     KV_YAML_FIELDS = Lists.newArrayList();
@@ -114,6 +118,7 @@ public class KeyValueContainerData extends ContainerData {
         size, originPipelineId, originNodeId);
     this.numPendingDeletionBlocks = new AtomicLong(0);
     this.deleteTransactionId = 0;
+    finalizedBlockSet =  ConcurrentHashMap.newKeySet();
   }
 
   public KeyValueContainerData(KeyValueContainerData source) {
@@ -123,6 +128,7 @@ public class KeyValueContainerData extends ContainerData {
     this.numPendingDeletionBlocks = new AtomicLong(0);
     this.deleteTransactionId = 0;
     this.schemaVersion = source.getSchemaVersion();
+    finalizedBlockSet = ConcurrentHashMap.newKeySet();
   }
 
   /**
@@ -275,6 +281,34 @@ public class KeyValueContainerData extends ContainerData {
     return deleteTransactionId;
   }
 
+  /**
+   * Add the given localID of a block to the finalizedBlockSet.
+   */
+  public void addToFinalizedBlockSet(long localID) {
+    finalizedBlockSet.add(localID);
+  }
+
+  public Set<Long> getFinalizedBlockSet() {
+    return finalizedBlockSet;
+  }
+
+  public boolean isFinalizedBlockExist(long localID) {
+    return finalizedBlockSet.contains(localID);
+  }
+
+  public void clearFinalizedBlock(DBHandle db) throws IOException {
+    if (!finalizedBlockSet.isEmpty()) {
+      // delete from db and clear memory
+      // Should never fail.
+      Preconditions.checkNotNull(db, "DB cannot be null here");
+      try (BatchOperation batch = db.getStore().getBatchHandler().initBatchOperation()) {
+        db.getStore().getFinalizeBlocksTable().deleteBatchWithPrefix(batch, containerPrefix());
+        db.getStore().getBatchHandler().commitBatchOperation(batch);
+      }
+      finalizedBlockSet.clear();
+    }
+  }
+
   /**
    * Returns a ProtoBuf Message from ContainerData.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 362c08c6a9..8b6ecb43f4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -102,6 +102,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getFinalizeBlockResponse;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess;
@@ -274,6 +275,8 @@ public class KeyValueHandler extends Handler {
       return handler.handleGetSmallFile(request, kvContainer);
     case GetCommittedBlockLength:
       return handler.handleGetCommittedBlockLength(request, kvContainer);
+    case FinalizeBlock:
+      return handler.handleFinalizeBlock(request, kvContainer);
     default:
       return null;
     }
@@ -562,6 +565,46 @@ public class KeyValueHandler extends Handler {
     return putBlockResponseSuccess(request, blockDataProto);
   }
 
+  ContainerCommandResponseProto handleFinalizeBlock(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasFinalizeBlock()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Malformed Finalize block request. trace ID: {}",
+            request.getTraceID());
+      }
+      return malformedRequest(request);
+    }
+    ContainerProtos.BlockData responseData;
+
+    try {
+      checkContainerOpen(kvContainer);
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getFinalizeBlock().getBlockID());
+      Preconditions.checkNotNull(blockID);
+
+      LOG.info("Finalized Block request received {} ", blockID);
+
+      responseData = blockManager.getBlock(kvContainer, blockID)
+          .getProtoBufMessage();
+
+      chunkManager.finalizeWriteChunk(kvContainer, blockID);
+      blockManager.finalizeBlock(kvContainer, blockID);
+      kvContainer.getContainerData()
+          .addToFinalizedBlockSet(blockID.getLocalID());
+
+      LOG.info("Block has been finalized {} ", blockID);
+
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException(
+              "Finalize Block failed", ex, IO_EXCEPTION), request);
+    }
+    return getFinalizeBlockResponse(request, responseData);
+  }
+
   /**
    * Handle Get Block operation. Calls BlockManager to process the request.
    */
@@ -1233,6 +1276,16 @@ public class KeyValueHandler extends Handler {
     }
   }
 
+  public void addFinalizedBlock(Container container, long localID) {
+    KeyValueContainer keyValueContainer = (KeyValueContainer)container;
+    keyValueContainer.getContainerData().addToFinalizedBlockSet(localID);
+  }
+
+  public boolean isFinalizedBlockExist(Container container, long localID) {
+    KeyValueContainer keyValueContainer = (KeyValueContainer)container;
+    return keyValueContainer.getContainerData().isFinalizedBlockExist(localID);
+  }
+
   private String[] getFilesWithPrefix(String prefix, File chunkDir) {
     FilenameFilter filter = (dir, name) -> name.startsWith(prefix);
     return chunkDir.list(filter);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index f47d17d738..13b380a89f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -363,6 +363,29 @@ public final class KeyValueContainerUtil {
     // startup. If this method is called but not as a part of startup,
     // The inspectors will be unloaded and this will be a no-op.
     ContainerInspectorUtil.process(kvContainerData, store);
+
+    // Load finalizeBlockLocalIds for container in memory.
+    populateContainerFinalizeBlock(kvContainerData, store);
+  }
+
+  /**
+   * Loads finalizeBlockLocalIds for container in memory.
+   * @param kvContainerData - KeyValueContainerData
+   * @param store - DatanodeStore
+   * @throws IOException
+   */
+  private static void populateContainerFinalizeBlock(
+      KeyValueContainerData kvContainerData, DatanodeStore store)
+      throws IOException {
+    if (store.getFinalizeBlocksTable() != null) {
+      try (BlockIterator<Long> iter =
+               store.getFinalizeBlockIterator(kvContainerData.getContainerID(),
+                   kvContainerData.getUnprefixedKeyFilter())) {
+        while (iter.hasNext()) {
+          kvContainerData.addToFinalizedBlockSet(iter.nextBlock());
+        }
+      }
+    }
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 449fe46ae0..38ca691ec1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -41,6 +41,7 @@ import com.google.common.base.Preconditions;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -214,6 +215,34 @@ public class BlockManagerImpl implements BlockManager {
     }
   }
 
+  @Override
+  public void finalizeBlock(Container container, BlockID blockId)
+      throws IOException {
+    Preconditions.checkNotNull(blockId, "blockId cannot " +
+        "be null for finalizeBlock operation.");
+    Preconditions.checkState(blockId.getContainerID() >= 0,
+        "Container Id cannot be negative");
+
+    KeyValueContainer kvContainer = (KeyValueContainer)container;
+    long localID = blockId.getLocalID();
+
+    kvContainer.removeFromPendingPutBlockCache(localID);
+
+    try (DBHandle db = BlockUtils.getDB(kvContainer.getContainerData(),
+        config)) {
+      // Should never fail.
+      Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
+
+      // persist finalizeBlock
+      try (BatchOperation batch = db.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        db.getStore().getFinalizeBlocksTable().putWithBatch(batch,
+            kvContainer.getContainerData().getBlockKey(localID), localID);
+        db.getStore().getBatchHandler().commitBatchOperation(batch);
+      }
+    }
+  }
+
   @Override
   public BlockData getBlock(Container container, BlockID blockID)
       throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 92f6327447..49d54b78c9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -97,6 +97,12 @@ public class ChunkManagerDispatcher implements ChunkManager {
         .finishWriteChunks(kvContainer, blockData);
   }
 
+  @Override
+  public void finalizeWriteChunk(KeyValueContainer kvContainer,
+      BlockID blockId) throws IOException {
+    selectHandler(kvContainer).finalizeWriteChunk(kvContainer, blockId);
+  }
+
   @Override
   public ChunkBuffer readChunk(Container container, BlockID blockID,
       ChunkInfo info, DispatcherContext dispatcherContext)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index ccab7f35e8..cae16e9870 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -221,6 +221,23 @@ public class FilePerBlockStrategy implements ChunkManager {
     }
   }
 
+  @Override
+  public void finalizeWriteChunk(KeyValueContainer container,
+      BlockID blockId) throws IOException {
+    synchronized (container) {
+      File chunkFile = getChunkFile(container, blockId, null);
+      try {
+        if (files.isOpen(chunkFile)) {
+          files.close(chunkFile);
+        }
+        verifyChunkFileExists(chunkFile);
+      } catch (IOException e) {
+        onFailure(container.getContainerData().getVolume());
+        throw e;
+      }
+    }
+  }
+
   private void deleteChunk(Container container, BlockID blockID,
       ChunkInfo info, boolean verifyLength)
       throws StorageContainerException {
@@ -304,6 +321,11 @@ public class FilePerBlockStrategy implements ChunkManager {
       }
     }
 
+    public boolean isOpen(File file) {
+      return file != null &&
+          files.getIfPresent(file.getPath()) != null;
+    }
+
     private static void close(String filename, OpenFile openFile) {
       if (openFile != null) {
         if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index aa7285a232..a815c668d8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -91,6 +91,9 @@ public interface BlockManager {
   long getCommittedBlockLength(Container container, BlockID blockID)
       throws IOException;
 
+  void finalizeBlock(Container container, BlockID blockId)
+      throws IOException;
+
   int getDefaultReadBufferCapacity();
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index 151c15f356..ed3142c857 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -106,6 +106,11 @@ public interface ChunkManager {
     // no-op
   }
 
+  default void finalizeWriteChunk(KeyValueContainer container,
+      BlockID blockId) throws IOException {
+    // no-op
+  }
+
   default String streamInit(Container container, BlockID blockID)
       throws StorageContainerException {
     return null;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
index 2c1c3c214d..e1b10e6df5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
@@ -73,4 +73,8 @@ public abstract class AbstractDatanodeDBDefinition implements DBDefinition {
 
   public abstract DBColumnFamilyDefinition<String, ChunkInfoList>
       getDeletedBlocksColumnFamily();
+
+  public DBColumnFamilyDefinition<String, Long> getFinalizeBlocksColumnFamily() {
+    return null;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
index 50303bd99b..b949a19145 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
@@ -67,6 +67,10 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
 
   private Table<String, ChunkInfoList> deletedBlocksTable;
 
+  private Table<String, Long> finalizeBlocksTable;
+
+  private Table<String, Long> finalizeBlocksTableWithIterator;
+
   static final Logger LOG =
       LoggerFactory.getLogger(AbstractDatanodeStore.class);
   private volatile DBStore store;
@@ -173,6 +177,15 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
       deletedBlocksTable = new DatanodeTable<>(
               dbDef.getDeletedBlocksColumnFamily().getTable(this.store));
       checkTableStatus(deletedBlocksTable, deletedBlocksTable.getName());
+
+      if (dbDef.getFinalizeBlocksColumnFamily() != null) {
+        finalizeBlocksTableWithIterator =
+            dbDef.getFinalizeBlocksColumnFamily().getTable(this.store);
+
+        finalizeBlocksTable = new DatanodeTable<>(
+            finalizeBlocksTableWithIterator);
+        checkTableStatus(finalizeBlocksTable, finalizeBlocksTable.getName());
+      }
     }
   }
 
@@ -209,18 +222,30 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
     return deletedBlocksTable;
   }
 
+  @Override
+  public Table<String, Long> getFinalizeBlocksTable() {
+    return finalizeBlocksTable;
+  }
+
   @Override
   public BlockIterator<BlockData> getBlockIterator(long containerID)
       throws IOException {
     return new KeyValueBlockIterator(containerID,
-            blockDataTableWithIterator.iterator());
+        blockDataTableWithIterator.iterator());
   }
 
   @Override
   public BlockIterator<BlockData> getBlockIterator(long containerID,
       KeyPrefixFilter filter) throws IOException {
     return new KeyValueBlockIterator(containerID,
-            blockDataTableWithIterator.iterator(), filter);
+        blockDataTableWithIterator.iterator(), filter);
+  }
+
+  @Override
+  public BlockIterator<Long> getFinalizeBlockIterator(long containerID,
+      KeyPrefixFilter filter) throws IOException {
+    return new KeyValueBlockLocalIdIterator(containerID,
+        finalizeBlocksTableWithIterator.iterator(), filter);
   }
 
   @Override
@@ -265,6 +290,10 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
     return this.blockDataTableWithIterator;
   }
 
+  protected Table<String, Long> getFinalizeBlocksTableWithIterator() {
+    return this.finalizeBlocksTableWithIterator;
+  }
+
   private static void checkTableStatus(Table<?, ?> table, String name)
           throws IOException {
     String logMessage = "Unable to get a reference to %s table. Cannot " +
@@ -380,4 +409,94 @@ public abstract class AbstractDatanodeStore implements 
DatanodeStore {
       blockIterator.close();
     }
   }
+
+  /**
+   * Block localId iterator for a KeyValue container.
+   * This iterator returns only the localIds whose keys
+   * match the given {@link MetadataKeyFilters.KeyPrefixFilter}. A filter
+   * must be supplied: the constructor applies no default filter, so
+   * a null filter fails on the first call to hasNext().
+   */
+  @InterfaceAudience.Public
+  public static class KeyValueBlockLocalIdIterator implements
+      BlockIterator<Long>, Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(
+        KeyValueBlockLocalIdIterator.class);
+
+    private final TableIterator<String, ? extends Table.KeyValue<String,
+        Long>> blockLocalIdIterator;
+    private final KeyPrefixFilter localIdFilter;
+    private Long nextLocalId;
+    private final long containerID;
+
+    /**
+     * KeyValueBlockLocalIdIterator to iterate block localIds in a container.
+     * @param iterator - The iterator to apply the blockLocalId filter to.
+     * @param filter - BlockLocalId filter to be applied for block localIds.
+     */
+    KeyValueBlockLocalIdIterator(long containerID,
+        TableIterator<String, ? extends Table.KeyValue<String, Long>>
+        iterator, KeyPrefixFilter filter) {
+      this.containerID = containerID;
+      this.blockLocalIdIterator = iterator;
+      this.localIdFilter = filter;
+    }
+
+    /**
+     * This method returns the next block local ID matching the filter.
+     * @return next block local Id; NoSuchElementException is thrown at the end
+     * @throws IOException
+     */
+    @Override
+    public Long nextBlock() throws IOException, NoSuchElementException {
+      if (nextLocalId != null) {
+        Long currentLocalId = nextLocalId;
+        nextLocalId = null;
+        return currentLocalId;
+      }
+      if (hasNext()) {
+        return nextBlock();
+      }
+      throw new NoSuchElementException("Block Local ID Iterator " +
+          "reached end for ContainerID " + containerID);
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      if (nextLocalId != null) {
+        return true;
+      }
+      while (blockLocalIdIterator.hasNext()) {
+        Table.KeyValue<String, Long> keyValue = blockLocalIdIterator.next();
+        byte[] keyBytes = StringUtils.string2Bytes(keyValue.getKey());
+        if (localIdFilter.filterKey(null, keyBytes, null)) {
+          nextLocalId = keyValue.getValue();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Block matching with filter found: LocalID is : " +
+                "{} for containerID {}", nextLocalId, containerID);
+          }
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void seekToFirst() {
+      nextLocalId = null;
+      blockLocalIdIterator.seekToFirst();
+    }
+
+    @Override
+    public void seekToLast() {
+      nextLocalId = null;
+      blockLocalIdIterator.seekToLast();
+    }
+
+    @Override
+    public void close() throws IOException {
+      blockLocalIdIterator.close();
+    }
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java
index 745e1153da..87a283e458 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java
@@ -92,6 +92,15 @@ public class DatanodeSchemaThreeDBDefinition
           DeletedBlocksTransaction.class,
           Proto2Codec.get(DeletedBlocksTransaction.class));
 
+  public static final DBColumnFamilyDefinition<String, Long>
+      FINALIZE_BLOCKS =
+      new DBColumnFamilyDefinition<>(
+          "finalize_blocks",
+          String.class,
+          FixedLengthStringCodec.get(),
+          Long.class,
+          LongCodec.get());
+
   private static String separator = "";
 
   private static final Map<String, DBColumnFamilyDefinition<?, ?>>
@@ -99,7 +108,9 @@ public class DatanodeSchemaThreeDBDefinition
          BLOCK_DATA,
          METADATA,
          DELETED_BLOCKS,
-         DELETE_TRANSACTION);
+         DELETE_TRANSACTION,
+         FINALIZE_BLOCKS);
+
 
   public DatanodeSchemaThreeDBDefinition(String dbPath,
       ConfigurationSource config) {
@@ -122,6 +133,7 @@ public class DatanodeSchemaThreeDBDefinition
     METADATA.setCfOptions(cfOptions);
     DELETED_BLOCKS.setCfOptions(cfOptions);
     DELETE_TRANSACTION.setCfOptions(cfOptions);
+    FINALIZE_BLOCKS.setCfOptions(cfOptions);
   }
 
   @Override
@@ -151,6 +163,12 @@ public class DatanodeSchemaThreeDBDefinition
     return DELETE_TRANSACTION;
   }
 
+  @Override
+  public DBColumnFamilyDefinition<String, Long>
+      getFinalizeBlocksColumnFamily() {
+    return FINALIZE_BLOCKS;
+  }
+
   public static int getContainerKeyPrefixLength() {
     return FixedLengthStringCodec.string2Bytes(
         getContainerKeyPrefix(0L)).length;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
index 50e181147e..e0e491a9ea 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
 import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
 import org.apache.hadoop.hdds.utils.db.LongCodec;
 import org.apache.hadoop.hdds.utils.db.Proto2Codec;
 import org.apache.hadoop.hdds.utils.db.StringCodec;
@@ -76,6 +77,15 @@ public class DatanodeSchemaTwoDBDefinition
           
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.class,
           Proto2Codec.get(DeletedBlocksTransaction.class));
 
+  public static final DBColumnFamilyDefinition<String, Long>
+      FINALIZE_BLOCKS =
+      new DBColumnFamilyDefinition<>(
+          "finalize_blocks",
+          String.class,
+          FixedLengthStringCodec.get(),
+          Long.class,
+          LongCodec.get());
+
   public DatanodeSchemaTwoDBDefinition(String dbPath,
       ConfigurationSource config) {
     super(dbPath, config);
@@ -86,7 +96,8 @@ public class DatanodeSchemaTwoDBDefinition
           BLOCK_DATA,
           METADATA,
           DELETED_BLOCKS,
-          DELETE_TRANSACTION);
+          DELETE_TRANSACTION,
+          FINALIZE_BLOCKS);
 
   @Override
   public Map<String, DBColumnFamilyDefinition<?, ?>> getMap() {
@@ -114,4 +125,8 @@ public class DatanodeSchemaTwoDBDefinition
       getDeleteTransactionsColumnFamily() {
     return DELETE_TRANSACTION;
   }
+
+  public DBColumnFamilyDefinition<String, Long> 
getFinalizeBlocksColumnFamily() {
+    return FINALIZE_BLOCKS;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
index dd2aa5234b..4ca81a0372 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
@@ -77,6 +77,13 @@ public interface DatanodeStore extends Closeable {
    */
   Table<String, ChunkInfoList> getDeletedBlocksTable();
 
+  /**
+   * A Table that keeps the blocks finalized at the request of the client.
+   *
+   * @return Table
+   */
+  Table<String, Long> getFinalizeBlocksTable();
+
   /**
    * Helper to create and write batch transactions.
    */
@@ -94,6 +101,9 @@ public interface DatanodeStore extends Closeable {
   BlockIterator<BlockData> getBlockIterator(long containerID,
       KeyPrefixFilter filter) throws IOException;
 
+  BlockIterator<Long> getFinalizeBlockIterator(long containerID,
+      KeyPrefixFilter filter) throws IOException;
+
   /**
    * Returns if the underlying DB is closed. This call is thread safe.
    * @return true if the DB is closed.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
index ee8580defa..7f37c9ae51 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
@@ -94,6 +94,13 @@ public class DatanodeStoreSchemaThreeImpl extends 
AbstractDatanodeStore
             .iterator(getContainerKeyPrefix(containerID)), filter);
   }
 
+  @Override
+  public BlockIterator<Long> getFinalizeBlockIterator(long containerID,
+      MetadataKeyFilters.KeyPrefixFilter filter) throws IOException {
+    return new KeyValueBlockLocalIdIterator(containerID,
+        
getFinalizeBlocksTableWithIterator().iterator(getContainerKeyPrefix(containerID)),
 filter);
+  }
+
   public void removeKVContainerData(long containerID) throws IOException {
     String prefix = getContainerKeyPrefix(containerID);
     try (BatchOperation batch = getBatchHandler().initBatchOperation()) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index feb5805387..ecc4e80b4b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -159,6 +159,29 @@ public class ContainerController {
     getHandler(container).closeContainer(container);
   }
 
+  /**
+   * Adds a finalized block to the given container, if the container exists.
+   *
+   * @param containerId ID of the container
+   * @param localId local ID of the finalized block
+   */
+  public void addFinalizedBlock(final long containerId,
+      final long localId) {
+    Container container = containerSet.getContainer(containerId);
+    if (container != null) {
+      getHandler(container).addFinalizedBlock(container, localId);
+    }
+  }
+
+  public boolean isFinalizedBlockExist(final long containerId,
+      final long localId) {
+    Container container = containerSet.getContainer(containerId);
+    if (container != null) {
+      return getHandler(container).isFinalizedBlockExist(container, localId);
+    }
+    return false;
+  }
+
   public Container importContainer(
       final ContainerData containerData,
       final InputStream rawContainerStream,
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index bfaf7b1fca..9e83ed6aa6 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -258,6 +258,14 @@ public class TestKeyValueHandler {
         .dispatchRequest(handler, getSmallFileRequest, container, null);
     Mockito.verify(handler, times(1)).handleGetSmallFile(
         any(ContainerCommandRequestProto.class), any());
+
+    // Test Finalize Block Request handling
+    ContainerCommandRequestProto finalizeBlock =
+        getDummyCommandRequestProto(ContainerProtos.Type.FinalizeBlock);
+    KeyValueHandler
+        .dispatchRequest(handler, finalizeBlock, container, null);
+    Mockito.verify(handler, times(1)).handleFinalizeBlock(
+        any(ContainerCommandRequestProto.class), any());
   }
 
   @Test
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index 8fd8d08f24..d3e337055a 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.util.UUID;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
 import static 
org.apache.hadoop.ozone.container.ContainerTestHelper.DATANODE_UUID;
@@ -118,6 +119,19 @@ public class TestKeyValueHandlerWithUnhealthyContainer {
     assertEquals(UNKNOWN_BCSID, response.getResult());
   }
 
+  @Test
+  public void testFinalizeBlock() {
+    KeyValueContainer container = getMockUnhealthyContainer();
+    KeyValueHandler handler = getDummyHandler();
+
+    ContainerProtos.ContainerCommandResponseProto response =
+        handler.handleFinalizeBlock(
+            getDummyCommandRequestProto(
+                ContainerProtos.Type.FinalizeBlock),
+            container);
+    assertEquals(CONTAINER_UNHEALTHY, response.getResult());
+  }
+
   @Test
   public void testGetSmallFile() {
     KeyValueContainer container = getMockUnhealthyContainer();
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 718e2a108c..367238a285 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -75,6 +75,8 @@ package hadoop.hdds.datanode;
  *  17. CloseContainer - Closes an open container and makes it immutable.
  *
  *  18. CopyContainer - Copies a container from a remote machine.
+ *
+ *  19. FinalizeBlock - Finalize block request from client.
  */
 
 enum Type {
@@ -103,6 +105,8 @@ enum Type {
 
   StreamInit = 19;
   StreamWrite = 20;
+
+  FinalizeBlock = 21;
 }
 
 
@@ -208,6 +212,8 @@ message ContainerCommandRequestProto {
 
   optional   string encodedToken = 23;
   optional   uint32 version = 24;
+
+  optional   FinalizeBlockRequestProto finalizeBlock = 25;
 }
 
 message ContainerCommandResponseProto {
@@ -237,7 +243,9 @@ message ContainerCommandResponseProto {
   optional   PutSmallFileResponseProto putSmallFile = 19;
   optional   GetSmallFileResponseProto getSmallFile = 20;
 
-  optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
+  optional   GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
+
+  optional   FinalizeBlockResponseProto finalizeBlock = 22;
 }
 
 message ContainerDataProto {
@@ -338,6 +346,14 @@ message  PutBlockResponseProto {
   required GetCommittedBlockLengthResponseProto committedBlockLength = 1;
 }
 
+message  FinalizeBlockRequestProto  {
+  required DatanodeBlockID blockID = 1;
+}
+
+message  FinalizeBlockResponseProto  {
+  required BlockData blockData = 1;
+}
+
 message  GetBlockRequestProto  {
   required DatanodeBlockID blockID = 1;
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java
new file mode 100644
index 0000000000..2a0376ba3d
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneTestUtils;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.JUnit5AwareTimeout;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static 
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
+import static 
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_CHUNK;
+import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests FinalizeBlock.
+ */
+@RunWith(Parameterized.class)
+public class TestFinalizeBlock {
+
+  private OzoneClient client;
+  /**
+    * Set a timeout for each test.
+    */
+  @Rule
+  public TestRule timeout = new JUnit5AwareTimeout(Timeout.seconds(300));
+  private MiniOzoneCluster cluster;
+  private OzoneConfiguration conf;
+  private ObjectStore objectStore;
+  private static String volumeName = UUID.randomUUID().toString();
+  private static String bucketName = UUID.randomUUID().toString();
+  private boolean schemaV3;
+  private ContainerLayoutVersion layoutVersion;
+
+  public TestFinalizeBlock(boolean enableSchemaV3, ContainerLayoutVersion 
version) {
+    this.schemaV3 = enableSchemaV3;
+    this.layoutVersion = version;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return Arrays.asList(new Object[][]{
+        {false, FILE_PER_CHUNK},
+        {true, FILE_PER_CHUNK},
+        {false, FILE_PER_BLOCK},
+        {true, FILE_PER_BLOCK},
+    });
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
+    conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
+        0, StorageUnit.MB);
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, 
TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, schemaV3);
+    conf.setEnum(ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY, layoutVersion);
+
+    DatanodeConfiguration datanodeConfiguration = conf.getObject(
+        DatanodeConfiguration.class);
+    datanodeConfiguration.setBlockDeletionInterval(Duration.ofMillis(100));
+    conf.setFromObject(datanodeConfiguration);
+    ScmConfig scmConfig = conf.getObject(ScmConfig.class);
+    scmConfig.setBlockDeletionInterval(Duration.ofMillis(100));
+    conf.setFromObject(scmConfig);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(1).build();
+    cluster.waitForClusterToBeReady();
+    cluster.waitForPipelineTobeReady(ONE, 30000);
+
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  @After
+  public void shutdown() {
+    IOUtils.closeQuietly(client);
+    if (cluster != null) {
+      try {
+        cluster.shutdown();
+      } catch (Exception e) {
+        // do nothing.
+      }
+    }
+  }
+
+  @Test
+  public void testFinalizeBlock() throws IOException, InterruptedException, 
TimeoutException {
+    String keyName = UUID.randomUUID().toString();
+    // create key
+    createKey(keyName);
+
+    ContainerID containerId = cluster.getStorageContainerManager()
+        .getContainerManager().getContainers().get(0).containerID();
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
+        .build();
+    List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
+        cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions();
+
+    ContainerInfo container = cluster.getStorageContainerManager()
+        .getContainerManager().getContainer(containerId);
+    Pipeline pipeline = cluster.getStorageContainerManager()
+        .getPipelineManager().getPipeline(container.getPipelineID());
+
+    XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+
+    // Before finalize block WRITE chunk on the same block should pass through
+    ContainerProtos.ContainerCommandRequestProto request =
+        ContainerTestHelper.getWriteChunkRequest(pipeline, (
+            new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0)
+                .getLocationList().get(0).getLocalID())), 100);
+    xceiverClient.sendCommand(request);
+
+    // Before finalize block PUT block on the same block should pass through
+    request = ContainerTestHelper.getPutBlockRequest(request);
+    xceiverClient.sendCommand(request);
+
+    // Now Finalize Block
+    request = getFinalizeBlockRequest(omKeyLocationInfoGroupList, container);
+    ContainerProtos.ContainerCommandResponseProto response =
+        xceiverClient.sendCommand(request);
+
+    Assert.assertTrue(response.getFinalizeBlock()
+        .getBlockData().getBlockID().getLocalID()
+        == omKeyLocationInfoGroupList.get(0)
+        .getLocationList().get(0).getLocalID());
+
+    Assert.assertTrue(((KeyValueContainerData)getContainerfromDN(
+        cluster.getHddsDatanodes().get(0),
+        containerId.getId()).getContainerData())
+        .getFinalizedBlockSet().size() == 1);
+
+    testRejectPutAndWriteChunkAfterFinalizeBlock(containerId, pipeline, 
xceiverClient, omKeyLocationInfoGroupList);
+    testFinalizeBlockReloadAfterDNRestart(containerId);
+    testFinalizeBlockClearAfterCloseContainer(containerId);
+  }
+
+  private void testFinalizeBlockReloadAfterDNRestart(ContainerID containerId) {
+    try {
+      cluster.restartHddsDatanode(0, true);
+    } catch (Exception e) {
+      fail("Fail to restart Datanode");
+    }
+
+    // After restart DN, finalizeBlock should be loaded into memory
+    Assert.assertTrue(((KeyValueContainerData)
+        getContainerfromDN(cluster.getHddsDatanodes().get(0),
+            containerId.getId()).getContainerData())
+        .getFinalizedBlockSet().size() == 1);
+  }
+
+  private void testFinalizeBlockClearAfterCloseContainer(ContainerID 
containerId)
+      throws InterruptedException, TimeoutException {
+    
OzoneTestUtils.closeAllContainers(cluster.getStorageContainerManager().getEventQueue(),
+        cluster.getStorageContainerManager());
+
+    // Finalize Block should be cleared from container data.
+    GenericTestUtils.waitFor(() -> (
+        (KeyValueContainerData) 
getContainerfromDN(cluster.getHddsDatanodes().get(0),
+            
containerId.getId()).getContainerData()).getFinalizedBlockSet().size() == 0,
+        100, 10 * 1000);
+    try {
+      // Restart DataNode
+      cluster.restartHddsDatanode(0, true);
+    } catch (Exception e) {
+      fail("Fail to restart Datanode");
+    }
+
+    // After DN restart also there should not be any finalizeBlock
+    Assert.assertTrue(((KeyValueContainerData)getContainerfromDN(
+        cluster.getHddsDatanodes().get(0),
+        containerId.getId()).getContainerData())
+        .getFinalizedBlockSet().size() == 0);
+  }
+
+  private void testRejectPutAndWriteChunkAfterFinalizeBlock(ContainerID 
containerId, Pipeline pipeline,
+      XceiverClientSpi xceiverClient, List<OmKeyLocationInfoGroup> 
omKeyLocationInfoGroupList)
+      throws IOException {
+    // Try doing WRITE chunk on the already finalized block
+    ContainerProtos.ContainerCommandRequestProto request =
+        ContainerTestHelper.getWriteChunkRequest(pipeline,
+            (new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0)
+                .getLocationList().get(0).getLocalID())), 100);
+
+    try {
+      xceiverClient.sendCommand(request);
+      fail("Write chunk should fail.");
+    } catch (IOException e) {
+      assertTrue(e.getCause().getMessage()
+          .contains("Block already finalized"));
+    }
+
+    // Try doing PUT block on the already finalized block
+    request = ContainerTestHelper.getPutBlockRequest(request);
+    try {
+      xceiverClient.sendCommand(request);
+      fail("Put block should fail.");
+    } catch (IOException e) {
+      assertTrue(e.getCause().getMessage()
+          .contains("Block already finalized"));
+    }
+  }
+
+  @NotNull
+  private ContainerProtos.ContainerCommandRequestProto getFinalizeBlockRequest(
+      List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList,
+      ContainerInfo container) {
+    final ContainerProtos.ContainerCommandRequestProto.Builder builder =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.FinalizeBlock)
+            .setContainerID(container.getContainerID())
+            .setDatanodeUuid(cluster.getHddsDatanodes()
+            .get(0).getDatanodeDetails().getUuidString());
+
+    final ContainerProtos.DatanodeBlockID blockId =
+        ContainerProtos.DatanodeBlockID.newBuilder()
+            .setContainerID(container.getContainerID()).setLocalID(
+                omKeyLocationInfoGroupList.get(0)
+            .getLocationList().get(0).getLocalID())
+            .setBlockCommitSequenceId(0).build();
+
+    builder.setFinalizeBlock(ContainerProtos.FinalizeBlockRequestProto
+        .newBuilder().setBlockID(blockId).build());
+    return builder.build();
+  }
+
+  /**
+   * create a key with specified name.
+   * @param keyName
+   * @throws IOException
+   */
+  private void createKey(String keyName) throws IOException {
+    OzoneOutputStream key = objectStore.getVolume(volumeName)
+        .getBucket(bucketName)
+        .createKey(keyName, 1024, ReplicationType.RATIS,
+            ReplicationFactor.ONE, new HashMap<>());
+    key.write("test".getBytes(UTF_8));
+    key.close();
+  }
+
+  /**
+   * Return the container for the given containerID from the given DN.
+   */
+  private Container getContainerfromDN(HddsDatanodeService hddsDatanodeService,
+      long containerID) {
+    return hddsDatanodeService.getDatanodeStateMachine().getContainer()
+        .getContainerSet().getContainer(containerID);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to