HDDS-325. Add event watcher for delete blocks command. Contributed by Lokesh Jain.

(cherry picked from commit f7ff8c051e4d4b5cd0eee1884e2a546de1f57793)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1cc129bb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1cc129bb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1cc129bb

Branch: refs/heads/ozone-0.2
Commit: 1cc129bb7fff40e588eda40b1caa9b695611837a
Parents: c04f36d
Author: Nanda kumar <na...@apache.org>
Authored: Mon Oct 1 13:49:55 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Mon Oct 1 13:54:10 2018 +0530

----------------------------------------------------------------------
 .../scm/container/common/helpers/Pipeline.java  |  4 +
 .../report/CommandStatusReportPublisher.java    |  2 +-
 .../common/statemachine/StateContext.java       | 58 +++++++-----
 .../CloseContainerCommandHandler.java           |  3 +-
 .../commandhandler/CommandHandler.java          |  7 +-
 .../DeleteBlocksCommandHandler.java             | 43 +++++----
 .../ReplicateContainerCommandHandler.java       |  3 +-
 .../states/endpoint/HeartbeatEndpointTask.java  | 33 ++++++-
 .../container/ozoneimpl/ContainerReader.java    | 20 +++++
 .../StorageContainerDatanodeProtocol.java       | 13 ---
 .../protocol/commands/CommandForDatanode.java   |  8 +-
 .../ozone/protocol/commands/CommandStatus.java  | 41 +++++++--
 .../commands/DeleteBlockCommandStatus.java      | 92 ++++++++++++++++++++
 ...rDatanodeProtocolClientSideTranslatorPB.java | 17 ----
 ...rDatanodeProtocolServerSideTranslatorPB.java | 16 ----
 .../StorageContainerDatanodeProtocol.proto      | 11 +--
 .../ozone/container/common/ScmTestMock.java     | 24 ++---
 .../common/report/TestReportPublisher.java      |  4 +-
 .../endpoint/TestHeartbeatEndpointTask.java     | 12 +--
 .../block/DatanodeDeletedBlockTransactions.java | 20 ++---
 .../hdds/scm/block/DeletedBlockLogImpl.java     | 24 ++++-
 .../hdds/scm/block/SCMBlockDeletingService.java |  2 +-
 .../scm/command/CommandStatusReportHandler.java | 10 ++-
 .../hdds/scm/container/ContainerMapping.java    | 19 ++--
 .../hadoop/hdds/scm/events/SCMEvents.java       | 19 ++--
 .../server/SCMDatanodeHeartbeatDispatcher.java  | 11 ++-
 .../scm/server/SCMDatanodeProtocolServer.java   | 22 -----
 .../scm/server/StorageContainerManager.java     | 12 +++
 .../commands/RetriableDatanodeEventWatcher.java | 58 ++++++++++++
 .../ozone/protocol/commands/package-info.java   | 18 ++++
 .../command/TestCommandStatusReportHandler.java | 12 +--
 .../TestSCMDatanodeHeartbeatDispatcher.java     |  2 +-
 .../ozone/TestStorageContainerManager.java      |  3 +
 .../commandhandler/TestBlockDeletion.java       | 83 ++++++++++++++++--
 34 files changed, 503 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 777efa7..c36ca1f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -171,6 +171,10 @@ public class Pipeline {
   public Map<String, DatanodeDetails> getDatanodes() {
     return datanodes;
   }
+
+  public boolean isEmpty() {
+    return datanodes.isEmpty();
+  }
   /**
    * Returns the leader host.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
index 4cf6321..4736857 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
@@ -82,6 +82,6 @@ public class CommandStatusReportPublisher extends
         map.remove(key);
       }
     });
-    return builder.build();
+    return builder.getCmdStatusCount() > 0 ? builder.build() : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 47c2492..12c196b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -35,8 +37,11 @@ import org.apache.hadoop.ozone.container.common.states.datanode
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus
     .CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands
+    .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
+import static java.lang.Math.min;
 import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +57,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
@@ -67,7 +73,7 @@ public class StateContext {
   private final DatanodeStateMachine parent;
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
-  private final Queue<GeneratedMessage> reports;
+  private final List<GeneratedMessage> reports;
   private final Queue<ContainerAction> containerActions;
   private final Queue<PipelineAction> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
@@ -174,19 +180,23 @@ public class StateContext {
    * @param report report to be added
    */
   public void addReport(GeneratedMessage report) {
-    synchronized (reports) {
-      reports.add(report);
+    if (report != null) {
+      synchronized (reports) {
+        reports.add(report);
+      }
     }
   }
 
   /**
-   * Returns the next report, or null if the report queue is empty.
+   * Adds the reports which could not be sent by heartbeat back to the
+   * reports list.
    *
-   * @return report
+   * @param reportsToPutBack list of reports which failed to be sent by
+   *                         heartbeat.
    */
-  public GeneratedMessage getNextReport() {
+  public void putBackReports(List<GeneratedMessage> reportsToPutBack) {
     synchronized (reports) {
-      return reports.poll();
+      reports.addAll(0, reportsToPutBack);
     }
   }
 
@@ -207,19 +217,14 @@ public class StateContext {
    * @return List<reports>
    */
   public List<GeneratedMessage> getReports(int maxLimit) {
-    List<GeneratedMessage> reportList = new ArrayList<>();
+    List<GeneratedMessage> reportsToReturn = new LinkedList<>();
     synchronized (reports) {
-      if (!reports.isEmpty()) {
-        int size = reports.size();
-        int limit = size > maxLimit ? maxLimit : size;
-        for (int count = 0; count < limit; count++) {
-          GeneratedMessage report = reports.poll();
-          Preconditions.checkNotNull(report);
-          reportList.add(report);
-        }
-      }
-      return reportList;
+      List<GeneratedMessage> tempList = reports.subList(
+          0, min(reports.size(), maxLimit));
+      reportsToReturn.addAll(tempList);
+      tempList.clear();
     }
+    return reportsToReturn;
   }
 
 
@@ -442,9 +447,14 @@ public class StateContext {
    * @param cmd - {@link SCMCommand}.
    */
   public void addCmdStatus(SCMCommand cmd) {
+    CommandStatusBuilder statusBuilder;
+    if (cmd.getType() == Type.deleteBlocksCommand) {
+      statusBuilder = new DeleteBlockCommandStatusBuilder();
+    } else {
+      statusBuilder = CommandStatusBuilder.newBuilder();
+    }
     this.addCmdStatus(cmd.getId(),
-        CommandStatusBuilder.newBuilder()
-            .setCmdId(cmd.getId())
+        statusBuilder.setCmdId(cmd.getId())
             .setStatus(Status.PENDING)
             .setType(cmd.getType())
             .build());
@@ -469,13 +479,13 @@ public class StateContext {
   /**
    * Updates status of a pending status command.
    * @param cmdId       command id
-   * @param cmdExecuted SCMCommand
+   * @param cmdStatusUpdater Consumer to update command status.
    * @return true if command status updated successfully else false.
    */
-  public boolean updateCommandStatus(Long cmdId, boolean cmdExecuted) {
+  public boolean updateCommandStatus(Long cmdId,
+      Consumer<CommandStatus> cmdStatusUpdater) {
     if(cmdStatusMap.containsKey(cmdId)) {
-      cmdStatusMap.get(cmdId)
-          .setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
+      cmdStatusUpdater.accept(cmdStatusMap.get(cmdId));
       return true;
     }
     return false;

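The new getReports() drains the head of the list with the subList-and-clear idiom: clearing a subList view removes the copied prefix from the backing list in one step. A self-contained illustration (plain Java; drain() is a stand-in, not an Ozone method):

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

public class DrainSketch {
  // Copy up to maxLimit elements from the head of source, then remove them;
  // clearing the subList view mutates the backing list.
  static <T> List<T> drain(List<T> source, int maxLimit) {
    List<T> drained = new LinkedList<>();
    synchronized (source) {
      List<T> head = source.subList(0, Math.min(source.size(), maxLimit));
      drained.addAll(head);
      head.clear();
    }
    return drained;
  }

  public static void main(String[] args) {
    List<String> reports = new LinkedList<>(Arrays.asList("r1", "r2", "r3"));
    System.out.println(drain(reports, 2)); // [r1, r2]
    System.out.println(reports);           // [r3]
  }
}
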
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index d4e13ee..3f73218 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -106,7 +106,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
         cmdExecuted = false;
       }
     } finally {
-      updateCommandStatus(context, command, cmdExecuted, LOG);
+      updateCommandStatus(context, command,
+          (cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG);
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 71c25b5..1ea0ea8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -23,9 +23,12 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 
+import java.util.function.Consumer;
+
 /**
  * Generic interface for handlers.
  */
@@ -63,8 +66,8 @@ public interface CommandHandler {
    * Default implementation for updating command status.
    */
   default void updateCommandStatus(StateContext context, SCMCommand command,
-      boolean cmdExecuted, Logger log) {
-    if (!context.updateCommandStatus(command.getId(), cmdExecuted)) {
+      Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
+    if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
       log.debug("{} with Id:{} not found.", command.getType(),
           command.getId());
     }

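Passing a Consumer<CommandStatus> instead of a boolean lets each handler record more than pass/fail, e.g. attach a message or an ACK payload, while the lookup-and-update plumbing stays in one place. A minimal sketch of the pattern, with a stand-in status class rather than the real CommandStatus:

import java.util.function.Consumer;

public class StatusUpdateSketch {
  // Stand-in for the real CommandStatus.
  static class CommandStatus {
    boolean executed;
    String msg;
    void setStatus(boolean cmdExecuted) { this.executed = cmdExecuted; }
    void setMsg(String message) { this.msg = message; }
  }

  static void updateCommandStatus(CommandStatus status,
      Consumer<CommandStatus> updater) {
    updater.accept(status);
  }

  public static void main(String[] args) {
    CommandStatus status = new CommandStatus();
    // One updater can set several fields, which the old boolean
    // parameter could not express.
    updateCommandStatus(status, s -> {
      s.setStatus(true);
      s.setMsg("close container done");
    });
    System.out.println(status.executed + " / " + status.msg);
  }
}
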
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index b0d4cbc..11eac06 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -39,11 +39,11 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_NOT_FOUND;
@@ -63,7 +64,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
  */
 public class DeleteBlocksCommandHandler implements CommandHandler {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
 
   private final ContainerSet containerSet;
@@ -83,6 +84,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
       StateContext context, SCMConnectionManager connectionManager) {
     cmdExecuted = false;
     long startTime = Time.monotonicNow();
+    ContainerBlocksDeletionACKProto blockDeletionACK = null;
     try {
       if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
         LOG.warn("Skipping handling command, expected command "
@@ -144,31 +146,28 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
             .setDnId(context.getParent().getDatanodeDetails()
                 .getUuid().toString());
       });
-      ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
+      blockDeletionACK = resultBuilder.build();
 
       // Send ACK back to SCM as long as meta updated
       // TODO Or we should wait until the blocks are actually deleted?
       if (!containerBlocks.isEmpty()) {
-        for (EndpointStateMachine endPoint : connectionManager.getValues()) {
-          try {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Sending following block deletion ACK to SCM");
-              for (DeleteBlockTransactionResult result :
-                  blockDeletionACK.getResultsList()) {
-                LOG.debug(result.getTxID() + " : " + result.getSuccess());
-              }
-            }
-            endPoint.getEndPoint()
-                .sendContainerBlocksDeletionACK(blockDeletionACK);
-          } catch (IOException e) {
-            LOG.error("Unable to send block deletion ACK to SCM {}",
-                endPoint.getAddress().toString(), e);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Sending following block deletion ACK to SCM");
+          for (DeleteBlockTransactionResult result : blockDeletionACK
+              .getResultsList()) {
+            LOG.debug(result.getTxID() + " : " + result.getSuccess());
           }
         }
       }
       cmdExecuted = true;
     } finally {
-      updateCommandStatus(context, command, cmdExecuted, LOG);
+      final ContainerBlocksDeletionACKProto deleteAck =
+          blockDeletionACK;
+      Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
+        cmdStatus.setStatus(cmdExecuted);
+        ((DeleteBlockCommandStatus) cmdStatus).setBlocksDeletionAck(deleteAck);
+      };
+      updateCommandStatus(context, command, statusUpdater, LOG);
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
     }
@@ -238,9 +237,9 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
       }
     }
 
-    containerDB.put(DFSUtil.string2Bytes(
-        OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + delTX.getContainerID()),
-        Longs.toByteArray(delTX.getTxID()));
+    containerDB
+        .put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
+            Longs.toByteArray(delTX.getTxID()));
     containerData
         .updateDeleteTransactionId(delTX.getTxID());
     // update pending deletion blocks count in in-memory container status

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index 09c379f..81d162d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -77,7 +77,8 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
       supervisor.addTask(replicationTask);
 
     } finally {
-      updateCommandStatus(context, command, true, LOG);
+      updateCommandStatus(context, command,
+          (cmdStatus) -> cmdStatus.setStatus(true), LOG);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 5769e6d..4fd72ec 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.ZonedDateTime;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -124,12 +125,12 @@ public class HeartbeatEndpointTask
   @Override
   public EndpointStateMachine.EndPointStates call() throws Exception {
     rpcEndpoint.lock();
+    SCMHeartbeatRequestProto.Builder requestBuilder = null;
     try {
       Preconditions.checkState(this.datanodeDetailsProto != null);
 
-      SCMHeartbeatRequestProto.Builder requestBuilder =
-          SCMHeartbeatRequestProto.newBuilder()
-              .setDatanodeDetails(datanodeDetailsProto);
+      requestBuilder = SCMHeartbeatRequestProto.newBuilder()
+          .setDatanodeDetails(datanodeDetailsProto);
       addReports(requestBuilder);
       addContainerActions(requestBuilder);
       addPipelineActions(requestBuilder);
@@ -139,6 +140,8 @@ public class HeartbeatEndpointTask
       rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
       rpcEndpoint.zeroMissedCount();
     } catch (IOException ex) {
+      // put back the reports which failed to be sent
+      putBackReports(requestBuilder);
       rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
@@ -146,6 +149,24 @@ public class HeartbeatEndpointTask
     return rpcEndpoint.getState();
   }
 
+  // TODO: Make it generic.
+  private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
+    List<GeneratedMessage> reports = new LinkedList<>();
+    if (requestBuilder.hasContainerReport()) {
+      reports.add(requestBuilder.getContainerReport());
+    }
+    if (requestBuilder.hasNodeReport()) {
+      reports.add(requestBuilder.getNodeReport());
+    }
+    if (requestBuilder.getCommandStatusReportsCount() != 0) {
+      for (GeneratedMessage msg : requestBuilder
+          .getCommandStatusReportsList()) {
+        reports.add(msg);
+      }
+    }
+    context.putBackReports(reports);
+  }
+
   /**
    * Adds all the available reports to heartbeat.
    *
@@ -158,7 +179,11 @@ public class HeartbeatEndpointTask
           SCMHeartbeatRequestProto.getDescriptor().getFields()) {
         String heartbeatFieldName = descriptor.getMessageType().getFullName();
         if (heartbeatFieldName.equals(reportName)) {
-          requestBuilder.setField(descriptor, report);
+          if (descriptor.isRepeated()) {
+            requestBuilder.addRepeatedField(descriptor, report);
+          } else {
+            requestBuilder.setField(descriptor, report);
+          }
         }
       }
     }

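Since commandStatusReports is now a repeated proto field (see the .proto change below), the generic loop in addReports() must append rather than overwrite; protobuf's FieldDescriptor.isRepeated() drives that branch. Reduced to a sketch that uses the real protobuf builder API with placeholder arguments:

import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message;

public final class ReportPackingSketch {
  // Append to repeated fields, overwrite singular ones; with the old
  // setField-only code a second CommandStatusReport would have replaced
  // the first instead of accumulating.
  static void pack(Message.Builder builder, FieldDescriptor descriptor,
      Message report) {
    if (descriptor.isRepeated()) {
      builder.addRepeatedField(descriptor, report);
    } else {
      builder.setField(descriptor, report);
    }
  }

  private ReportPackingSketch() {
  }
}
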
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 7c986f0..c3a4126 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -19,10 +19,13 @@
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -32,7 +35,10 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -171,6 +177,20 @@ public class ContainerReader implements Runnable {
         KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
         KeyValueContainer kvContainer = new KeyValueContainer(
             kvContainerData, config);
+        MetadataStore containerDB = BlockUtils.getDB(kvContainerData, config);
+        MetadataKeyFilters.KeyPrefixFilter filter =
+            new MetadataKeyFilters.KeyPrefixFilter()
+                .addFilter(OzoneConsts.DELETING_KEY_PREFIX);
+        int numPendingDeletionBlocks =
+            containerDB.getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
+                .size();
+        kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
+        byte[] delTxnId = containerDB.get(
+            DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX));
+        if (delTxnId != null) {
+          kvContainerData
+              .updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
+        }
         containerSet.addContainer(kvContainer);
       } else {
         throw new StorageContainerException("Container File is corrupted. " +

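Without this ContainerReader change, a datanode restart would reset the in-memory pending-deletion count to zero. The recount is conceptually a prefix scan over the container DB; a sketch with a TreeMap standing in for the MetadataStore and an assumed literal for OzoneConsts.DELETING_KEY_PREFIX:

import java.util.Map;
import java.util.TreeMap;

public class PendingDeletionRecountSketch {
  // Assumption: the real prefix lives in OzoneConsts.DELETING_KEY_PREFIX;
  // this literal is only for the sketch.
  static final String DELETING_KEY_PREFIX = "#deleting#";

  public static void main(String[] args) {
    Map<String, byte[]> containerDb = new TreeMap<>();
    containerDb.put(DELETING_KEY_PREFIX + "blk-1", new byte[0]);
    containerDb.put(DELETING_KEY_PREFIX + "blk-2", new byte[0]);
    containerDb.put("blk-3", new byte[0]);

    // Equivalent of getSequentialRangeKVs(null, MAX_VALUE, prefixFilter).size()
    long pending = containerDb.keySet().stream()
        .filter(key -> key.startsWith(DELETING_KEY_PREFIX))
        .count();
    System.out.println("pending deletion blocks: " + pending); // 2
  }
}
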
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index 9296524..e3b5370 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -23,11 +23,6 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@@ -77,12 +72,4 @@ public interface StorageContainerDatanodeProtocol {
           ContainerReportsProto containerReportsRequestProto,
           PipelineReportsProto pipelineReports) throws IOException;
 
-  /**
-   * Used by datanode to send block deletion ACK to SCM.
-   * @param request block deletion transactions.
-   * @return block deletion transaction response.
-   * @throws IOException
-   */
-  ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto request) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
index 0c4964a..69337fb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.ozone.protocol.commands;
 import java.util.UUID;
 
 import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
 
 /**
  * Command for the datanode with the destination address.
  */
-public class CommandForDatanode<T extends GeneratedMessage> {
+public class CommandForDatanode<T extends GeneratedMessage> implements
+    IdentifiableEventPayload {
 
   private final UUID datanodeId;
 
@@ -42,4 +44,8 @@ public class CommandForDatanode<T extends GeneratedMessage> {
   public SCMCommand<T> getCommand() {
     return command;
   }
+
+  public long getId() {
+    return command.getId();
+  }
 }

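CommandForDatanode implements IdentifiableEventPayload so the new event watcher can pair a tracked command with its later completion purely by id. The correlation logic, reduced to simplified stand-in types (not the hdds event framework):

import java.util.HashMap;
import java.util.Map;

public class IdCorrelationSketch {
  interface IdentifiableEventPayload {
    long getId();
  }

  static class Tracker {
    private final Map<Long, IdentifiableEventPayload> inFlight = new HashMap<>();

    void onStart(IdentifiableEventPayload payload) {
      inFlight.put(payload.getId(), payload);
    }

    // A completion event carrying the same id clears the tracked entry.
    boolean onCompletion(long id) {
      return inFlight.remove(id) != null;
    }
  }

  public static void main(String[] args) {
    Tracker tracker = new Tracker();
    tracker.onStart(() -> 42L);
    System.out.println(tracker.onCompletion(42L)); // true
  }
}
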
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
index 32cf7c2..4b3ce84 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
@@ -35,6 +35,13 @@ public class CommandStatus {
   private Status status;
   private String msg;
 
+  CommandStatus(Type type, Long cmdId, Status status, String msg) {
+    this.type = type;
+    this.cmdId = cmdId;
+    this.status = status;
+    this.msg = msg;
+  }
+
   public Type getType() {
     return type;
   }
@@ -60,6 +67,10 @@ public class CommandStatus {
     this.status = status;
   }
 
+  public void setStatus(boolean cmdExecuted) {
+    setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
+  }
+
   /**
    * Returns a CommandStatus from the protocol buffers.
    *
@@ -72,7 +83,8 @@ public class CommandStatus {
         .setCmdId(cmdStatusProto.getCmdId())
         .setStatus(cmdStatusProto.getStatus())
         .setType(cmdStatusProto.getType())
-        .setMsg(cmdStatusProto.getMsg()).build();
+        .setMsg(cmdStatusProto.getMsg())
+        .build();
   }
   /**
    * Returns a CommandStatus from the protocol buffers.
@@ -95,20 +107,36 @@ public class CommandStatus {
   /**
    * Builder class for CommandStatus.
    */
-  public static final class CommandStatusBuilder {
+  public static class CommandStatusBuilder {
 
     private SCMCommandProto.Type type;
     private Long cmdId;
     private StorageContainerDatanodeProtocolProtos.CommandStatus.Status status;
     private String msg;
 
-    private CommandStatusBuilder() {
+    CommandStatusBuilder() {
     }
 
     public static CommandStatusBuilder newBuilder() {
       return new CommandStatusBuilder();
     }
 
+    public Type getType() {
+      return type;
+    }
+
+    public Long getCmdId() {
+      return cmdId;
+    }
+
+    public Status getStatus() {
+      return status;
+    }
+
+    public String getMsg() {
+      return msg;
+    }
+
     public CommandStatusBuilder setType(Type commandType) {
       this.type = commandType;
       return this;
@@ -130,12 +158,7 @@ public class CommandStatus {
     }
 
     public CommandStatus build() {
-      CommandStatus commandStatus = new CommandStatus();
-      commandStatus.type = this.type;
-      commandStatus.msg = this.msg;
-      commandStatus.status = this.status;
-      commandStatus.cmdId = this.cmdId;
-      return commandStatus;
+      return new CommandStatus(type, cmdId, status, msg);
     }
   }
 }

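CommandStatusBuilder is opened up here (non-final, visible constructor, getters) precisely so that DeleteBlockCommandStatusBuilder can extend it while reusing the base setters. The shape of that pattern in a self-contained sketch; note that the base setters return the base type, so subclass-specific setters must come first in a chain, mirroring the ordering used in the patch:

public class BuilderSubclassSketch {
  static class CommandStatusBuilder {
    private long cmdId;
    CommandStatusBuilder setCmdId(long id) { this.cmdId = id; return this; }
    long getCmdId() { return cmdId; }
    String build() { return "CommandStatus(" + cmdId + ")"; }
  }

  static final class DeleteBlockBuilder extends CommandStatusBuilder {
    private String ack;
    DeleteBlockBuilder setAck(String deletionAck) { this.ack = deletionAck; return this; }
    @Override
    String build() { return "DeleteBlockCommandStatus(" + getCmdId() + ", " + ack + ")"; }
  }

  public static void main(String[] args) {
    // Subclass setter first, base setter after; build() still dispatches
    // to the subclass override.
    CommandStatusBuilder builder = new DeleteBlockBuilder().setAck("tx-7:ok");
    System.out.println(builder.setCmdId(7).build());
  }
}
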
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java
new file mode 100644
index 0000000..2659ab3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+
+public class DeleteBlockCommandStatus extends CommandStatus {
+
+  private ContainerBlocksDeletionACKProto blocksDeletionAck = null;
+
+  public DeleteBlockCommandStatus(Type type, Long cmdId,
+      StorageContainerDatanodeProtocolProtos.CommandStatus.Status status,
+      String msg, ContainerBlocksDeletionACKProto blocksDeletionAck) {
+    super(type, cmdId, status, msg);
+    this.blocksDeletionAck = blocksDeletionAck;
+  }
+
+  public void setBlocksDeletionAck(
+      ContainerBlocksDeletionACKProto deletionAck) {
+    blocksDeletionAck = deletionAck;
+  }
+
+  @Override
+  public CommandStatus getFromProtoBuf(
+      StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) {
+    return DeleteBlockCommandStatusBuilder.newBuilder()
+        .setBlockDeletionAck(cmdStatusProto.getBlockDeletionAck())
+        .setCmdId(cmdStatusProto.getCmdId())
+        .setStatus(cmdStatusProto.getStatus())
+        .setType(cmdStatusProto.getType())
+        .setMsg(cmdStatusProto.getMsg())
+        .build();
+  }
+
+  @Override
+  public StorageContainerDatanodeProtocolProtos.CommandStatus getProtoBufMessage() {
+    StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder =
+        StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder()
+            .setCmdId(this.getCmdId())
+            .setStatus(this.getStatus())
+            .setType(this.getType());
+    if (blocksDeletionAck != null) {
+      builder.setBlockDeletionAck(blocksDeletionAck);
+    }
+    if (this.getMsg() != null) {
+      builder.setMsg(this.getMsg());
+    }
+    return builder.build();
+  }
+
+  public static final class DeleteBlockCommandStatusBuilder
+      extends CommandStatusBuilder {
+    private ContainerBlocksDeletionACKProto blocksDeletionAck = null;
+
+    public static DeleteBlockCommandStatusBuilder newBuilder() {
+      return new DeleteBlockCommandStatusBuilder();
+    }
+
+    public DeleteBlockCommandStatusBuilder setBlockDeletionAck(
+        ContainerBlocksDeletionACKProto deletionAck) {
+      this.blocksDeletionAck = deletionAck;
+      return this;
+    }
+
+    @Override
+    public CommandStatus build() {
+      return new DeleteBlockCommandStatus(getType(), getCmdId(), getStatus(),
+          getMsg(), blocksDeletionAck);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index b9cf6f9..4e1e27e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -23,11 +23,6 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
 
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
@@ -169,16 +164,4 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
     return response;
   }
 
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {
-    final ContainerBlocksDeletionACKResponseProto resp;
-    try {
-      resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER,
-          deletedBlocks);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    return resp;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index ed01822..8622332 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -33,11 +33,6 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@@ -96,15 +91,4 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
     }
   }
 
-
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      RpcController controller, ContainerBlocksDeletionACKProto request)
-      throws ServiceException {
-    try {
-      return impl.sendContainerBlocksDeletionACK(request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 78758cb..982029c 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -80,7 +80,7 @@ message SCMHeartbeatRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   optional NodeReportProto nodeReport = 2;
   optional ContainerReportsProto containerReport = 3;
-  optional CommandStatusReportsProto commandStatusReport = 4;
+  repeated CommandStatusReportsProto commandStatusReports = 4;
   optional ContainerActionsProto containerActions = 5;
   optional PipelineActionsProto pipelineActions = 6;
   optional PipelineReportsProto pipelineReports = 7;
@@ -145,6 +145,7 @@ message CommandStatus {
   required Status status = 2 [default = PENDING];
   required SCMCommandProto.Type type = 3;
   optional string msg = 4;
+  optional ContainerBlocksDeletionACKProto blockDeletionAck = 5;
 }
 
 message ContainerActionsProto {
@@ -272,10 +273,6 @@ message ContainerBlocksDeletionACKProto {
   required string dnId = 2;
 }
 
-// SendACK response returned by datanode to SCM, currently empty.
-message ContainerBlocksDeletionACKResponseProto {
-}
-
 /**
 This command asks the datanode to close a specific container.
 */
@@ -386,8 +383,4 @@ service StorageContainerDatanodeProtocolService {
    */
  rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
 
-  /**
-   * Sends the block deletion ACK to SCM.
-   */
-  rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 27b6272..3e45596 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.VersionInfo;
@@ -30,11 +32,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -196,10 +193,12 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
       sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException {
     rpcCount.incrementAndGet();
     heartbeatCount.incrementAndGet();
-    if(heartbeat.hasCommandStatusReport()){
-      cmdStatusList.addAll(heartbeat.getCommandStatusReport()
-          .getCmdStatusList());
-      commandStatusReport.incrementAndGet();
+    if (heartbeat.getCommandStatusReportsCount() != 0) {
+      for (CommandStatusReportsProto statusReport : heartbeat
+          .getCommandStatusReportsList()) {
+        cmdStatusList.addAll(statusReport.getCmdStatusList());
+        commandStatusReport.incrementAndGet();
+      }
     }
     sleepIfNeeded();
     return SCMHeartbeatResponseProto.newBuilder().addAllCommands(
@@ -305,13 +304,6 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     return 0;
   }
 
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto request) throws IOException {
-    return ContainerBlocksDeletionACKResponseProto
-        .newBuilder().getDefaultInstanceForType();
-  }
-
   /**
    * Reset the mock Scm for test to get a fresh start without rebuild MockScm.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 811599f..1e82326 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -138,9 +138,7 @@ public class TestReportPublisher {
             new ThreadFactoryBuilder().setDaemon(true)
                 .setNameFormat("Unit test ReportManager Thread - %d").build());
     publisher.init(dummyContext, executorService);
-    Assert.assertEquals(0,
-        ((CommandStatusReportPublisher) publisher).getReport()
-            .getCmdStatusCount());
+    Assert.assertNull(((CommandStatusReportPublisher) publisher).getReport());
 
     // Insert to status object to state context map and then get the report.
     CommandStatus obj1 = CommandStatus.CommandStatusBuilder.newBuilder()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 69a6a33..606940b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -77,7 +77,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -108,7 +108,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertTrue(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -139,7 +139,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertTrue(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -170,7 +170,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() != 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -201,7 +201,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertTrue(heartbeat.hasContainerActions());
   }
 
@@ -235,7 +235,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertTrue(heartbeat.hasNodeReport());
     Assert.assertTrue(heartbeat.hasContainerReport());
-    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() != 0);
     Assert.assertTrue(heartbeat.hasContainerActions());
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 25420fe..8702a42 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -61,7 +61,8 @@ public class DatanodeDeletedBlockTransactions {
     try {
       ContainerWithPipeline containerWithPipeline =
           mappingService.getContainerWithPipeline(tx.getContainerID());
-      if (containerWithPipeline.getContainerInfo().isContainerOpen()) {
+      if (containerWithPipeline.getContainerInfo().isContainerOpen()
+          || containerWithPipeline.getPipeline().isEmpty()) {
         return false;
       }
       pipeline = containerWithPipeline.getPipeline();
@@ -70,25 +71,19 @@ public class DatanodeDeletedBlockTransactions {
       return false;
     }
 
-    if (pipeline == null) {
-      SCMBlockDeletingService.LOG.warn(
-          "Container {} not found, continue to process next",
-          tx.getContainerID());
-      return false;
-    }
-
+    boolean success = false;
     for (DatanodeDetails dd : pipeline.getMachines()) {
       UUID dnID = dd.getUuid();
       if (dnsWithTransactionCommitted == null ||
           !dnsWithTransactionCommitted.contains(dnID)) {
         // Transaction need not be sent to dns which have already committed it
-        addTransactionToDN(dnID, tx);
+        success = addTransactionToDN(dnID, tx);
       }
     }
-    return true;
+    return success;
   }
 
-  private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
+  private boolean addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
     if (transactions.containsKey(dnID)) {
       List<DeletedBlocksTransaction> txs = transactions.get(dnID);
       if (txs != null && txs.size() < maximumAllowedTXNum) {
@@ -103,14 +98,17 @@ public class DatanodeDeletedBlockTransactions {
         if (!hasContained) {
           txs.add(tx);
           currentTXNum++;
+          return true;
         }
       }
     } else {
       currentTXNum++;
       transactions.put(dnID, tx);
+      return true;
     }
     SCMBlockDeletingService.LOG
         .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
+    return false;
   }
 
   Set<UUID> getDatanodeIDs() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 49af65c..68435d1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -23,10 +23,16 @@ import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
     .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command
+    .CommandStatusReportHandler.DeleteBlockStatus;
 import org.apache.hadoop.hdds.scm.container.Mapping;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -75,9 +81,10 @@ import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
  * an equal chance of being retrieved, which depends only on the natural
  * order of the transaction ID.
  */
-public class DeletedBlockLogImpl implements DeletedBlockLog {
+public class DeletedBlockLogImpl
+    implements DeletedBlockLog, EventHandler<DeleteBlockStatus> {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(DeletedBlockLogImpl.class);
 
   private static final byte[] LATEST_TXID =
@@ -123,7 +130,7 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   }
 
   @VisibleForTesting
-  MetadataStore getDeletedStore() {
+  public MetadataStore getDeletedStore() {
     return deletedStore;
   }
 
@@ -269,6 +276,8 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
               deletedStore.delete(Longs.toByteArray(txID));
             }
           }
+          LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
+              txID, containerId, dnID);
         } catch (IOException e) {
           LOG.warn("Could not commit delete block transaction: " +
               transactionResult.getTxID(), e);
@@ -407,4 +416,13 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
       lock.unlock();
     }
   }
+
+  @Override
+  public void onMessage(DeleteBlockStatus deleteBlockStatus,
+      EventPublisher publisher) {
+    ContainerBlocksDeletionACKProto ackProto =
+        deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
+    commitTransactions(ackProto.getResultsList(),
+        UUID.fromString(ackProto.getDnId()));
+  }
 }

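With the dedicated ACK RPC removed (see the SCMDatanodeProtocolServer hunks
below), the deleted-block log itself consumes the acknowledgement as an
event. A minimal sketch of the same handler shape follows; the class name
DeleteBlockAckSketch is illustrative, while EventHandler, DeleteBlockStatus
and commitTransactions are the types and call used in this patch.

    import java.util.UUID;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
    import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
    import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
        .DeleteBlockStatus;
    import org.apache.hadoop.hdds.server.events.EventHandler;
    import org.apache.hadoop.hdds.server.events.EventPublisher;

    // Sketch: commit acknowledged delete-block transactions whenever a
    // DELETE_BLOCK_STATUS event arrives, mirroring onMessage() above.
    public class DeleteBlockAckSketch
        implements EventHandler<DeleteBlockStatus> {

      private final DeletedBlockLog deletedBlockLog;

      public DeleteBlockAckSketch(DeletedBlockLog deletedBlockLog) {
        this.deletedBlockLog = deletedBlockLog;
      }

      @Override
      public void onMessage(DeleteBlockStatus status,
          EventPublisher publisher) {
        ContainerBlocksDeletionACKProto ack =
            status.getCmdStatus().getBlockDeletionAck();
        // Each result carries a txID and success/failure for one container.
        deletedBlockLog.commitTransactions(ack.getResultsList(),
            UUID.fromString(ack.getDnId()));
      }
    }
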
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index de3fe26..b85d77f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -165,7 +165,7 @@ public class SCMBlockDeletingService extends BackgroundService {
             // We should stop caching new commands if the number of un-processed
             // commands exceeds a limit, e.g. 50. In case a datanode goes
             // offline for some time, the cached commands may pile up.
-            eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+            eventPublisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND,
                new CommandForDatanode<>(dnId, new DeleteBlocksCommand(dnTXs)));
             LOG.debug(
                 "Added delete block command for datanode {} in the queue,"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index 054665a..c0de382 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -63,8 +63,10 @@ public class CommandStatusReportHandler implements
             CloseContainerStatus(cmdStatus));
         break;
       case deleteBlocksCommand:
-        publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new
-            DeleteBlockCommandStatus(cmdStatus));
+        if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
+          publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
+              new DeleteBlockStatus(cmdStatus));
+        }
         break;
       default:
         LOGGER.debug("CommandStatus of type:{} not handled in " +
@@ -120,8 +122,8 @@ public class CommandStatusReportHandler implements
   /**
    * Wrapper event for DeleteBlock Command.
    */
-  public static class DeleteBlockCommandStatus extends CommandStatusEvent {
-    public DeleteBlockCommandStatus(CommandStatus cmdStatus) {
+  public static class DeleteBlockStatus extends CommandStatusEvent {
+    public DeleteBlockStatus(CommandStatus cmdStatus) {
       super(cmdStatus);
     }
   }

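The EXECUTED guard added above matters for the new watcher:
DELETE_BLOCK_STATUS is the completion event that releases a tracked
command's lease, so it should fire only for a terminal status; earlier
statuses are dropped and the command stays eligible for retry. A condensed
sketch of that dispatch step (method name illustrative; types as used in
the handler above):

    // Sketch: forward only terminal delete-block statuses as completion
    // events; anything else leaves the watcher's lease open for retry.
    void dispatchDeleteBlockStatus(CommandStatus cmdStatus,
        EventPublisher publisher) {
      if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
        publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
            new CommandStatusReportHandler.DeleteBlockStatus(cmdStatus));
      }
    }
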
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index eb0a0b4..71e17e9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -176,7 +175,10 @@ public class ContainerMapping implements Mapping {
   }
 
   /**
-   * Returns the ContainerInfo from the container ID.
+   * Returns the ContainerInfo and pipeline from the containerID. If the
+   * container has no available replicas on datanodes, it returns a pipeline
+   * with no datanodes and an empty leaderID. Pipeline#isEmpty can be used to
+   * check for an empty pipeline.
    *
    * @param containerID - ID of container.
    * @return - ContainerWithPipeline such as creation state and the pipeline.
@@ -200,6 +202,7 @@ public class ContainerMapping implements Mapping {
       contInfo = ContainerInfo.fromProtobuf(temp);
 
       Pipeline pipeline;
+      String leaderId = "";
       if (contInfo.isContainerOpen()) {
         // If pipeline with given pipeline Id already exist return it
         pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
@@ -207,14 +210,12 @@ public class ContainerMapping implements Mapping {
         // For close containers create pipeline from datanodes with replicas
         Set<DatanodeDetails> dnWithReplicas = containerStateManager
             .getContainerReplicas(contInfo.containerID());
-        if (dnWithReplicas.size() == 0) {
-          throw new SCMException("Can't create a pipeline for container with "
-              + "no replica.", ResultCodes.NO_REPLICA_FOUND);
+        if (!dnWithReplicas.isEmpty()) {
+          leaderId = dnWithReplicas.iterator().next().getUuidString();
         }
-        pipeline =
-            new Pipeline(dnWithReplicas.iterator().next().getUuidString(),
-                contInfo.getState(), ReplicationType.STAND_ALONE,
-                contInfo.getReplicationFactor(), PipelineID.randomId());
+        pipeline = new Pipeline(leaderId, contInfo.getState(),
+            ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(),
+            PipelineID.randomId());
         dnWithReplicas.forEach(pipeline::addMember);
       }
       return new ContainerWithPipeline(contInfo, pipeline);

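Callers adapt accordingly: a closed container with no reported replicas now
comes back with an empty pipeline instead of raising NO_REPLICA_FOUND, so
the probe moves from exception handling to a state check. A caller-side
sketch, assuming Pipeline#isEmpty is true exactly when the pipeline carries
no datanodes (as the updated javadoc states); handleReplica is an
illustrative callback.

    import java.io.IOException;
    import java.util.function.Consumer;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.container.Mapping;
    import org.apache.hadoop.hdds.scm.container.common.helpers
        .ContainerWithPipeline;
    import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;

    // Sketch: consume getContainerWithPipeline() without relying on the
    // removed NO_REPLICA_FOUND exception.
    void processContainer(Mapping mapping, long containerID,
        Consumer<DatanodeDetails> handleReplica) throws IOException {
      ContainerWithPipeline cwp =
          mapping.getContainerWithPipeline(containerID);
      Pipeline pipeline = cwp.getPipeline();
      if (pipeline.isEmpty()) {
        // No replica is currently reachable; skip instead of failing hard.
        return;
      }
      for (DatanodeDetails dd : pipeline.getMachines()) {
        handleReplica.accept(dd);
      }
    }
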
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 745e052..77b8713 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -23,8 +23,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .CloseContainerStatus;
-import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
-    .DeleteBlockCommandStatus;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .ReplicationStatus;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
@@ -128,6 +127,10 @@ public final class SCMEvents {
   public static final Event<CommandForDatanode> DATANODE_COMMAND =
       new TypedEvent<>(CommandForDatanode.class, "Datanode_Command");
 
+  public static final TypedEvent<CommandForDatanode>
+      RETRIABLE_DATANODE_COMMAND =
+      new TypedEvent<>(CommandForDatanode.class, "Retriable_Datanode_Command");
+
   /**
   * A Close Container Event can be triggered under many conditions. Some of them
    * are: 1. A Container is full, then we stop writing further information to
@@ -179,7 +182,7 @@ public final class SCMEvents {
    * status for Replication SCMCommand is received.
    */
   public static final Event<ReplicationStatus> REPLICATION_STATUS = new
-      TypedEvent<>(ReplicationStatus.class, "ReplicateCommandStatus");
+      TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status");
   /**
    * This event will be triggered by CommandStatusReportHandler whenever a
    * status for CloseContainer SCMCommand is received.
@@ -187,15 +190,15 @@ public final class SCMEvents {
   public static final Event<CloseContainerStatus>
       CLOSE_CONTAINER_STATUS =
       new TypedEvent<>(CloseContainerStatus.class,
-          "CloseContainerCommandStatus");
+          "Close_Container_Command_Status");
   /**
    * This event will be triggered by CommandStatusReportHandler whenever a
    * status for DeleteBlock SCMCommand is received.
    */
-  public static final Event<DeleteBlockCommandStatus>
+  public static final TypedEvent<CommandStatusReportHandler.DeleteBlockStatus>
       DELETE_BLOCK_STATUS =
-      new TypedEvent<>(DeleteBlockCommandStatus.class,
-          "DeleteBlockCommandStatus");
+      new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class,
+          "Delete_Block_Status");
 
   /**
    * This event will be triggered while processing container reports from DN
@@ -203,7 +206,7 @@ public final class SCMEvents {
    * deleteTransactionID on SCM.
    */
   public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
-      new TypedEvent<>(PendingDeleteStatusList.class, "PendingDeleteStatus");
+      new TypedEvent<>(PendingDeleteStatusList.class, "Pending_Delete_Status");
 
   /**
    * This is the command for ReplicationManager to handle under/over

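For orientation, an SCM event is just a typed, named constant; the rename to
underscore-separated names (Delete_Block_Status and friends) is what the log
assertions in the test hunk further below match against. A tiny
self-contained sketch of declaring, handling and firing one, assuming the
EventQueue addHandler/fireEvent API used elsewhere in this patch:

    import org.apache.hadoop.hdds.server.events.EventQueue;
    import org.apache.hadoop.hdds.server.events.TypedEvent;

    public final class EventDemo {
      // Events are typed, named constants; names now use underscores.
      public static final TypedEvent<String> DEMO_EVENT =
          new TypedEvent<>(String.class, "Demo_Event");

      public static void main(String[] args) {
        EventQueue queue = new EventQueue();
        queue.addHandler(DEMO_EVENT,
            (payload, publisher) -> System.out.println("got " + payload));
        queue.fireEvent(DEMO_EVENT, "hello");
      }
    }
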
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index e65de8b..d9a0875 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -121,10 +121,13 @@ public final class SCMDatanodeHeartbeatDispatcher {
               heartbeat.getPipelineActions()));
     }
 
-    if (heartbeat.hasCommandStatusReport()) {
-      eventPublisher.fireEvent(CMD_STATUS_REPORT,
-          new CommandStatusReportFromDatanode(datanodeDetails,
-              heartbeat.getCommandStatusReport()));
+    if (heartbeat.getCommandStatusReportsCount() != 0) {
+      for (CommandStatusReportsProto commandStatusReport : heartbeat
+          .getCommandStatusReportsList()) {
+        eventPublisher.fireEvent(CMD_STATUS_REPORT,
+            new CommandStatusReportFromDatanode(datanodeDetails,
+                commandStatusReport));
+      }
     }
 
     return commands;

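Correspondingly, a heartbeat may now carry several command status reports;
the proto field apparently changed from a singular commandStatusReport to a
repeated commandStatusReports, matching the addCommandStatusReports() call
in the test hunk further below. A builder-side sketch, with report1 and
report2 illustrative:

    SCMHeartbeatRequestProto heartbeat =
        SCMHeartbeatRequestProto.newBuilder()
            .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
            .addCommandStatusReports(report1)
            .addCommandStatusReports(report2)
            .build();
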
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 4a0d3e5..9c6fa88 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -49,15 +49,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
-
 
 import static org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto
@@ -100,7 +91,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
@@ -242,18 +232,6 @@ public class SCMDatanodeProtocolServer implements
         .addAllCommands(cmdResponses).build();
   }
 
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto acks) throws IOException {
-    if (acks.getResultsCount() > 0) {
-      List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
-      scm.getScmBlockManager().getDeletedBlockLog()
-          .commitTransactions(resultList, UUID.fromString(acks.getDnId()));
-    }
-    return ContainerBlocksDeletionACKResponseProto.newBuilder()
-        .getDefaultInstanceForType();
-  }
-
   /**
   * Returns an SCMCommandResponse from the SCM Command.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index f491d30..6ddec4c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -253,6 +255,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
         watcherTimeout);
 
+    RetriableDatanodeEventWatcher retriableDatanodeEventWatcher =
+        new RetriableDatanodeEventWatcher<>(
+            SCMEvents.RETRIABLE_DATANODE_COMMAND,
+            SCMEvents.DELETE_BLOCK_STATUS,
+            commandWatcherLeaseManager);
+    retriableDatanodeEventWatcher.start(eventQueue);
+
     //TODO: support configurable containerPlacement policy
     ContainerPlacementPolicy containerPlacementPolicy =
         new SCMContainerPlacementCapacity(scmNodeManager, conf);
@@ -282,6 +291,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     httpServer = new StorageContainerManagerHttpServer(conf);
 
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
+    eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
     eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
     eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
@@ -296,6 +306,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         replicationStatus.getChillModeStatusListener());
     eventQueue
         .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
+    eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS,
+        (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
     eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
         pipelineActionEventHandler);
     eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);

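Pulled together, the wiring above establishes the retry loop: the watcher
and the node manager both subscribe to RETRIABLE_DATANODE_COMMAND (the
watcher leases the command id, the node manager delivers the command), and
DELETE_BLOCK_STATUS closes the lease. A condensed sketch of that setup,
using the field names from the class above; timeoutMs is illustrative and
the LeaseManager is assumed to need an explicit start() before use.

    // Sketch: watcher and node manager share the start event; the delete
    // block status event completes the lease, a timeout re-fires the command.
    LeaseManager<Long> leaseManager =
        new LeaseManager<>("CommandWatcher", timeoutMs);
    leaseManager.start();

    RetriableDatanodeEventWatcher<CommandStatusReportHandler.DeleteBlockStatus>
        watcher = new RetriableDatanodeEventWatcher<>(
            SCMEvents.RETRIABLE_DATANODE_COMMAND,
            SCMEvents.DELETE_BLOCK_STATUS,
            leaseManager);
    watcher.start(eventQueue);

    eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
    eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS,
        (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
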
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java
new file mode 100644
index 0000000..2a50bca
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.CommandStatusEvent;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * EventWatcher for start events and completion events with payload of type
+ * CommandForDatanode and CommandStatusEvent respectively.
+ */
+public class RetriableDatanodeEventWatcher<T extends CommandStatusEvent>
+    extends EventWatcher<CommandForDatanode, T> {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RetriableDatanodeEventWatcher.class);
+
+  public RetriableDatanodeEventWatcher(Event<CommandForDatanode> startEvent,
+      Event<T> completionEvent, LeaseManager<Long> leaseManager) {
+    super(startEvent, completionEvent, leaseManager);
+  }
+
+  @Override
+  protected void onTimeout(EventPublisher publisher,
+      CommandForDatanode payload) {
+    LOG.info("RetriableDatanodeCommand type={} with id={} timed out. Retrying.",
+        payload.getCommand().getType(), payload.getId());
+    //put back to the original queue
+    publisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND, payload);
+  }
+
+  @Override
+  protected void onFinished(EventPublisher publisher,
+      CommandForDatanode payload) {
+
+  }
+}

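To see the watcher end to end, a hypothetical round trip (dnUuid, dnTXs and
commandStatus are illustrative; EventWatcher is assumed to pair start and
completion events by the payload id):

    // 1. Fire the trackable command; the watcher takes a lease on its id.
    eventQueue.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND,
        new CommandForDatanode<>(dnUuid, new DeleteBlocksCommand(dnTXs)));

    // 2a. A completion event with the same id releases the lease...
    eventQueue.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
        new CommandStatusReportHandler.DeleteBlockStatus(commandStatus));

    // 2b. ...otherwise the lease expires, onTimeout() above logs and
    //     re-fires RETRIABLE_DATANODE_COMMAND, and the command is resent.
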
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java
new file mode 100644
index 0000000..b1d2838
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
index 65a2e29..afa25e2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
@@ -67,20 +67,20 @@ public class TestCommandStatusReportHandler implements EventPublisher {
     CommandStatusReportFromDatanode report = this.getStatusReport(Collections
         .emptyList());
     cmdStatusReportHandler.onMessage(report, this);
-    assertFalse(logCapturer.getOutput().contains("DeleteBlockCommandStatus"));
+    assertFalse(logCapturer.getOutput().contains("Delete_Block_Status"));
     assertFalse(logCapturer.getOutput().contains(
-        "CloseContainerCommandStatus"));
-    assertFalse(logCapturer.getOutput().contains("ReplicateCommandStatus"));
+        "Close_Container_Command_Status"));
+    assertFalse(logCapturer.getOutput().contains("Replicate_Command_Status"));
 
 
     report = this.getStatusReport(this.getCommandStatusList());
     cmdStatusReportHandler.onMessage(report, this);
     assertTrue(logCapturer.getOutput().contains("firing event of type " +
-        "DeleteBlockCommandStatus"));
+        "Delete_Block_Status"));
     assertTrue(logCapturer.getOutput().contains("firing event of type " +
-        "CloseContainerCommandStatus"));
+        "Close_Container_Command_Status"));
     assertTrue(logCapturer.getOutput().contains("firing event of type " +
-        "ReplicateCommandStatus"));
+        "Replicate_Command_Status"));
 
     assertTrue(logCapturer.getOutput().contains("type: " +
         "closeContainerCommand"));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
index 6a0b909..f3cd4ea 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
@@ -127,7 +127,7 @@ public class TestSCMDatanodeHeartbeatDispatcher {
         SCMHeartbeatRequestProto.newBuilder()
             .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
             .setContainerReport(containerReport)
-            .setCommandStatusReport(commandStatusReport)
+            .addCommandStatusReports(commandStatusReport)
             .build();
     dispatcher.dispatch(heartbeat);
     Assert.assertEquals(2, eventReceived.get());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cc129bb/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index fe53bcc..ac3ad5d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.junit.Assert.fail;
 
@@ -188,6 +189,8 @@ public class TestStorageContainerManager {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100,
         TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100,
+        TimeUnit.MILLISECONDS);
     conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
         3000,
         TimeUnit.MILLISECONDS);

