HDDS-801. Quasi close the container when close is not executed via Ratis.
Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c4d96400
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c4d96400
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c4d96400

Branch: refs/heads/trunk
Commit: c4d96400280d9a1595b82435297dbbed2668c485
Parents: ad5256e
Author: Nanda kumar <[email protected]>
Authored: Fri Nov 16 23:07:45 2018 +0530
Committer: Nanda kumar <[email protected]>
Committed: Fri Nov 16 23:11:33 2018 +0530

----------------------------------------------------------------------
 .../main/proto/DatanodeContainerProtocol.proto  |   4 +-
 .../container/common/impl/ContainerData.java    |  17 +-
 .../container/common/impl/HddsDispatcher.java   |  14 +-
 .../container/common/interfaces/Container.java  |  15 +-
 .../container/common/interfaces/Handler.java    |  68 +++++-
 .../common/report/ContainerReportPublisher.java |   3 +-
 .../statemachine/DatanodeStateMachine.java      |   2 +-
 .../common/statemachine/StateContext.java       |  20 --
 .../CloseContainerCommandHandler.java           | 138 ++++++------
 .../states/endpoint/RegisterEndpointTask.java   |   2 +-
 .../transport/server/XceiverServerGrpc.java     |   5 +
 .../transport/server/XceiverServerSpi.java      |   7 +
 .../server/ratis/XceiverServerRatis.java        |  18 +-
 .../container/keyvalue/KeyValueContainer.java   |  41 +++-
 .../container/keyvalue/KeyValueHandler.java     | 119 +++++++----
 .../ozoneimpl/ContainerController.java          | 124 +++++++++++
 .../container/ozoneimpl/OzoneContainer.java     | 124 +++++------
 .../DownloadAndImportReplicator.java            |  16 +-
 .../OnDemandContainerReplicationSource.java     |  10 +-
 .../commands/CloseContainerCommand.java         |  12 +-
 .../StorageContainerDatanodeProtocol.proto      |   8 +-
 .../common/impl/TestHddsDispatcher.java         |  26 ++-
 .../common/interfaces/TestHandler.java          |  16 +-
 .../TestCloseContainerCommandHandler.java       | 210 +++++++++++++++++++
 .../container/keyvalue/TestKeyValueHandler.java |   8 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java |   2 +-
 .../container/CloseContainerEventHandler.java   |   4 +-
 .../hadoop/hdds/scm/node/TestNodeManager.java   |   3 +-
 .../ozone/container/common/TestEndPoint.java    |   8 +-
 .../dist/src/main/smoketest/basic/basic.robot   |   2 +-
 .../hadoop/ozone/TestMiniOzoneCluster.java      |  18 +-
 .../rpc/TestCloseContainerHandlingByClient.java |  26 +--
 .../TestCloseContainerByPipeline.java           |  80 ++++---
 .../TestCloseContainerHandler.java              |   3 +-
 .../container/metrics/TestContainerMetrics.java |  22 +-
 .../container/server/TestContainerServer.java   |  26 ++-
 .../genesis/BenchMarkDatanodeDispatcher.java    |  19 +-
 .../freon/TestFreonWithDatanodeFastRestart.java |   3 +-
 38 files changed, 875 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 5eecdcb..ac2d277 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -229,8 +229,8 @@ message ContainerDataProto {
   enum State {
     OPEN = 1;
     CLOSING = 2;
-    CLOSED = 3;
-    QUASI_CLOSED = 4;
+    QUASI_CLOSED =3;
+    CLOSED = 4;
     UNHEALTHY = 5;
     INVALID = 6;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index ad199f0..6f51517 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -237,11 +237,26 @@ public abstract class ContainerData {
    * checks if the container is closed.
    * @return - boolean
    */
-  public synchronized  boolean isClosed() {
+  public synchronized boolean isClosed() {
     return ContainerDataProto.State.CLOSED == state;
   }
 
   /**
+   * checks if the container is quasi closed.
+   * @return - boolean
+   */
+  public synchronized boolean isQuasiClosed() {
+    return ContainerDataProto.State.QUASI_CLOSED == state;
+  }
+
+  /**
+   * Marks this container as quasi closed.
+   */
+  public synchronized void quasiCloseContainer() {
+    setState(ContainerDataProto.State.QUASI_CLOSED);
+  }
+
+  /**
    * Marks this container as closed.
    */
   public synchronized void closeContainer() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index c52d973..29f9b20 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -76,18 +75,14 @@ public class HddsDispatcher implements ContainerDispatcher {
    * XceiverServerHandler.
    */
   public HddsDispatcher(Configuration config, ContainerSet contSet,
-      VolumeSet volumes, StateContext context) {
+      VolumeSet volumes, Map<ContainerType, Handler> handlers,
+      StateContext context, ContainerMetrics metrics) {
     this.conf = config;
     this.containerSet = contSet;
     this.volumeSet = volumes;
     this.context = context;
-    this.handlers = Maps.newHashMap();
-    this.metrics = ContainerMetrics.create(conf);
-    for (ContainerType containerType : ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(
-              containerType, conf, containerSet, volumeSet, metrics));
-    }
+    this.handlers = handlers;
+    this.metrics = metrics;
     this.containerCloseThreshold = conf.getFloat(
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD,
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@@ -347,6 +342,7 @@ public class HddsDispatcher implements ContainerDispatcher {
     }
   }
 
+  @VisibleForTesting
   public Container getContainer(long containerID) {
     return containerSet.getContainer(containerID);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 65147cc..2bda747 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -82,11 +82,24 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
   ContainerProtos.ContainerDataProto.State getContainerState();
 
   /**
-   * Closes a open container, if it is already closed or does not exist a
+   * Marks the container for closing. Moves the container to CLOSING state.
+   */
+  void markContainerForClose() throws StorageContainerException;
+
+  /**
+   * Quasi Closes a open container, if it is already closed or does not exist a
    * StorageContainerException is thrown.
    *
    * @throws StorageContainerException
    */
+  void quasiClose() throws StorageContainerException;
+
+  /**
+   * Closes a open/quasi closed container, if it is already closed or does not
+   * exist a StorageContainerException is thrown.
+   *
+   * @throws StorageContainerException
+   */
   void close() throws StorageContainerException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 53e1c68..19e61c4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -29,8 +29,12 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
@@ -47,26 +51,47 @@ public abstract class Handler {
   protected String scmID;
   protected final ContainerMetrics metrics;
 
-  protected Handler(Configuration config, ContainerSet contSet,
-      VolumeSet volumeSet, ContainerMetrics containerMetrics) {
-    conf = config;
-    containerSet = contSet;
+  private final StateContext context;
+
+  protected Handler(Configuration config, StateContext context,
+      ContainerSet contSet, VolumeSet volumeSet,
+      ContainerMetrics containerMetrics) {
+    this.conf = config;
+    this.context = context;
+    this.containerSet = contSet;
     this.volumeSet = volumeSet;
     this.metrics = containerMetrics;
   }
 
-  public static Handler getHandlerForContainerType(ContainerType containerType,
-      Configuration config, ContainerSet contSet, VolumeSet volumeSet,
-                                                   ContainerMetrics metrics) {
+  public static Handler getHandlerForContainerType(
+      final ContainerType containerType, final Configuration config,
+      final StateContext context, final ContainerSet contSet,
+      final VolumeSet volumeSet, final ContainerMetrics metrics) {
     switch (containerType) {
     case KeyValueContainer:
-      return new KeyValueHandler(config, contSet, volumeSet, metrics);
+      return new KeyValueHandler(config, context, contSet, volumeSet, metrics);
     default:
       throw new IllegalArgumentException("Handler for ContainerType: " +
         containerType + "doesn't exist.");
     }
   }
 
+  /**
+   * This should be called whenever there is state change. It will trigger
+   * an ICR to SCM.
+   *
+   * @param container Container for which ICR has to be sent
+   */
+  protected void sendICR(final Container container)
+      throws StorageContainerException {
+    IncrementalContainerReportProto icr = IncrementalContainerReportProto
+        .newBuilder()
+        .addReport(container.getContainerReport())
+        .build();
+    context.addReport(icr);
+    context.getParent().triggerHeartbeat();
+  }
+
   public abstract ContainerCommandResponseProto handle(
       ContainerCommandRequestProto msg, Container container);
 
@@ -80,6 +105,33 @@ public abstract class Handler {
       TarContainerPacker packer)
       throws IOException;
 
+  /**
+   * Marks the container for closing. Moves the container to CLOSING state.
+   *
+   * @param container container to update
+   * @throws IOException in case of exception
+   */
+  public abstract void markContainerForClose(Container container)
+      throws IOException;
+
+  /**
+   * Moves the Container to QUASI_CLOSED state.
+   *
+   * @param container container to be quasi closed
+   * @throws IOException
+   */
+  public abstract void quasiCloseContainer(Container container)
+      throws IOException;
+
+  /**
+   * Moves the Container to CLOSED state.
+   *
+   * @param container container to be closed
+   * @throws IOException
+   */
+  public abstract void closeContainer(Container container)
+      throws IOException;
+
   public void setScmID(String scmId) {
     this.scmID = scmId;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
index ccb9a9a..b92e3b0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
@@ -80,6 +80,7 @@ public class ContainerReportPublisher extends
 
   @Override
   protected ContainerReportsProto getReport() throws IOException {
-    return getContext().getParent().getContainer().getContainerReport();
+    return getContext().getParent().getContainer()
+        .getController().getContainerReport();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 12c33ff..1d87071 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -99,7 +99,7 @@ public class DatanodeStateMachine implements Closeable {
 
     ContainerReplicator replicator =
         new DownloadAndImportReplicator(container.getContainerSet(),
-            container.getDispatcher(),
+            container.getController(),
             new SimpleContainerDownloader(conf), new TarContainerPacker());
 
     supervisor =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index e928824..953f730 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -59,8 +59,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
-import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
-
 /**
  * Current Context of State Machine.
  */
@@ -116,24 +114,6 @@ public class StateContext {
   }
 
   /**
-   * Get the container server port.
-   * @return The container server port if available, return -1 if otherwise
-   */
-  public int getContainerPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getContainerServerPort();
-  }
-
-  /**
-   * Gets the Ratis Port.
-   * @return int , return -1 if not valid.
-   */
-  public int getRatisPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
-  }
-
-  /**
    * Returns true if we are entering a new state.
    *
    * @return boolean

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 0838be2..9d25812 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -16,21 +16,20 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerDataProto.State;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
@@ -38,17 +37,19 @@ import org.apache.ratis.protocol.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.UUID;
 
 /**
  * Handler for close container command received from SCM.
  */
 public class CloseContainerCommandHandler implements CommandHandler {
-  static final Logger LOG =
+
+  private static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
+
   private int invocationCount;
   private long totalTime;
-  private boolean cmdExecuted;
 
   /**
    * Constructs a ContainerReport handler.
@@ -67,79 +68,70 @@ public class CloseContainerCommandHandler implements CommandHandler {
   @Override
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing Close Container command.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-    // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
-    long containerID = -1;
     try {
-      CloseContainerCommandProto closeContainerProto =
+      LOG.debug("Processing Close Container command.");
+      invocationCount++;
+      final long startTime = Time.monotonicNow();
+      final DatanodeDetails datanodeDetails = context.getParent()
+          .getDatanodeDetails();
+      final CloseContainerCommandProto closeCommand =
           CloseContainerCommandProto.parseFrom(command.getProtoBufMessage());
-      containerID = closeContainerProto.getContainerID();
-      // CloseContainer operation is idempotent, if the container is already
-      // closed, then do nothing.
-      // TODO: Non-existent container should be handled properly
-      Container container =
-          ozoneContainer.getContainerSet().getContainer(containerID);
-      if (container == null) {
-        LOG.error("Container {} does not exist in datanode. "
-            + "Container close failed.", containerID);
-        cmdExecuted = false;
-        return;
-      }
-      ContainerData containerData = container.getContainerData();
-      State containerState = container.getContainerData().getState();
-      if (containerState != State.CLOSED) {
-        LOG.debug("Closing container {}.", containerID);
-        // when a closeContainerCommand arrives at a Datanode and if the
-        // container is open, each replica will be moved to closing state first.
-        if (containerState == State.OPEN) {
-          containerData.setState(State.CLOSING);
+      final ContainerController controller = ozoneContainer.getController();
+      final long containerId = closeCommand.getContainerID();
+      try {
+        // TODO: Closing of QUASI_CLOSED container.
+
+        final Container container = controller.getContainer(containerId);
+
+        if (container == null) {
+          LOG.error("Container #{} does not exist in datanode. "
+              + "Container close failed.", containerId);
+          return;
         }
 
-        // if the container is already closed, it will be just ignored.
-        // ICR will get triggered to change the replica state in SCM.
-        HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
-        HddsProtos.ReplicationType replicationType =
-            closeContainerProto.getReplicationType();
-
-        ContainerProtos.ContainerCommandRequestProto.Builder request =
-            ContainerProtos.ContainerCommandRequestProto.newBuilder();
-        request.setCmdType(ContainerProtos.Type.CloseContainer);
-        request.setContainerID(containerID);
-        request.setCloseContainer(
-            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
-        request.setTraceID(UUID.randomUUID().toString());
-        request.setDatanodeUuid(
-            context.getParent().getDatanodeDetails().getUuidString());
-        // submit the close container request for the XceiverServer to handle
-        ozoneContainer.submitContainerRequest(request.build(), replicationType,
-            pipelineID);
-        // Since the container is closed, we trigger an ICR
-        IncrementalContainerReportProto icr =
-            IncrementalContainerReportProto.newBuilder().addReport(
-                ozoneContainer.getContainerSet().getContainer(containerID)
-                    .getContainerReport()).build();
-        context.addReport(icr);
-        context.getParent().triggerHeartbeat();
-      }
-    } catch (Exception e) {
-      if (e instanceof NotLeaderException) {
-        // If the particular datanode is not the Ratis leader, the close
-        // container command will not be executed by the follower but will be
-        // executed by Ratis stateMachine transactions via leader to follower.
-        // There can also be case where the datanode is in candidate state.
-        // In these situations, NotLeaderException is thrown.
-        LOG.info("Follower cannot close the container {}.", containerID);
-      } else {
-        LOG.error("Can't close container " + containerID, e);
+        // Move the container to CLOSING state
+        controller.markContainerForClose(containerId);
+
+        // If the container is part of open pipeline, close it via write channel
+        if (ozoneContainer.getWriteChannel()
+            .isExist(closeCommand.getPipelineID())) {
+          ContainerCommandRequestProto request =
+              getContainerCommandRequestProto(datanodeDetails,
+                  closeCommand.getContainerID());
+          ozoneContainer.getWriteChannel().submitRequest(
+              request, closeCommand.getPipelineID());
+          return;
+        }
+
+        // The container is not part of any open pipeline.
+        // QUASI_CLOSE the container using ContainerController.
+        controller.quasiCloseContainer(containerId);
+      } catch (NotLeaderException e) {
+        LOG.debug("Follower cannot close container #{}.", containerId);
+      } catch (IOException e) {
+        LOG.error("Can't close container #{}", containerId, e);
+      } finally {
+        long endTime = Time.monotonicNow();
+        totalTime += endTime - startTime;
       }
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
+    } catch (InvalidProtocolBufferException ex) {
+      LOG.error("Exception while closing container", ex);
     }
   }
 
+  private ContainerCommandRequestProto getContainerCommandRequestProto(
+      final DatanodeDetails datanodeDetails, final long containerId) {
+    final ContainerCommandRequestProto.Builder command =
+        ContainerCommandRequestProto.newBuilder();
+    command.setCmdType(ContainerProtos.Type.CloseContainer);
+    command.setContainerID(containerId);
+    command.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    command.setTraceID(UUID.randomUUID().toString());
+    command.setDatanodeUuid(datanodeDetails.getUuidString());
+    return command.build();
+  }
+
   /**
    * Returns the command type that this command handler handles.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index acd4af9..9918f9d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -111,7 +111,7 @@ public final class RegisterEndpointTask implements
     try {
 
       ContainerReportsProto containerReport = datanodeContainerManager
-          .getContainerReport();
+          .getController().getContainerReport();
       NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
       PipelineReportsProto pipelineReportsProto =
               datanodeContainerManager.getPipelineReport();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index ab9f42f..992f3cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -141,6 +141,11 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
   }
 
   @Override
+  public boolean isExist(HddsProtos.PipelineID pipelineId) {
+    return PipelineID.valueOf(id).getProtobuf().equals(pipelineId);
+  }
+
+  @Override
   public List<PipelineReport> getPipelineReport() {
     return Collections.singletonList(
             PipelineReport.newBuilder()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 8c3fa5c..4e0d343 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -54,6 +54,13 @@ public interface XceiverServerSpi {
       throws IOException;
 
   /**
+   * Returns true if the given pipeline exist.
+   *
+   * @return true if pipeline present, else false
+   */
+  boolean isExist(HddsProtos.PipelineID pipelineId);
+
+  /**
    * Get pipeline report for the XceiverServer instance.
    * @return list of report for each pipeline.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 7bf4da9..434d330 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -86,7 +86,8 @@ import java.util.concurrent.atomic.AtomicLong;
  * Ozone containers.
  */
 public final class XceiverServerRatis implements XceiverServerSpi {
-  static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
+  private static final Logger LOG = LoggerFactory
+      .getLogger(XceiverServerRatis.class);
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
 
   private static long nextCallId() {
@@ -458,6 +459,21 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
   }
 
   @Override
+  public boolean isExist(HddsProtos.PipelineID pipelineId) {
+    try {
+      for (RaftGroupId groupId : server.getGroupIds()) {
+        if (PipelineID.valueOf(
+            groupId.getUuid()).getProtobuf().equals(pipelineId)) {
+          return true;
+        }
+      }
+      return false;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Override
   public List<PipelineReport> getPipelineReport() {
     try {
       Iterable<RaftGroupId> gids = server.getGroupIds();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index b82c12f..f725d22 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerDataProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerType;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -267,29 +269,47 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
   }
 
   @Override
+  public void markContainerForClose() throws StorageContainerException {
+    updateContainerData(() ->
+        containerData.setState(ContainerDataProto.State.CLOSING));
+  }
+
+  @Override
+  public void quasiClose() throws StorageContainerException {
+    updateContainerData(containerData::quasiCloseContainer);
+  }
+
+  @Override
   public void close() throws StorageContainerException {
+    updateContainerData(containerData::closeContainer);
+    // It is ok if this operation takes a bit of time.
+    // Close container is not expected to be instantaneous.
+    compactDB();
+  }
 
-    //TODO: writing .container file and compaction can be done
-    // asynchronously, otherwise rpc call for this will take a lot of time to
-    // complete this action
+  private void updateContainerData(Runnable update)
+      throws StorageContainerException {
+    ContainerDataProto.State oldState = null;
     try {
       writeLock();
-
-      containerData.closeContainer();
+      oldState = containerData.getState();
+      update.run();
       File containerFile = getContainerFile();
       // update the new container data to .container File
       updateContainerFile(containerFile);
 
     } catch (StorageContainerException ex) {
-      // Failed to update .container file. Reset the state to CLOSING
-      containerData.setState(ContainerProtos.ContainerDataProto.State.CLOSING);
+      if (oldState != null) {
+        // Failed to update .container file. Restore the previous state.
+        containerData.setState(oldState);
+      }
       throw ex;
     } finally {
       writeUnlock();
     }
+  }
 
-    // It is ok if this operation takes a bit of time.
-    // Close container is not expected to be instantaneous.
+  private void compactDB() throws StorageContainerException {
     try {
       MetadataStore db = BlockUtils.getDB(containerData, config);
       db.compactDB();
@@ -549,6 +569,9 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
     case CLOSING:
       state = ContainerReplicaProto.State.CLOSING;
       break;
+    case QUASI_CLOSED:
+      state = ContainerReplicaProto.State.QUASI_CLOSED;
+      break;
     case CLOSED:
       state = ContainerReplicaProto.State.CLOSED;
       break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index f970c72..4a5ef18 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerDataProto;
+    .ContainerDataProto.State;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -57,6 +57,7 @@ import 
org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
@@ -109,9 +110,9 @@ public class KeyValueHandler extends Handler {
   private final long maxContainerSize;
   private final AutoCloseableLock handlerLock;
 
-  public KeyValueHandler(Configuration config, ContainerSet contSet,
-      VolumeSet volSet, ContainerMetrics metrics) {
-    super(config, contSet, volSet, metrics);
+  public KeyValueHandler(Configuration config, StateContext context,
+      ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) {
+    super(config, context, contSet, volSet, metrics);
     containerType = ContainerType.KeyValueContainer;
     blockManager = new BlockManagerImpl(config);
     chunkManager = new ChunkManagerImpl();
@@ -372,20 +373,10 @@ public class KeyValueHandler extends Handler {
           request.getTraceID());
       return ContainerUtils.malformedRequest(request);
     }
-
-    long containerID = kvContainer.getContainerData().getContainerID();
     try {
-      checkContainerOpen(kvContainer);
-      // TODO : The close command should move the container to either quasi
-      // closed/closed depending upon how the closeContainer gets executed.
-      // If it arrives by Standalone, it will be moved to Quasi Closed or
-      // otherwise moved to Closed state if it gets executed via Ratis.
-      kvContainer.close();
+      markContainerForClose(kvContainer);
+      closeContainer(kvContainer);
     } catch (StorageContainerException ex) {
-      if (ex.getResult() == CLOSED_CONTAINER_IO) {
-        LOG.debug("Container {} is already closed.", containerID);
-        return ContainerUtils.getSuccessResponse(request);
-      }
       return ContainerUtils.logAndReturnError(LOG, ex, request);
     } catch (IOException ex) {
       return ContainerUtils.logAndReturnError(LOG,
@@ -745,38 +736,39 @@ public class KeyValueHandler extends Handler {
   private void checkContainerOpen(KeyValueContainer kvContainer)
       throws StorageContainerException {
 
-    ContainerDataProto.State containerState = kvContainer.getContainerState();
+    final State containerState = kvContainer.getContainerState();
 
-    /**
+    /*
      * In a closing state, follower will receive transactions from leader.
      * Once the leader is put to closing state, it will reject further requests
      * from clients. Only the transactions which happened before the container
      * in the leader goes to closing state, will arrive here even the container
      * might already be in closing state here.
      */
-    if (containerState == ContainerDataProto.State.OPEN
-        || containerState == ContainerDataProto.State.CLOSING) {
+    if (containerState == State.OPEN || containerState == State.CLOSING) {
       return;
-    } else {
-      String msg = "Requested operation not allowed as ContainerState is " +
-          containerState;
-      ContainerProtos.Result result = null;
-      switch (containerState) {
-      case CLOSED:
-        result = CLOSED_CONTAINER_IO;
-        break;
-      case UNHEALTHY:
-        result = CONTAINER_UNHEALTHY;
-        break;
-      case INVALID:
-        result = INVALID_CONTAINER_STATE;
-        break;
-      default:
-        result = CONTAINER_INTERNAL_ERROR;
-      }
+    }
 
-      throw new StorageContainerException(msg, result);
+    final ContainerProtos.Result result;
+    switch (containerState) {
+    case QUASI_CLOSED:
+      result = CLOSED_CONTAINER_IO;
+      break;
+    case CLOSED:
+      result = CLOSED_CONTAINER_IO;
+      break;
+    case UNHEALTHY:
+      result = CONTAINER_UNHEALTHY;
+      break;
+    case INVALID:
+      result = INVALID_CONTAINER_STATE;
+      break;
+    default:
+      result = CONTAINER_INTERNAL_ERROR;
     }
+    String msg = "Requested operation not allowed as ContainerState is " +
+        containerState;
+    throw new StorageContainerException(msg, result);
   }
 
   public Container importContainer(long containerID, long maxSize,
@@ -796,4 +788,55 @@ public class KeyValueHandler extends Handler {
     return container;
 
   }
+
+  @Override
+  public void markContainerForClose(Container container)
+      throws IOException {
+    State currentState = container.getContainerState();
+    // Move the container to CLOSING state only if it's OPEN
+    if (currentState == State.OPEN) {
+      container.markContainerForClose();
+      sendICR(container);
+    }
+  }
+
+  @Override
+  public void quasiCloseContainer(Container container)
+      throws IOException {
+    final State state = container.getContainerState();
+    // Quasi close call is idempotent.
+    if (state == State.QUASI_CLOSED) {
+      return;
+    }
+    // The container has to be in CLOSING state.
+    if (state != State.CLOSING) {
+      ContainerProtos.Result error = state == State.INVALID ?
+          INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
+      throw new StorageContainerException("Cannot quasi close container #" +
+          container.getContainerData().getContainerID() + " while in " +
+          state + " state.", error);
+    }
+    container.quasiClose();
+    sendICR(container);
+  }
+
+  @Override
+  public void closeContainer(Container container)
+      throws IOException {
+    final State state = container.getContainerState();
+    // Close call is idempotent.
+    if (state == State.CLOSED) {
+      return;
+    }
+    // The container has to be either in CLOSING or in QUASI_CLOSED state.
+    if (state != State.CLOSING && state != State.QUASI_CLOSED) {
+      ContainerProtos.Result error = state == State.INVALID ?
+          INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
+      throw new StorageContainerException("Cannot close container #" +
+          container.getContainerData().getContainerID() + " while in " +
+          state + " state.", error);
+    }
+    container.close();
+    sendICR(container);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
new file mode 100644
index 0000000..41b74e7
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerDataProto.State;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Control plane for container management in datanode.
+ */
+public class ContainerController {
+
+  private final ContainerSet containerSet;
+  private final Map<ContainerType, Handler> handlers;
+
+  public ContainerController(final ContainerSet containerSet,
+      final Map<ContainerType, Handler> handlers) {
+    this.containerSet = containerSet;
+    this.handlers = handlers;
+  }
+
+  /**
+   * Returns the Container given a container id.
+   *
+   * @param containerId ID of the container
+   * @return Container
+   */
+  public Container getContainer(final long containerId) {
+    return containerSet.getContainer(containerId);
+  }
+
+  /**
+   * Marks the container for closing. Moves the container to CLOSING state.
+   *
+   * @param containerId Id of the container to update
+   * @throws IOException in case of exception
+   */
+  public void markContainerForClose(final long containerId)
+      throws IOException {
+    Container container = containerSet.getContainer(containerId);
+
+    if (container.getContainerState() == State.OPEN) {
+      getHandler(container).markContainerForClose(container);
+    }
+  }
+
+  /**
+   * Returns the container report.
+   *
+   * @return ContainerReportsProto
+   * @throws IOException in case of exception
+   */
+  public ContainerReportsProto getContainerReport()
+      throws IOException {
+    return containerSet.getContainerReport();
+  }
+
+  /**
+   * Quasi closes a container given its id.
+   *
+   * @param containerId Id of the container to quasi close
+   * @throws IOException in case of exception
+   */
+  public void quasiCloseContainer(final long containerId) throws IOException {
+    final Container container = containerSet.getContainer(containerId);
+    getHandler(container).quasiCloseContainer(container);
+  }
+
+  /**
+   * Closes a container given its id.
+   *
+   * @param containerId Id of the container to close
+   * @throws IOException in case of exception
+   */
+  public void closeContainer(final long containerId) throws IOException {
+    final Container container = containerSet.getContainer(containerId);
+    getHandler(container).closeContainer(container);
+  }
+
+  public Container importContainer(final ContainerType type,
+      final long containerId, final long maxSize,
+      final FileInputStream rawContainerStream, final TarContainerPacker 
packer)
+      throws IOException {
+    return handlers.get(type).importContainer(
+        containerId, maxSize, rawContainerStream, packer);
+  }
+
+  /**
+   * Given a container, returns its handler instance.
+   *
+   * @param container Container
+   * @return handler of the container
+   */
+  private Handler getHandler(final Container container) {
+    return handlers.get(container.getContainerType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 9fac3cb..a89b50a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,18 +19,20 @@
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -47,27 +49,26 @@ import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
-
 /**
  * Ozone main class sets up the network servers and initializes the container
  * layer.
  */
 public class OzoneContainer {
 
-  public static final Logger LOG = LoggerFactory.getLogger(
+  private static final Logger LOG = LoggerFactory.getLogger(
       OzoneContainer.class);
 
   private final HddsDispatcher hddsDispatcher;
-  private final DatanodeDetails dnDetails;
+  private final Map<ContainerType, Handler> handlers;
   private final OzoneConfiguration config;
   private final VolumeSet volumeSet;
   private final ContainerSet containerSet;
-  private final Map<ReplicationType, XceiverServerSpi> servers;
+  private final XceiverServerSpi writeChannel;
+  private final XceiverServerSpi readChannel;
+  private final ContainerController controller;
 
   /**
    * Construct OzoneContainer object.
@@ -78,31 +79,42 @@ public class OzoneContainer {
    */
   public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
       conf, StateContext context) throws IOException {
-    this.dnDetails = datanodeDetails;
     this.config = conf;
     this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
     this.containerSet = new ContainerSet();
     buildContainerSet();
-    hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
-        context);
-    servers = new HashMap<>();
-    servers.put(ReplicationType.STAND_ALONE,
-        new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher,
-            createReplicationService()));
-    servers.put(ReplicationType.RATIS, XceiverServerRatis
-        .newXceiverServerRatis(datanodeDetails, config, hddsDispatcher,
-            context));
+    final ContainerMetrics metrics = ContainerMetrics.create(conf);
+    this.handlers = Maps.newHashMap();
+    for (ContainerType containerType : ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(
+              containerType, conf, context, containerSet, volumeSet, metrics));
+    }
+    this.hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
+        handlers, context, metrics);
+
+    /*
+     * ContainerController is the control plane
+     * XceiverServerRatis is the write channel
+     * XceiverServerGrpc is the read channel
+     */
+    this.controller = new ContainerController(containerSet, handlers);
+    this.writeChannel = XceiverServerRatis.newXceiverServerRatis(
+        datanodeDetails, config, hddsDispatcher, context);
+    this.readChannel = new XceiverServerGrpc(
+        datanodeDetails, config, hddsDispatcher, createReplicationService());
+
   }
 
   private GrpcReplicationService createReplicationService() {
     return new GrpcReplicationService(
-        new OnDemandContainerReplicationSource(containerSet));
+        new OnDemandContainerReplicationSource(controller));
   }
 
   /**
    * Build's container map.
    */
-  public void buildContainerSet() {
+  private void buildContainerSet() {
     Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
         .iterator();
     ArrayList<Thread> volumeThreads = new ArrayList<Thread>();
@@ -111,7 +123,6 @@ public class OzoneContainer {
     // And also handle disk failure tolerance need to be added
     while (volumeSetIterator.hasNext()) {
       HddsVolume volume = volumeSetIterator.next();
-      File hddsVolumeRootDir = volume.getHddsRootDir();
       Thread thread = new Thread(new ContainerReader(volumeSet, volume,
           containerSet, config));
       thread.start();
@@ -135,9 +146,8 @@ public class OzoneContainer {
    */
   public void start() throws IOException {
     LOG.info("Attempting to start container services.");
-    for (XceiverServerSpi serverinstance : servers.values()) {
-      serverinstance.start();
-    }
+    writeChannel.start();
+    readChannel.start();
     hddsDispatcher.init();
   }
 
@@ -147,9 +157,8 @@ public class OzoneContainer {
   public void stop() {
     //TODO: at end of container IO integration work.
     LOG.info("Attempting to stop container services.");
-    for(XceiverServerSpi serverinstance: servers.values()) {
-      serverinstance.stop();
-    }
+    writeChannel.stop();
+    readChannel.stop();
     hddsDispatcher.shutdown();
   }
 
@@ -163,58 +172,24 @@ public class OzoneContainer {
    * @return - container report.
    * @throws IOException
    */
-  public StorageContainerDatanodeProtocolProtos.ContainerReportsProto
-      getContainerReport() throws IOException {
-    return this.containerSet.getContainerReport();
-  }
 
   public PipelineReportsProto getPipelineReport() {
     PipelineReportsProto.Builder pipelineReportsProto =
-            PipelineReportsProto.newBuilder();
-    for (XceiverServerSpi serverInstance : servers.values()) {
-      pipelineReportsProto
-              .addAllPipelineReport(serverInstance.getPipelineReport());
-    }
+        PipelineReportsProto.newBuilder();
+    
pipelineReportsProto.addAllPipelineReport(writeChannel.getPipelineReport());
     return pipelineReportsProto.build();
   }
 
-  /**
-   * Submit ContainerRequest.
-   * @param request
-   * @param replicationType
-   * @param pipelineID
-   */
-  public void submitContainerRequest(
-      ContainerProtos.ContainerCommandRequestProto request,
-      ReplicationType replicationType,
-      PipelineID pipelineID) throws IOException {
-    LOG.info("submitting {} request over {} server for container {}",
-        request.getCmdType(), replicationType, request.getContainerID());
-    Preconditions.checkState(servers.containsKey(replicationType));
-    servers.get(replicationType).submitRequest(request, pipelineID);
-  }
-
-  private int getPortByType(ReplicationType replicationType) {
-    return servers.containsKey(replicationType) ?
-        servers.get(replicationType).getIPCPort() : INVALID_PORT;
+  public XceiverServerSpi getWriteChannel() {
+    return writeChannel;
   }
 
-  /**
-   * Returns the container servers IPC port.
-   *
-   * @return Container servers IPC port.
-   */
-  public int getContainerServerPort() {
-    return getPortByType(ReplicationType.STAND_ALONE);
+  public XceiverServerSpi getReadChannel() {
+    return readChannel;
   }
 
-  /**
-   * Returns the Ratis container Server IPC port.
-   *
-   * @return Ratis port.
-   */
-  public int getRatisContainerServerPort() {
-    return getPortByType(ReplicationType.RATIS);
+  public ContainerController getController() {
+    return controller;
   }
 
   /**
@@ -230,11 +205,6 @@ public class OzoneContainer {
     return this.hddsDispatcher;
   }
 
-  @VisibleForTesting
-  public XceiverServerSpi getServer(ReplicationType replicationType) {
-    return servers.get(replicationType);
-  }
-
   public VolumeSet getVolumeSet() {
     return volumeSet;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 5ef5841..e8f0d17 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -28,9 +28,8 @@ import 
org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
 import org.slf4j.Logger;
@@ -49,7 +48,7 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
 
   private final ContainerSet containerSet;
 
-  private final ContainerDispatcher containerDispatcher;
+  private final ContainerController controller;
 
   private final ContainerDownloader downloader;
 
@@ -57,11 +56,11 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
 
   public DownloadAndImportReplicator(
       ContainerSet containerSet,
-      ContainerDispatcher containerDispatcher,
+      ContainerController controller,
       ContainerDownloader downloader,
       TarContainerPacker packer) {
     this.containerSet = containerSet;
-    this.containerDispatcher = containerDispatcher;
+    this.controller = controller;
     this.downloader = downloader;
     this.packer = packer;
   }
@@ -80,10 +79,9 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
       try (FileInputStream tempContainerTarStream = new FileInputStream(
           tarFilePath.toFile())) {
 
-        Handler handler = containerDispatcher.getHandler(
-            originalContainerData.getContainerType());
-
-        Container container = handler.importContainer(containerID,
+        Container container = controller.importContainer(
+            originalContainerData.getContainerType(),
+            containerID,
             originalContainerData.getMaxSize(),
             tempContainerTarStream,
             packer);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
index d557b54..28b8713 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.ozone.container.replication;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,13 +39,13 @@ public class OnDemandContainerReplicationSource
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerReplicationSource.class);
 
-  private ContainerSet containerSet;
+  private ContainerController controller;
 
   private ContainerPacker packer = new TarContainerPacker();
 
   public OnDemandContainerReplicationSource(
-      ContainerSet containerSet) {
-    this.containerSet = containerSet;
+      ContainerController controller) {
+    this.controller = controller;
   }
 
   @Override
@@ -57,7 +57,7 @@ public class OnDemandContainerReplicationSource
   public void copyData(long containerId, OutputStream destination)
       throws IOException {
 
-    Container container = containerSet.getContainer(containerId);
+    Container container = controller.getContainer(containerId);
 
     Preconditions
         .checkNotNull(container, "Container is not found " + containerId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index 7849bcd..4fe6ae4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -30,14 +29,11 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 public class CloseContainerCommand
     extends SCMCommand<CloseContainerCommandProto> {
 
-  private HddsProtos.ReplicationType replicationType;
-  private PipelineID pipelineID;
+  private final PipelineID pipelineID;
 
-  public CloseContainerCommand(long containerID,
-      HddsProtos.ReplicationType replicationType,
-      PipelineID pipelineID) {
+  public CloseContainerCommand(final long containerID,
+      final PipelineID pipelineID) {
     super(containerID);
-    this.replicationType = replicationType;
     this.pipelineID = pipelineID;
   }
 
@@ -65,7 +61,6 @@ public class CloseContainerCommand
     return CloseContainerCommandProto.newBuilder()
         .setContainerID(getId())
         .setCmdId(getId())
-        .setReplicationType(replicationType)
         .setPipelineID(pipelineID.getProtobuf())
         .build();
   }
@@ -74,7 +69,6 @@ public class CloseContainerCommand
       CloseContainerCommandProto closeContainerProto) {
     Preconditions.checkNotNull(closeContainerProto);
     return new CloseContainerCommand(closeContainerProto.getCmdId(),
-        closeContainerProto.getReplicationType(),
         PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 33ea307..9fdef7d 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -140,8 +140,8 @@ message ContainerReplicaProto {
   enum State {
     OPEN = 1;
     CLOSING = 2;
-    CLOSED = 3;
-    QUASI_CLOSED = 4;
+    QUASI_CLOSED = 3;
+    CLOSED = 4;
     UNHEALTHY = 5;
     INVALID = 6;
   }
@@ -289,9 +289,9 @@ This command asks the datanode to close a specific container.
 */
 message CloseContainerCommandProto {
   required int64 containerID = 1;
-  required hadoop.hdds.ReplicationType replicationType = 2;
+  required PipelineID pipelineID = 2;
+  // cmdId will be removed
   required int64 cmdId = 3;
-  required PipelineID pipelineID = 4;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 35cda00..085e6f9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.impl;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.StorageUnit;
@@ -26,6 +27,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto
     .ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -33,7 +36,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .WriteChunkRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -47,6 +52,7 @@ import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -77,8 +83,15 @@ public class TestHddsDispatcher {
       container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
           scmId.toString());
       containerSet.addContainer(container);
+      ContainerMetrics metrics = ContainerMetrics.create(conf);
+      Map<ContainerType, Handler> handlers = Maps.newHashMap();
+      for (ContainerType containerType : ContainerType.values()) {
+        handlers.put(containerType,
+            Handler.getHandlerForContainerType(
+                containerType, conf, null, containerSet, volumeSet, metrics));
+      }
       HddsDispatcher hddsDispatcher = new HddsDispatcher(
-          conf, containerSet, volumeSet, context);
+          conf, containerSet, volumeSet, handlers, context, metrics);
       hddsDispatcher.setScmId(scmId.toString());
       ContainerCommandResponseProto responseOne = hddsDispatcher.dispatch(
           getWriteChunkRequest(dd.getUuidString(), 1L, 1L));
@@ -113,8 +126,15 @@ public class TestHddsDispatcher {
       ContainerSet containerSet = new ContainerSet();
       VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf);
       StateContext context = Mockito.mock(StateContext.class);
-      HddsDispatcher hddsDispatcher =
-          new HddsDispatcher(conf, containerSet, volumeSet, context);
+      ContainerMetrics metrics = ContainerMetrics.create(conf);
+      Map<ContainerType, Handler> handlers = Maps.newHashMap();
+      for (ContainerType containerType : ContainerType.values()) {
+        handlers.put(containerType,
+            Handler.getHandlerForContainerType(
+                containerType, conf, null, containerSet, volumeSet, metrics));
+      }
+      HddsDispatcher hddsDispatcher = new HddsDispatcher(
+          conf, containerSet, volumeSet, handlers, context, metrics);
       hddsDispatcher.setScmId(scmId.toString());
       ContainerCommandRequestProto writeChunkRequest =
           getWriteChunkRequest(dd.getUuidString(), 1L, 1L);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
index b658295..e763289 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.ozone.container.common.interfaces;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -32,6 +34,8 @@ import org.junit.rules.TestRule;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
+import java.util.Map;
+
 /**
  * Tests Handler interface.
  */
@@ -50,8 +54,16 @@ public class TestHandler {
     this.conf = new Configuration();
     this.containerSet = Mockito.mock(ContainerSet.class);
     this.volumeSet = Mockito.mock(VolumeSet.class);
-
-    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null);
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
+    for (ContainerProtos.ContainerType containerType :
+        ContainerProtos.ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(
+              containerType, conf, null, containerSet, volumeSet, metrics));
+    }
+    this.dispatcher = new HddsDispatcher(
+        conf, containerSet, volumeSet, handlers, null, metrics);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
new file mode 100644
index 0000000..0cf95b7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Test cases to verify CloseContainerCommandHandler in datanode.
+ */
+public class TestCloseContainerCommandHandler {
+
+  private static final StateContext CONTEXT = Mockito.mock(StateContext.class);
+  private static File testDir;
+
+
+  private OzoneContainer getOzoneContainer(final OzoneConfiguration conf,
+      final DatanodeDetails datanodeDetails) throws IOException {
+    testDir = GenericTestUtils.getTestDir(
+        TestCloseContainerCommandHandler.class.getName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath());
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testDir.getPath());
+
+    return new OzoneContainer(datanodeDetails, conf, CONTEXT);
+  }
+
+
+  @Test
+  public void testCloseContainerViaRatis()
+      throws IOException, InterruptedException {
+    final OzoneConfiguration conf = new OzoneConfiguration();
+    final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+    final OzoneContainer container = getOzoneContainer(conf, datanodeDetails);
+    container.getDispatcher().setScmId(UUID.randomUUID().toString());
+    container.start();
+    // Give some time for ratis for leader election.
+    final PipelineID pipelineID = PipelineID.randomId();
+    final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
+    final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails);
+    final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
+        Collections.singleton(datanodeDetails));
+    final RaftClient client = RatisHelper.newRaftClient(
+        SupportedRpcType.GRPC, peer, retryPolicy);
+    System.out.println(client.groupAdd(group, peer.getId()).isSuccess());
+    Thread.sleep(2000);
+    final ContainerID containerId = ContainerID.valueof(1);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CreateContainer);
+    request.setContainerID(containerId.getId());
+    request.setCreateContainer(
+        ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(datanodeDetails.getUuidString());
+    container.getWriteChannel().submitRequest(
+        request.build(), pipelineID.getProtobuf());
+
+    Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN,
+        container.getContainerSet().getContainer(
+            containerId.getId()).getContainerState());
+
+    // We have created a container via ratis. Now close the container on ratis.
+    final CloseContainerCommandHandler closeHandler =
+        new CloseContainerCommandHandler();
+    final CloseContainerCommand command = new CloseContainerCommand(
+        containerId.getId(), pipelineID);
+    final DatanodeStateMachine datanodeStateMachine = Mockito.mock(
+        DatanodeStateMachine.class);
+
+    Mockito.when(datanodeStateMachine.getDatanodeDetails())
+        .thenReturn(datanodeDetails);
+    Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine);
+
+    closeHandler.handle(command, container, CONTEXT, null);
+
+    Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+        container.getContainerSet().getContainer(
+            containerId.getId()).getContainerState());
+
+    Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat();
+    container.stop();
+  }
+
+  @Test
+  public void testCloseContainerViaStandalone()
+      throws IOException, InterruptedException {
+    final OzoneConfiguration conf = new OzoneConfiguration();
+    final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+    final OzoneContainer container = getOzoneContainer(conf, datanodeDetails);
+    container.getDispatcher().setScmId(UUID.randomUUID().toString());
+    container.start();
+    // Give some time for ratis for leader election.
+    final PipelineID pipelineID = PipelineID.randomId();
+    final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
+    final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails);
+    final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
+        Collections.singleton(datanodeDetails));
+    final RaftClient client = RatisHelper.newRaftClient(
+        SupportedRpcType.GRPC, peer, retryPolicy);
+    System.out.println(client.groupAdd(group, peer.getId()).isSuccess());
+    Thread.sleep(2000);
+    final ContainerID containerId = ContainerID.valueof(2);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CreateContainer);
+    request.setContainerID(containerId.getId());
+    request.setCreateContainer(
+        ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
+    request.setTraceID(UUID.randomUUID().toString());
+    request.setDatanodeUuid(datanodeDetails.getUuidString());
+    container.getWriteChannel().submitRequest(
+        request.build(), pipelineID.getProtobuf());
+
+    Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN,
+        container.getContainerSet().getContainer(
+            containerId.getId()).getContainerState());
+
+    // We have created a container via ratis. Now quasi close it
+    final CloseContainerCommandHandler closeHandler =
+        new CloseContainerCommandHandler();
+    // Specify a pipeline which doesn't exist in the datanode.
+    final CloseContainerCommand command = new CloseContainerCommand(
+        containerId.getId(), PipelineID.randomId());
+    final DatanodeStateMachine datanodeStateMachine = Mockito.mock(
+        DatanodeStateMachine.class);
+
+    Mockito.when(datanodeStateMachine.getDatanodeDetails())
+        .thenReturn(datanodeDetails);
+    Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine);
+
+    closeHandler.handle(command, container, CONTEXT, null);
+
+    Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED,
+        container.getContainerSet().getContainer(
+            containerId.getId()).getContainerState());
+
+    Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat();
+    container.stop();
+  }
+
+  /**
+   * Creates a random DatanodeDetails.
+   * @return DatanodeDetails
+   */
+  private static DatanodeDetails randomDatanodeDetails() {
+    String ipAddress = "127.0.0.1";
+    DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.STANDALONE, 0);
+    DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.RATIS, 0);
+    DatanodeDetails.Port restPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.REST, 0);
+    DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
+    builder.setUuid(UUID.randomUUID().toString())
+        .setHostName("localhost")
+        .setIpAddress(ipAddress)
+        .addPort(containerPort)
+        .addPort(ratisPort)
+        .addPort(restPort);
+    return builder.build();
+  }
+
+  @AfterClass
+  public static void teardown() throws IOException {
+    FileUtils.deleteDirectory(testDir);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 7fc065f..265fb18 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -54,6 +54,7 @@ import static org.mockito.Mockito.times;
 
 
 import java.io.File;
+import java.io.IOException;
 import java.util.UUID;
 
 /**
@@ -224,7 +225,7 @@ public class TestKeyValueHandler {
       interval[0] = 2;
       ContainerMetrics metrics = new ContainerMetrics(interval);
       VolumeSet volumeSet = new VolumeSet(UUID.randomUUID().toString(), conf);
-      KeyValueHandler keyValueHandler = new KeyValueHandler(conf, cset,
+      KeyValueHandler keyValueHandler = new KeyValueHandler(conf, null, cset,
           volumeSet, metrics);
       assertEquals("org.apache.hadoop.ozone.container.common" +
           ".volume.RoundRobinVolumeChoosingPolicy",
@@ -235,7 +236,7 @@ public class TestKeyValueHandler {
       conf.set(HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
           "org.apache.hadoop.ozone.container.common.impl.HddsDispatcher");
       try {
-        new KeyValueHandler(conf, cset, volumeSet, metrics);
+        new KeyValueHandler(conf, null, cset, volumeSet, metrics);
       } catch (RuntimeException ex) {
         GenericTestUtils.assertExceptionContains("class org.apache.hadoop" +
             ".ozone.container.common.impl.HddsDispatcher not org.apache" +
@@ -261,7 +262,7 @@ public class TestKeyValueHandler {
 
 
   @Test
-  public void testCloseInvalidContainer() {
+  public void testCloseInvalidContainer() throws IOException {
     long containerID = 1234L;
     Configuration conf = new Configuration();
     KeyValueContainerData kvData = new KeyValueContainerData(containerID,
@@ -282,6 +283,7 @@ public class TestKeyValueHandler {
 
     Mockito.when(handler.handleCloseContainer(any(), any()))
         .thenCallRealMethod();
+    doCallRealMethod().when(handler).closeContainer(any());
     // Closing invalid container should return error response.
     ContainerProtos.ContainerCommandResponseProto response =
         handler.handleCloseContainer(closeContainerRequest, container);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 6f96f4b..34030e8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -172,7 +172,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
           LOG.warn("Unable to allocate container.");
         }
       } catch (IOException ex) {
-        LOG.warn("Unable to allocate container: {}", ex);
+        LOG.warn("Unable to allocate container.", ex);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 719d763..fd73711 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -74,8 +74,8 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
       if (container.getState() == LifeCycleState.CLOSING) {
 
         final CloseContainerCommand closeContainerCommand =
-            new CloseContainerCommand(containerID.getId(),
-                container.getReplicationType(), container.getPipelineID());
+            new CloseContainerCommand(
+                containerID.getId(), container.getPipelineID());
 
         getNodes(container).forEach(node -> publisher.fireEvent(
             DATANODE_COMMAND,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to