This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new b328608  HDDS-3205. Handle BlockDeletingService in SCM HA (#1780)
b328608 is described below

commit b328608d2cd4de0bfabcc946c0b56025d1cbde25
Author: runzhiwang <[email protected]>
AuthorDate: Fri Jan 29 00:22:14 2021 +0800

    HDDS-3205. Handle BlockDeletingService in SCM HA (#1780)
---
 .../src/main/proto/SCMRatisProtocol.proto          |   1 +
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   7 +-
 .../hadoop/hdds/scm/block/DeletedBlockLog.java     |  11 -
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java |  21 --
 ...lockLogImpl.java => DeletedBlockLogImplV2.java} | 177 ++++++--------
 .../scm/block/DeletedBlockLogStateManager.java     |  45 ++++
 .../scm/block/DeletedBlockLogStateManagerImpl.java | 265 +++++++++++++++++++++
 .../hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java |  20 +-
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |   2 +-
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |  10 +
 .../hdds/scm/server/StorageContainerManager.java   |   4 +-
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |  78 +++---
 .../hadoop/ozone/TestStorageContainerManager.java  |  43 +++-
 .../client/rpc/TestDeleteWithSlowFollower.java     |  12 +-
 14 files changed, 515 insertions(+), 181 deletions(-)

diff --git a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
index 8818a83..c0938e8 100644
--- a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
@@ -23,6 +23,7 @@ option java_generate_equals_and_hash = true;
 enum RequestType {
     PIPELINE = 1;
     CONTAINER = 2;
+    BLOCK = 3;
 }
 
 message Method {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index fb5d5d5..8acf985 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -101,8 +101,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
 
     // SCM block deleting transaction log and deleting service.
-    deletedBlockLog = new DeletedBlockLogImpl(conf, scm.getContainerManager(),
-        scm.getScmMetadataStore());
+    deletedBlockLog = new DeletedBlockLogImplV2(conf, scm.getContainerManager(),
+        scm.getScmHAManager().getRatisServer(),
+        scm.getScmMetadataStore().getDeletedBlocksTXTable(),
+        scm.getScmHAManager().getDBTransactionBuffer(),
+        scm.getScmContext());
     Duration svcInterval = conf.getObject(
             ScmConfig.class).getBlockDeletionInterval();
     long serviceTimeout =
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index 9a5d74f..ddcd2d1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -81,17 +81,6 @@ public interface DeletedBlockLog extends Closeable {
       UUID dnID);
 
   /**
-   * Creates a block deletion transaction and adds that into the log.
-   *
-   * @param containerID - container ID.
-   * @param blocks - blocks that belong to the same container.
-   *
-   * @throws IOException
-   */
-  void addTransaction(long containerID, List<Long> blocks)
-      throws IOException;
-
-  /**
    * Creates block deletion transactions for a set of containers,
    * add into the log and persist them atomically. An object key
    * might be stored in multiple containers and multiple blocks,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 8a46b66..b61f135 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -260,27 +260,6 @@ public class DeletedBlockLogImpl
     return false;
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @param containerID - container ID.
-   * @param blocks      - blocks that belong to the same container.
-   * @throws IOException
-   */
-  @Override
-  public void addTransaction(long containerID, List<Long> blocks)
-      throws IOException {
-    lock.lock();
-    try {
-      Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
-      DeletedBlocksTransaction tx =
-          constructNewTransaction(nextTXID, containerID, blocks);
-      scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx);
-    } finally {
-      lock.unlock();
-    }
-  }
-
   @Override
   public int getNumOfValidTransactions() throws IOException {
     lock.lock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java
similarity index 74%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java
index 8a46b66..179d228 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java
@@ -41,10 +41,12 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.UniqueId;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 
@@ -52,6 +54,7 @@ import com.google.common.collect.Lists;
 import static java.lang.Math.min;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +68,7 @@ import org.slf4j.LoggerFactory;
  * equally same chance to be retrieved which only depends on the nature
  * order of the transaction ID.
  */
-public class DeletedBlockLogImpl
+public class DeletedBlockLogImplV2
     implements DeletedBlockLog, EventHandler<DeleteBlockStatus> {
 
   public static final Logger LOG =
@@ -73,18 +76,23 @@ public class DeletedBlockLogImpl
 
   private final int maxRetry;
   private final ContainerManagerV2 containerManager;
-  private final SCMMetadataStore scmMetadataStore;
   private final Lock lock;
   // Maps txId to set of DNs which are successful in committing the transaction
   private Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final SCMContext scmContext;
 
-  public DeletedBlockLogImpl(ConfigurationSource conf,
-                             ContainerManagerV2 containerManager,
-                             SCMMetadataStore scmMetadataStore) {
+  public DeletedBlockLogImplV2(ConfigurationSource conf,
+      ContainerManagerV2 containerManager,
+      SCMRatisServer ratisServer,
+      Table<Long, DeletedBlocksTransaction> deletedBlocksTXTable,
+      DBTransactionBuffer dbTxBuffer,
+      SCMContext scmContext) {
     maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
         OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
     this.containerManager = containerManager;
-    this.scmMetadataStore = scmMetadataStore;
     this.lock = new ReentrantLock();
 
     // transactionToDNsCommitMap is updated only when
@@ -92,6 +100,14 @@ public class DeletedBlockLogImpl
 
     // maps transaction to dns which have committed it.
     transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl
+        .newBuilder()
+        .setConfiguration(conf)
+        .setDeletedBlocksTable(deletedBlocksTXTable)
+        .setRatisServer(ratisServer)
+        .setSCMDBTransactionBuffer(dbTxBuffer)
+        .build();
+    this.scmContext = scmContext;
   }
 
 
@@ -103,7 +119,7 @@ public class DeletedBlockLogImpl
       final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
       try (TableIterator<Long,
           ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
-               scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+               deletedBlockLogStateManager.getReadOnlyIterator()) {
         while (iter.hasNext()) {
           DeletedBlocksTransaction delTX = iter.next().getValue();
           if (delTX.getCount() == -1) {
@@ -125,47 +141,18 @@ public class DeletedBlockLogImpl
    */
   @Override
   public void incrementCount(List<Long> txIDs) throws IOException {
-    for (Long txID : txIDs) {
-      lock.lock();
-      try {
-        DeletedBlocksTransaction block =
-            scmMetadataStore.getDeletedBlocksTXTable().get(txID);
-        if (block == null) {
-          if (LOG.isDebugEnabled()) {
-            // This can occur due to race condition between retry and old
-            // service task where old task removes the transaction and the new
-            // task is resending
-            LOG.debug("Deleted TXID {} not found.", txID);
-          }
-          continue;
-        }
-        DeletedBlocksTransaction.Builder builder = block.toBuilder();
-        int currentCount = block.getCount();
-        if (currentCount > -1) {
-          builder.setCount(++currentCount);
-        }
-        // if the retry time exceeds the maxRetry value
-        // then set the retry value to -1, stop retrying, admins can
-        // analyze those blocks and purge them manually by SCMCli.
-        if (currentCount > maxRetry) {
-          builder.setCount(-1);
-        }
-        scmMetadataStore.getDeletedBlocksTXTable().put(txID,
-            builder.build());
-      } catch (IOException ex) {
-        LOG.warn("Cannot increase count for txID " + txID, ex);
-        // We do not throw error here, since we don't want to abort the loop.
-        // Just log and continue processing the rest of txids.
-      } finally {
-        lock.unlock();
-      }
+    lock.lock();
+    try {
+      deletedBlockLogStateManager
+          .increaseRetryCountOfTransactionInDB(new ArrayList<>(txIDs));
+    } finally {
+      lock.unlock();
     }
   }
 
 
-  private DeletedBlocksTransaction constructNewTransaction(long txID,
-                                                           long containerID,
-                                                           List<Long> blocks) {
+  private DeletedBlocksTransaction constructNewTransaction(
+      long txID, long containerID, List<Long> blocks) {
     return DeletedBlocksTransaction.newBuilder()
         .setTxID(txID)
         .setContainerID(containerID)
@@ -187,6 +174,7 @@ public class DeletedBlockLogImpl
       List<DeleteBlockTransactionResult> transactionResults, UUID dnID) {
     lock.lock();
     try {
+      ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
       Set<UUID> dnsWithCommittedTxn;
       for (DeleteBlockTransactionResult transactionResult :
           transactionResults) {
@@ -229,7 +217,7 @@ public class DeletedBlockLogImpl
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Purging txId={} from block deletion log", txID);
               }
-              scmMetadataStore.getDeletedBlocksTXTable().delete(txID);
+              txIDsToBeDeleted.add(txID);
             }
           }
           if (LOG.isDebugEnabled()) {
@@ -241,6 +229,12 @@ public class DeletedBlockLogImpl
               transactionResult.getTxID(), e);
         }
       }
+      try {
+        deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
+      } catch (IOException e) {
+        LOG.warn("Could not commit delete block transactions: "
+            + txIDsToBeDeleted, e);
+      }
     } finally {
       lock.unlock();
     }
@@ -260,27 +254,6 @@ public class DeletedBlockLogImpl
     return false;
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @param containerID - container ID.
-   * @param blocks      - blocks that belong to the same container.
-   * @throws IOException
-   */
-  @Override
-  public void addTransaction(long containerID, List<Long> blocks)
-      throws IOException {
-    lock.lock();
-    try {
-      Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
-      DeletedBlocksTransaction tx =
-          constructNewTransaction(nextTXID, containerID, blocks);
-      scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx);
-    } finally {
-      lock.unlock();
-    }
-  }
-
   @Override
   public int getNumOfValidTransactions() throws IOException {
     lock.lock();
@@ -288,7 +261,7 @@ public class DeletedBlockLogImpl
       final AtomicInteger num = new AtomicInteger(0);
       try (TableIterator<Long,
           ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
-               scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+               deletedBlockLogStateManager.getReadOnlyIterator()) {
         while (iter.hasNext()) {
           DeletedBlocksTransaction delTX = iter.next().getValue();
           if (delTX.getCount() > -1) {
@@ -303,6 +276,22 @@ public class DeletedBlockLogImpl
   }
 
   /**
+   * Called in SCMStateMachine#notifyLeaderChanged when current SCM becomes
+   *  leader.
+   */
+  public void clearTransactionToDNsCommitMap() {
+    transactionToDNsCommitMap.clear();
+  }
+
+  /**
+   * Called in SCMDBTransactionBuffer#flush when the cached deleting operations
+   * are flushed.
+   */
+  public void onFlush() {
+    deletedBlockLogStateManager.onFlush();
+  }
+
+  /**
    * {@inheritDoc}
    *
    * @param containerBlocksMap a map of containerBlocks.
@@ -313,18 +302,17 @@ public class DeletedBlockLogImpl
       throws IOException {
     lock.lock();
     try {
-      try(BatchOperation batch =
-          scmMetadataStore.getStore().initBatchOperation()) {
-        for (Map.Entry< Long, List< Long > > entry :
-            containerBlocksMap.entrySet()) {
-          long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
-          DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
-              entry.getKey(), entry.getValue());
-          scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch,
-              nextTXID, tx);
-        }
-        scmMetadataStore.getStore().commitBatchOperation(batch);
+      ArrayList<DeletedBlocksTransaction> txsToBeAdded = new ArrayList<>();
+      for (Map.Entry< Long, List< Long > > entry :
+          containerBlocksMap.entrySet()) {
+        // TODO(runzhiwang): Should use distributed sequence id generator
+        long nextTXID = UniqueId.next();
+        DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
+            entry.getKey(), entry.getValue());
+        txsToBeAdded.add(tx);
       }
+
+      deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
     } finally {
       lock.unlock();
     }
@@ -364,10 +352,9 @@ public class DeletedBlockLogImpl
           new DatanodeDeletedBlockTransactions();
       try (TableIterator<Long,
           ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
-               scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+               deletedBlockLogStateManager.getReadOnlyIterator()) {
         int numBlocksAdded = 0;
-        List<DeletedBlocksTransaction> txnsToBePurged =
-            new ArrayList<>();
+        ArrayList<Long> txIDs = new ArrayList<>();
         while (iter.hasNext() && numBlocksAdded < blockDeletionLimit) {
           Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
           DeletedBlocksTransaction txn = keyValue.getValue();
@@ -383,10 +370,11 @@ public class DeletedBlockLogImpl
           } catch (ContainerNotFoundException ex) {
             LOG.warn("Container: " + id + " was not found for the transaction: "
                 + txn);
-            txnsToBePurged.add(txn);
+            txIDs.add(txn.getTxID());
           }
         }
-        purgeTransactions(txnsToBePurged);
+
+        deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
       }
       return transactions;
     } finally {
@@ -394,21 +382,14 @@ public class DeletedBlockLogImpl
     }
   }
 
-  public void purgeTransactions(List<DeletedBlocksTransaction> txnsToBePurged)
-      throws IOException {
-    try (BatchOperation batch = scmMetadataStore.getBatchHandler()
-        .initBatchOperation()) {
-      for (int i = 0; i < txnsToBePurged.size(); i++) {
-        scmMetadataStore.getDeletedBlocksTXTable()
-            .deleteWithBatch(batch, txnsToBePurged.get(i).getTxID());
-      }
-      scmMetadataStore.getBatchHandler().commitBatchOperation(batch);
+  @Override
+  public void onMessage(
+      DeleteBlockStatus deleteBlockStatus, EventPublisher publisher) {
+    if (!scmContext.isLeader()) {
+      LOG.warn("Skip commit transactions since current SCM is not leader.");
+      return;
     }
-  }
 
-  @Override
-  public void onMessage(DeleteBlockStatus deleteBlockStatus,
-                        EventPublisher publisher) {
     ContainerBlocksDeletionACKProto ackProto =
         deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
     commitTransactions(ackProto.getResultsList(),
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java
new file mode 100644
index 0000000..f152a11
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+public interface DeletedBlockLogStateManager {
+  @Replicate
+  void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
+      throws IOException;
+
+  @Replicate
+  void removeTransactionsFromDB(ArrayList<Long> txIDs)
+      throws IOException;
+
+  @Replicate
+  void increaseRetryCountOfTransactionInDB(ArrayList<Long> txIDs)
+      throws IOException;
+
+  TableIterator<Long,
+      KeyValue<Long, DeletedBlocksTransaction>> getReadOnlyIterator();
+
+  void onFlush();
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
new file mode 100644
index 0000000..ab72c62
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+
+public class DeletedBlockLogStateManagerImpl
+    implements DeletedBlockLogStateManager {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DeletedBlockLogStateManagerImpl.class);
+
+  private final Table<Long, DeletedBlocksTransaction> deletedTable;
+  private final DBTransactionBuffer transactionBuffer;
+  private final int maxRetry;
+  private Set<Long> deletingTxIDs;
+
+  public DeletedBlockLogStateManagerImpl(
+      ConfigurationSource conf,
+      Table<Long, DeletedBlocksTransaction> deletedTable,
+      DBTransactionBuffer txBuffer) {
+    this.maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
+        OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
+    this.deletedTable = deletedTable;
+    this.transactionBuffer = txBuffer;
+    this.deletingTxIDs = ConcurrentHashMap.newKeySet();
+  }
+
+  public TableIterator<Long, TypedTable.KeyValue<Long,
+      DeletedBlocksTransaction>> getReadOnlyIterator() {
+    return new TableIterator<Long, TypedTable.KeyValue<Long,
+        DeletedBlocksTransaction>>() {
+
+      private TableIterator<Long,
+          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+          deletedTable.iterator();
+      private TypedTable.KeyValue<Long, DeletedBlocksTransaction> nextTx;
+
+      {
+        findNext();
+      }
+
+      private void findNext() {
+        while (iter.hasNext()) {
+          TypedTable.KeyValue<Long, DeletedBlocksTransaction> next = iter
+              .next();
+          long txID;
+          try {
+            txID = next.getKey();
+          } catch (IOException e) {
+            throw new IllegalStateException("");
+          }
+
+          if (!deletingTxIDs.contains(txID)) {
+            nextTx = next;
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("DeletedBlocksTransaction matching txID:{}",
+                  txID);
+            }
+            return;
+          }
+        }
+        nextTx = null;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return nextTx != null;
+      }
+
+      @Override
+      public TypedTable.KeyValue<Long, DeletedBlocksTransaction> next() {
+        if (nextTx == null) {
+          throw new NoSuchElementException("DeletedBlocksTransaction " +
+              "Iterator reached end");
+        }
+        TypedTable.KeyValue<Long, DeletedBlocksTransaction> returnTx = nextTx;
+        findNext();
+        return returnTx;
+      }
+
+      @Override
+      public void close() throws IOException {
+        iter.close();
+      }
+
+      @Override
+      public void seekToFirst() {
+        throw new UnsupportedOperationException("seekToFirst");
+      }
+
+      @Override
+      public void seekToLast() {
+        throw new UnsupportedOperationException("seekToLast");
+      }
+
+      @Override
+      public TypedTable.KeyValue<Long, DeletedBlocksTransaction> seek(
+          Long key) throws IOException {
+        throw new UnsupportedOperationException("seek");
+      }
+
+      @Override
+      public Long key() throws IOException {
+        throw new UnsupportedOperationException("key");
+      }
+
+      @Override
+      public TypedTable.KeyValue<Long, DeletedBlocksTransaction> value() {
+        throw new UnsupportedOperationException("value");
+      }
+
+      @Override
+      public void removeFromDB() throws IOException {
+        throw new UnsupportedOperationException("read-only");
+      }
+    };
+  }
+
+  @Override
+  public void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
+      throws IOException {
+    for (DeletedBlocksTransaction tx : txs) {
+      deletedTable.putWithBatch(
+          transactionBuffer.getCurrentBatchOperation(), tx.getTxID(), tx);
+    }
+  }
+
+  @Override
+  public void removeTransactionsFromDB(ArrayList<Long> txIDs)
+      throws IOException {
+    for (Long txID : txIDs) {
+      deletedTable.deleteWithBatch(
+          transactionBuffer.getCurrentBatchOperation(), txID);
+    }
+  }
+
+  @Override
+  public void increaseRetryCountOfTransactionInDB(
+      ArrayList<Long> txIDs) throws IOException {
+    for (Long txID : txIDs) {
+      DeletedBlocksTransaction block =
+          deletedTable.get(txID);
+      if (block == null) {
+        if (LOG.isDebugEnabled()) {
+          // This can occur due to race condition between retry and old
+          // service task where old task removes the transaction and the new
+          // task is resending
+          LOG.debug("Deleted TXID {} not found.", txID);
+        }
+        continue;
+      }
+      DeletedBlocksTransaction.Builder builder = block.toBuilder();
+      int currentCount = block.getCount();
+      if (currentCount > -1) {
+        builder.setCount(++currentCount);
+      }
+      // if the retry time exceeds the maxRetry value
+      // then set the retry value to -1, stop retrying, admins can
+      // analyze those blocks and purge them manually by SCMCli.
+      if (currentCount > maxRetry) {
+        builder.setCount(-1);
+      }
+      deletedTable.putWithBatch(
+          transactionBuffer.getCurrentBatchOperation(), txID, builder.build());
+    }
+  }
+
+
+  public void onFlush() {
+    deletingTxIDs.clear();
+    LOG.info("Clear cached deletingTxIDs.");
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for ContainerStateManager.
+   */
+  public static class Builder {
+    private ConfigurationSource conf;
+    private SCMRatisServer scmRatisServer;
+    private Table<Long, DeletedBlocksTransaction> table;
+    private DBTransactionBuffer transactionBuffer;
+
+    public Builder setConfiguration(final ConfigurationSource config) {
+      conf = config;
+      return this;
+    }
+
+    public Builder setRatisServer(final SCMRatisServer ratisServer) {
+      scmRatisServer = ratisServer;
+      return this;
+    }
+
+    public Builder setDeletedBlocksTable(
+        final Table<Long, DeletedBlocksTransaction> deletedBlocksTable) {
+      table = deletedBlocksTable;
+      return this;
+    }
+
+    public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
+      this.transactionBuffer = buffer;
+      return this;
+    }
+
+    public DeletedBlockLogStateManager build() {
+      Preconditions.checkNotNull(conf);
+      Preconditions.checkNotNull(scmRatisServer);
+      Preconditions.checkNotNull(table);
+      Preconditions.checkNotNull(table);
+
+      final DeletedBlockLogStateManager impl =
+          new DeletedBlockLogStateManagerImpl(conf, table, transactionBuffer);
+
+      final SCMHAInvocationHandler invocationHandler =
+          new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.BLOCK,
+              impl, scmRatisServer);
+
+      return (DeletedBlockLogStateManager) Proxy.newProxyInstance(
+          SCMHAInvocationHandler.class.getClassLoader(),
+          new Class<?>[]{DeletedBlockLogStateManager.class},
+          invocationHandler);
+    }
+
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
index e5af076..451ed1b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
@@ -16,7 +16,11 @@
  */
 package org.apache.hadoop.hdds.scm.ha;
 
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.ratis.statemachine.SnapshotInfo;
@@ -32,17 +36,21 @@ import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
  * operation in DB.
  */
 public class SCMDBTransactionBuffer implements DBTransactionBuffer {
+  private final StorageContainerManager scm;
   private final SCMMetadataStore metadataStore;
   private BatchOperation currentBatchOperation;
   private SCMTransactionInfo latestTrxInfo;
   private SnapshotInfo latestSnapshot;
 
-  public SCMDBTransactionBuffer(SCMMetadataStore store) throws IOException {
-    this.metadataStore = store;
+  public SCMDBTransactionBuffer(StorageContainerManager scm)
+      throws IOException {
+    this.scm = scm;
+    this.metadataStore = scm.getScmMetadataStore();
 
     // initialize a batch operation during construction time
     currentBatchOperation = this.metadataStore.getStore().initBatchOperation();
-    latestTrxInfo = store.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+    latestTrxInfo = this.metadataStore.getTransactionInfoTable()
+        .get(TRANSACTION_INFO_KEY);
     if (latestTrxInfo == null) {
       // transaction table is empty
       latestTrxInfo =
@@ -98,6 +106,12 @@ public class SCMDBTransactionBuffer implements DBTransactionBuffer {
     this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
     // reset batch operation
     currentBatchOperation = metadataStore.getStore().initBatchOperation();
+
+    DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
+        .getDeletedBlockLog();
+    Preconditions.checkArgument(
+        deletedBlockLog instanceof DeletedBlockLogImplV2);
+    ((DeletedBlockLogImplV2) deletedBlockLog).onFlush();
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index b795380..1880363 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -48,7 +48,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
       final StorageContainerManager scm) throws IOException {
     this.conf = conf;
     this.transactionBuffer =
-        new SCMDBTransactionBuffer(scm.getScmMetadataStore());
+        new SCMDBTransactionBuffer(scm);
     this.ratisServer = new SCMRatisServerImpl(
         conf.getObject(SCMHAConfiguration.class), conf, scm, transactionBuffer);
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index a04f0d8..13c5661 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -26,7 +26,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -156,6 +159,13 @@ public class SCMStateMachine extends BaseStateMachine {
 
     scm.getScmContext().updateLeaderAndTerm(true, term);
     scm.getSCMServiceManager().notifyStatusChanged();
+
+    DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
+        .getDeletedBlockLog();
+    Preconditions.checkArgument(
+        deletedBlockLog instanceof DeletedBlockLogImplV2);
+    ((DeletedBlockLogImplV2) deletedBlockLog)
+          .clearTransactionToDNsCommitMap();
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 809dda9..ce5e5f3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -61,7 +61,7 @@ import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
-import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@@ -362,7 +362,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue
         .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
     eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS,
-        (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
+        (DeletedBlockLogImplV2) scmBlockManager.getDeletedBlockLog());
     eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 37b2d61..ef6d102 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -28,6 +28,10 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -77,13 +81,14 @@ import static org.mockito.Mockito.when;
  */
 public class TestDeletedBlockLog {
 
-  private static DeletedBlockLogImpl deletedBlockLog;
+  private static DeletedBlockLogImplV2 deletedBlockLog;
   private static final int BLOCKS_PER_TXN = 5;
   private OzoneConfiguration conf;
   private File testDir;
   private ContainerManagerV2 containerManager;
   private StorageContainerManager scm;
   private List<DatanodeDetails> dnList;
+  private DBTransactionBuffer dbTransactionBuffer;
 
   @Before
   public void setup() throws Exception {
@@ -94,8 +99,12 @@ public class TestDeletedBlockLog {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     scm = TestUtils.getScm(conf);
     containerManager = Mockito.mock(ContainerManagerV2.class);
-    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
-        scm.getScmMetadataStore());
+    dbTransactionBuffer =
+        new MockDBTransactionBuffer(scm.getScmMetadataStore().getStore());
+    deletedBlockLog = new DeletedBlockLogImplV2(conf, containerManager,
+        MockSCMHAManager.getInstance(true).getRatisServer(),
+        scm.getScmMetadataStore().getDeletedBlocksTXTable(),
+        dbTransactionBuffer, SCMContext.emptyContext());
     dnList = new ArrayList<>(3);
     setupContainerManager();
   }
@@ -155,31 +164,48 @@ public class TestDeletedBlockLog {
     return blockMap;
   }
 
+  private void addTransactions(Map<Long, List<Long>> containerBlocksMap)
+      throws IOException {
+    dbTransactionBuffer.getCurrentBatchOperation();
+    deletedBlockLog.addTransactions(containerBlocksMap);
+    dbTransactionBuffer.flush();
+  }
+
+  private void incrementCount(List<Long> txIDs) throws IOException {
+    dbTransactionBuffer.getCurrentBatchOperation();
+    deletedBlockLog.incrementCount(txIDs);
+    dbTransactionBuffer.flush();
+  }
+
   private void commitTransactions(
       List<DeleteBlockTransactionResult> transactionResults,
-      DatanodeDetails... dns) {
+      DatanodeDetails... dns) throws IOException {
+    dbTransactionBuffer.getCurrentBatchOperation();
     for (DatanodeDetails dnDetails : dns) {
       deletedBlockLog
           .commitTransactions(transactionResults, dnDetails.getUuid());
     }
+    dbTransactionBuffer.flush();
   }
 
   private void commitTransactions(
-      List<DeleteBlockTransactionResult> transactionResults) {
+      List<DeleteBlockTransactionResult> transactionResults)
+      throws IOException {
     commitTransactions(transactionResults,
         dnList.toArray(new DatanodeDetails[3]));
   }
 
   private void commitTransactions(
       Collection<DeletedBlocksTransaction> deletedBlocksTransactions,
-      DatanodeDetails... dns) {
+      DatanodeDetails... dns) throws IOException {
     commitTransactions(deletedBlocksTransactions.stream()
         .map(this::createDeleteBlockTransactionResult)
         .collect(Collectors.toList()), dns);
   }
 
   private void commitTransactions(
-      Collection<DeletedBlocksTransaction> deletedBlocksTransactions) {
+      Collection<DeletedBlocksTransaction> deletedBlocksTransactions)
+      throws IOException {
     commitTransactions(deletedBlocksTransactions.stream()
         .map(this::createDeleteBlockTransactionResult)
         .collect(Collectors.toList()));
@@ -210,9 +236,7 @@ public class TestDeletedBlockLog {
     int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
 
     // Create 30 TXs in the log.
-    for (Map.Entry<Long, List<Long>> entry : generateData(30).entrySet()){
-      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
-    }
+    addTransactions(generateData(30));
 
     // This will return all TXs, total num 30.
     List<DeletedBlocksTransaction> blocks =
@@ -221,12 +245,12 @@ public class TestDeletedBlockLog {
         .collect(Collectors.toList());
 
     for (int i = 0; i < maxRetry; i++) {
-      deletedBlockLog.incrementCount(txIDs);
+      incrementCount(txIDs);
     }
 
     // Increment another time so it exceed the maxRetry.
     // On this call, count will be set to -1 which means TX eventually fails.
-    deletedBlockLog.incrementCount(txIDs);
+    incrementCount(txIDs);
     blocks = getTransactions(40 * BLOCKS_PER_TXN);
     for (DeletedBlocksTransaction block : blocks) {
       Assert.assertEquals(-1, block.getCount());
@@ -239,9 +263,7 @@ public class TestDeletedBlockLog {
 
   @Test
   public void testCommitTransactions() throws Exception {
-    for (Map.Entry<Long, List<Long>> entry : generateData(50).entrySet()){
-      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
-    }
+    addTransactions(generateData(50));
     List<DeletedBlocksTransaction> blocks =
         getTransactions(20 * BLOCKS_PER_TXN);
     // Add an invalid txn.
@@ -279,10 +301,7 @@ public class TestDeletedBlockLog {
     for (int i = 0; i < 100; i++) {
       int state = random.nextInt(4);
       if (state == 0) {
-        for (Map.Entry<Long, List<Long>> entry :
-            generateData(10).entrySet()){
-          deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
-        }
+        addTransactions(generateData(10));
         added += 10;
       } else if (state == 1) {
         blocks = getTransactions(20);
@@ -290,7 +309,7 @@ public class TestDeletedBlockLog {
         for (DeletedBlocksTransaction block : blocks) {
           txIDs.add(block.getTxID());
         }
-        deletedBlockLog.incrementCount(txIDs);
+        incrementCount(txIDs);
       } else if (state == 2) {
         commitTransactions(blocks);
         committed += blocks.size();
@@ -312,14 +331,14 @@ public class TestDeletedBlockLog {
 
   @Test
   public void testPersistence() throws Exception {
-    for (Map.Entry<Long, List<Long>> entry : generateData(50).entrySet()){
-      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
-    }
+    addTransactions(generateData(50));
     // close db and reopen it again to make sure
     // transactions are stored persistently.
     deletedBlockLog.close();
-    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
-        scm.getScmMetadataStore());
+    deletedBlockLog = new DeletedBlockLogImplV2(conf, containerManager,
+        MockSCMHAManager.getInstance(true).getRatisServer(),
+        scm.getScmMetadataStore().getDeletedBlocksTXTable(),
+        dbTransactionBuffer, SCMContext.emptyContext());
     List<DeletedBlocksTransaction> blocks =
         getTransactions(BLOCKS_PER_TXN * 10);
     Assert.assertEquals(10, blocks.size());
@@ -339,10 +358,11 @@ public class TestDeletedBlockLog {
     long containerID;
 
     // Creates {TXNum} TX in the log.
-    for (Map.Entry<Long, List<Long>> entry : generateData(txNum).entrySet()) {
+    Map<Long, List<Long>> deletedBlocks = generateData(txNum);
+    addTransactions(deletedBlocks);
+    for (Map.Entry<Long, List<Long>> entry :deletedBlocks.entrySet()) {
       count++;
       containerID = entry.getKey();
-      deletedBlockLog.addTransaction(containerID, entry.getValue());
 
       if (count % 2 == 0) {
         mockContainerInfo(containerID, dnId1);
@@ -365,7 +385,9 @@ public class TestDeletedBlockLog {
     builder.setTxID(11);
     builder.setContainerID(containerID);
     builder.setCount(0);
-    deletedBlockLog.addTransaction(containerID, new LinkedList<>());
+    Map<Long, List<Long>> deletedBlocksMap = new HashMap<>();
+    deletedBlocksMap.put(containerID, new LinkedList<>());
+    addTransactions(deletedBlocksMap);
 
     // get should return two transactions for the same container
     blocks = getTransactions(txNum);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 635fe30..a36c275 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -41,7 +41,9 @@ import java.lang.reflect.Modifier;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -281,8 +283,9 @@ public class TestStorageContainerManager {
             cluster.getStorageContainerManager());
       }
 
-      Map<Long, List<Long>> containerBlocks = createDeleteTXLog(delLog,
-          keyLocations, helper);
+      Map<Long, List<Long>> containerBlocks = createDeleteTXLog(
+          cluster.getStorageContainerManager(),
+          delLog, keyLocations, helper);
       Set<Long> containerIDs = containerBlocks.keySet();
 
       // Verify a few TX gets created in the TX log.
@@ -295,6 +298,8 @@ public class TestStorageContainerManager {
       // empty again.
       GenericTestUtils.waitFor(() -> {
         try {
+          cluster.getStorageContainerManager().getScmHAManager()
+              .getDBTransactionBuffer().flush();
           return delLog.getNumOfValidTransactions() == 0;
         } catch (IOException e) {
           return false;
@@ -306,10 +311,13 @@ public class TestStorageContainerManager {
       // but unknown block IDs.
       for (Long containerID : containerBlocks.keySet()) {
         // Add 2 TXs per container.
-        delLog.addTransaction(containerID,
-            Collections.singletonList(RandomUtils.nextLong()));
-        delLog.addTransaction(containerID,
-            Collections.singletonList(RandomUtils.nextLong()));
+        Map<Long, List<Long>> deletedBlocks = new HashMap<>();
+        List<Long> blocks = new ArrayList<>();
+        blocks.add(RandomUtils.nextLong());
+        blocks.add(RandomUtils.nextLong());
+        deletedBlocks.put(containerID, blocks);
+        addTransactions(cluster.getStorageContainerManager(), delLog,
+            deletedBlocks);
       }
 
       // Verify a few TX gets created in the TX log.
@@ -319,11 +327,13 @@ public class TestStorageContainerManager {
       // eventually these TX will success.
       GenericTestUtils.waitFor(() -> {
         try {
+          cluster.getStorageContainerManager().getScmHAManager()
+              .getDBTransactionBuffer().flush();
           return delLog.getFailedTransactions().size() == 0;
         } catch (IOException e) {
           return false;
         }
-      }, 1000, 10000);
+      }, 1000, 20000);
     } finally {
       cluster.shutdown();
     }
@@ -374,7 +384,8 @@ public class TestStorageContainerManager {
             cluster.getStorageContainerManager());
       }
 
-      createDeleteTXLog(delLog, keyLocations, helper);
+      createDeleteTXLog(cluster.getStorageContainerManager(),
+          delLog, keyLocations, helper);
       // Verify a few TX gets created in the TX log.
       Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
 
@@ -400,7 +411,9 @@ public class TestStorageContainerManager {
     }
   }
 
-  private Map<Long, List<Long>> createDeleteTXLog(DeletedBlockLog delLog,
+  private Map<Long, List<Long>> createDeleteTXLog(
+      StorageContainerManager scm,
+      DeletedBlockLog delLog,
       Map<String, OmKeyInfo> keyLocations,
       TestStorageContainerManagerHelper helper) throws IOException {
     // These keys will be written into a bunch of containers,
@@ -438,9 +451,7 @@ public class TestStorageContainerManager {
         }
       });
     }
-    for (Map.Entry<Long, List<Long>> tx : containerBlocks.entrySet()) {
-      delLog.addTransaction(tx.getKey(), tx.getValue());
-    }
+    addTransactions(scm, delLog, containerBlocks);
 
     return containerBlocks;
   }
@@ -667,6 +678,14 @@ public class TestStorageContainerManager {
     }
   }
 
+  private void addTransactions(StorageContainerManager scm,
+      DeletedBlockLog delLog,
+      Map<Long, List<Long>> containerBlocksMap)
+      throws IOException {
+    delLog.addTransactions(containerBlocksMap);
+    scm.getScmHAManager().getDBTransactionBuffer().flush();
+  }
+
   @SuppressWarnings("visibilitymodifier")
   static class CloseContainerCommandMatcher
       extends ArgumentMatcher<CommandForDatanode> {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index 5a44a2c..bbbd702 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -280,9 +280,15 @@ public class TestDeleteWithSlowFollower {
     client.getObjectStore().getVolume(volumeName).getBucket(bucketName).
             deleteKey("ratis");
     GenericTestUtils.waitFor(() -> {
-      return
-          dnStateMachine.getCommandDispatcher().getDeleteBlocksCommandHandler()
-              .getInvocationCount() >= 1;
+      try {
+        cluster.getStorageContainerManager().getScmHAManager()
+            .getDBTransactionBuffer().flush();
+        return
+            dnStateMachine.getCommandDispatcher()
+                .getDeleteBlocksCommandHandler().getInvocationCount() >= 1;
+      } catch (IOException e) {
+        return false;
+      }
     }, 500, 100000);
     Assert.assertTrue(containerData.getDeleteTransactionId() > delTrxId);
     Assert.assertTrue(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to