HDDS-576. Move ContainerWithPipeline creation to RPC endpoint.
Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/18fe65d7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/18fe65d7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/18fe65d7

Branch: refs/heads/HDFS-13891
Commit: 18fe65d7560b0bd61e3cf3ffbbaf98e87d82120f
Parents: 42f3a70
Author: Nanda kumar <na...@apache.org>
Authored: Mon Nov 12 23:32:31 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Mon Nov 12 23:33:11 2018 +0530

----------------------------------------------------------------------
 .../hadoop/hdds/scm/container/ContainerID.java  |   2 +-
 hadoop-hdds/common/src/main/proto/hdds.proto    |  24 ++--
 .../CloseContainerCommandHandler.java           |   2 +-
 .../protocol/commands/CommandForDatanode.java   |   1 +
 .../hadoop/hdds/scm/block/BlockManagerImpl.java |  77 ++++++------
 .../block/DatanodeDeletedBlockTransactions.java |  40 +++----
 .../hdds/scm/block/DeletedBlockLogImpl.java     |  24 ++--
 .../container/CloseContainerEventHandler.java   | 116 +++++++++----------
 .../hdds/scm/container/ContainerManager.java    |  20 +---
 .../scm/container/ContainerStateManager.java    |   4 +-
 .../hdds/scm/container/SCMContainerManager.java |  63 +---------
 .../scm/server/SCMClientProtocolServer.java     |  32 ++++-
 .../scm/server/StorageContainerManager.java     |   6 +-
 .../org/apache/hadoop/hdds/scm/TestUtils.java   |   2 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java |   2 +-
 .../hdds/scm/block/TestDeletedBlockLog.java     |  55 +++++----
 .../TestCloseContainerEventHandler.java         |  35 +++---
 .../container/TestContainerReportHandler.java   |   6 +-
 .../scm/container/TestSCMContainerManager.java  |  65 +++++------
 .../hdds/scm/node/TestContainerPlacement.java   |   7 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |   6 +-
 .../hdds/scm/pipeline/TestNodeFailure.java      |  37 +++---
 .../hdds/scm/pipeline/TestPipelineClose.java    |  10 +-
 .../hdds/scm/pipeline/TestSCMRestart.java       |  19 +--
 .../org/apache/hadoop/ozone/OzoneTestUtils.java |   5 +-
 .../ozone/client/rest/TestOzoneRestClient.java  |   7 +-
 .../rpc/TestCloseContainerHandlingByClient.java |  31 +++--
 .../ozone/client/rpc/TestOzoneRpcClient.java    |  21 ++--
 .../TestCloseContainerByPipeline.java           |  19 +--
 .../TestCloseContainerHandler.java              |  19 +--
 .../freon/TestFreonWithDatanodeFastRestart.java |   8 ++
 .../freon/TestFreonWithDatanodeRestart.java     |   5 +
 .../hadoop/ozone/scm/TestContainerSQLCli.java   |   2 +-
 33 files changed, 381 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
index e7ac350..bb44da4 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
@@ -102,6 +102,6 @@ public final class ContainerID implements 
Comparable<ContainerID> {
 
   @Override
   public String toString() {
-    return "id=" + id;
+    return "#" + id;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto 
b/hadoop-hdds/common/src/main/proto/hdds.proto
index a0c6f16..c37683a 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -111,24 +111,18 @@ message NodePool {
  */
 
 enum LifeCycleState {
-    ALLOCATED = 1;
-    CREATING = 2; // Used for container allocated/created by different client.
-    OPEN =3; // Mostly an update to SCM via HB or client call.
-    CLOSING = 4;
-    CLOSED = 5; // !!State after this has not been used yet.
-    DELETING = 6;
-    DELETED = 7; // object is deleted.
+    OPEN = 1;
+    CLOSING = 2;
+    CLOSED = 3;
+    DELETING = 4;
+    DELETED = 5; // object is deleted.
 }
 
 enum LifeCycleEvent {
-    CREATE = 1; // A request to client to create this object
-    CREATED = 2;
-    FINALIZE = 3;
-    CLOSE = 4; // !!Event after this has not been used yet.
-    UPDATE = 5;
-    TIMEOUT = 6; // creation has timed out from SCM's View.
-    DELETE = 7;
-    CLEANUP = 8;
+    FINALIZE = 1;
+    CLOSE = 2; // !!Event after this has not been used yet.
+    DELETE = 3;
+    CLEANUP = 4;
 }
 
 message ContainerInfoProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 22488d9..a7d855b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -84,7 +84,7 @@ public class CloseContainerCommandHandler implements 
CommandHandler {
         cmdExecuted = false;
         return;
       }
-      if (container.getContainerData().isClosed()) {
+      if (!container.getContainerData().isClosed()) {
         LOG.debug("Closing container {}.", containerID);
         HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
         HddsProtos.ReplicationType replicationType =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
index 69337fb..66bf623 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
@@ -32,6 +32,7 @@ public class CommandForDatanode<T extends GeneratedMessage> 
implements
 
   private final SCMCommand<T> command;
 
+  // TODO: Command for datanode should take DatanodeDetails as parameter.
   public CommandForDatanode(UUID datanodeId, SCMCommand<T> command) {
     this.datanodeId = datanodeId;
     this.command = command;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index abbe9f1..6f96f4b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -26,13 +26,15 @@ import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -70,6 +72,7 @@ public class BlockManagerImpl implements 
EventHandler<Boolean>,
   // Currently only user of the block service is Ozone, CBlock manages blocks
   // by itself and does not rely on the Block service offered by SCM.
 
+  private final PipelineManager pipelineManager;
   private final ContainerManager containerManager;
 
   private final long containerSize;
@@ -87,14 +90,16 @@ public class BlockManagerImpl implements 
EventHandler<Boolean>,
    *
    * @param conf - configuration.
    * @param nodeManager - node manager.
+   * @param pipelineManager - pipeline manager.
    * @param containerManager - container manager.
    * @param eventPublisher - event publisher.
    * @throws IOException
    */
   public BlockManagerImpl(final Configuration conf,
-      final NodeManager nodeManager, final ContainerManager containerManager,
-      EventPublisher eventPublisher)
+      final NodeManager nodeManager, final PipelineManager pipelineManager,
+      final ContainerManager containerManager, EventPublisher eventPublisher)
       throws IOException {
+    this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
 
     this.containerSize = (long)conf.getStorageSize(
@@ -155,16 +160,15 @@ public class BlockManagerImpl implements 
EventHandler<Boolean>,
    * @throws IOException
    */
   private synchronized void preAllocateContainers(int count,
-      ReplicationType type, ReplicationFactor factor, String owner)
-      throws IOException {
+      ReplicationType type, ReplicationFactor factor, String owner) {
     for (int i = 0; i < count; i++) {
-      ContainerWithPipeline containerWithPipeline;
+      ContainerInfo containerInfo;
       try {
         // TODO: Fix this later when Ratis is made the Default.
-        containerWithPipeline = containerManager.allocateContainer(
+        containerInfo = containerManager.allocateContainer(
             type, factor, owner);
 
-        if (containerWithPipeline == null) {
+        if (containerInfo == null) {
           LOG.warn("Unable to allocate container.");
         }
       } catch (IOException ex) {
@@ -206,11 +210,11 @@ public class BlockManagerImpl implements 
EventHandler<Boolean>,
              use different kind of policies.
     */
 
-    ContainerWithPipeline containerWithPipeline;
+    ContainerInfo containerInfo;
 
     // look for OPEN containers that match the criteria.
-    containerWithPipeline = containerManager
-        .getMatchingContainerWithPipeline(size, owner, type, factor,
+    containerInfo = containerManager
+        .getMatchingContainer(size, owner, type, factor,
             HddsProtos.LifeCycleState.OPEN);
 
     // We did not find OPEN Containers. This generally means
@@ -221,27 +225,27 @@ public class BlockManagerImpl implements 
EventHandler<Boolean>,
     // Even though we have already checked the containers in OPEN
     // state, we have to check again as we only hold a read lock.
     // Some other thread might have pre-allocated container in meantime.
-    if (containerWithPipeline == null) {
+    if (containerInfo == null) {
       synchronized (this) {
         if (!containerManager.getContainers(HddsProtos.LifeCycleState.OPEN)
             .isEmpty()) {
-          containerWithPipeline = containerManager
-              .getMatchingContainerWithPipeline(size, owner, type, factor,
+          containerInfo = containerManager
+              .getMatchingContainer(size, owner, type, factor,
                   HddsProtos.LifeCycleState.OPEN);
         }
 
-        if (containerWithPipeline == null) {
+        if (containerInfo == null) {
           preAllocateContainers(containerProvisionBatchSize, type, factor,
               owner);
-          containerWithPipeline = containerManager
-              .getMatchingContainerWithPipeline(size, owner, type, factor,
+          containerInfo = containerManager
+              .getMatchingContainer(size, owner, type, factor,
                   HddsProtos.LifeCycleState.OPEN);
         }
       }
     }
 
-    if (containerWithPipeline != null) {
-      return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.OPEN);
+    if (containerInfo != null) {
+      return newBlock(containerInfo);
     }
 
     // we have tried all strategies we know and but somehow we are not able
@@ -255,29 +259,26 @@ public class BlockManagerImpl implements 
EventHandler<Boolean>,
   /**
    * newBlock - returns a new block assigned to a container.
    *
-   * @param containerWithPipeline - Container Info.
-   * @param state - Current state of the container.
+   * @param containerInfo - Container Info.
    * @return AllocatedBlock
    */
-  private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline,
-      HddsProtos.LifeCycleState state) throws IOException {
-    ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
-    if (containerWithPipeline.getPipeline().getNodes().size() == 0) {
-      LOG.error("Pipeline Machine count is zero.");
+  private AllocatedBlock newBlock(ContainerInfo containerInfo) {
+    try {
+      final Pipeline pipeline = pipelineManager
+          .getPipeline(containerInfo.getPipelineID());
+      // TODO : Revisit this local ID allocation when HA is added.
+      long localID = UniqueId.next();
+      long containerID = containerInfo.getContainerID();
+      AllocatedBlock.Builder abb =  new AllocatedBlock.Builder()
+          .setContainerBlockID(new ContainerBlockID(containerID, localID))
+          .setPipeline(pipeline);
+      LOG.trace("New block allocated : {} Container ID: {}", localID,
+          containerID);
+      return abb.build();
+    } catch (PipelineNotFoundException ex) {
+      LOG.error("Pipeline Machine count is zero.", ex);
       return null;
     }
-
-    // TODO : Revisit this local ID allocation when HA is added.
-    long localID = UniqueId.next();
-    long containerID = containerInfo.getContainerID();
-
-    AllocatedBlock.Builder abb =
-        new AllocatedBlock.Builder()
-            .setContainerBlockID(new ContainerBlockID(containerID, localID))
-            .setPipeline(containerWithPipeline.getPipeline());
-    LOG.trace("New block allocated : {} Container ID: {}", localID,
-        containerID);
-    return abb.build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 70e9b5d..ce65a70 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -18,8 +18,8 @@ package org.apache.hadoop.hdds.scm.block;
 
 import com.google.common.collect.ArrayListMultimap;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
@@ -30,8 +30,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 
 /**
  * A wrapper class to hold info about datanode and all deleted block
@@ -58,31 +57,28 @@ public class DatanodeDeletedBlockTransactions {
 
   public boolean addTransaction(DeletedBlocksTransaction tx,
       Set<UUID> dnsWithTransactionCommitted) {
-    Pipeline pipeline = null;
     try {
-      ContainerWithPipeline containerWithPipeline =
-          containerManager.getContainerWithPipeline(
-              ContainerID.valueof(tx.getContainerID()));
-      if (containerWithPipeline.getContainerInfo().isOpen()
-          || containerWithPipeline.getPipeline().isEmpty()) {
-        return false;
+      boolean success = false;
+      final ContainerID id = ContainerID.valueof(tx.getContainerID());
+      final ContainerInfo container = containerManager.getContainer(id);
+      final Set<ContainerReplica> replicas = containerManager
+          .getContainerReplicas(id);
+      if (!container.isOpen()) {
+        for (ContainerReplica replica : replicas) {
+          UUID dnID = replica.getDatanodeDetails().getUuid();
+          if (dnsWithTransactionCommitted == null ||
+              !dnsWithTransactionCommitted.contains(dnID)) {
+            // Transaction need not be sent to dns which have
+            // already committed it
+            success = addTransactionToDN(dnID, tx);
+          }
+        }
       }
-      pipeline = containerWithPipeline.getPipeline();
+      return success;
     } catch (IOException e) {
       SCMBlockDeletingService.LOG.warn("Got container info error.", e);
       return false;
     }
-
-    boolean success = false;
-    for (DatanodeDetails dd : pipeline.getNodes()) {
-      UUID dnID = dd.getUuid();
-      if (dnsWithTransactionCommitted == null ||
-          !dnsWithTransactionCommitted.contains(dnID)) {
-        // Transaction need not be sent to dns which have already committed it
-        success = addTransactionToDN(dnID, tx);
-      }
-    }
-    return success;
   }
 
   private boolean addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index a5ee130..766d428 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -30,8 +30,9 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.command
     .CommandStatusReportHandler.DeleteBlockStatus;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -49,7 +50,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -101,7 +101,7 @@ public class DeletedBlockLogImpl
   private Map<Long, Set<UUID>> transactionToDNsCommitMap;
 
   public DeletedBlockLogImpl(Configuration conf,
-     ContainerManager containerManager) throws IOException {
+      ContainerManager containerManager) throws IOException {
     maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
         OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
 
@@ -249,7 +249,8 @@ public class DeletedBlockLogImpl
           long txID = transactionResult.getTxID();
           // set of dns which have successfully committed transaction txId.
           dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
-          Long containerId = transactionResult.getContainerID();
+          final ContainerID containerId = ContainerID.valueof(
+              transactionResult.getContainerID());
           if (dnsWithCommittedTxn == null) {
             LOG.warn("Transaction txId={} commit by dnId={} for containerID={} 
"
                     + "failed. Corresponding entry not found.", txID, dnID,
@@ -258,16 +259,17 @@ public class DeletedBlockLogImpl
           }
 
           dnsWithCommittedTxn.add(dnID);
-          Pipeline pipeline =
-              containerManager.getContainerWithPipeline(
-                  ContainerID.valueof(containerId)).getPipeline();
-          Collection<DatanodeDetails> containerDnsDetails = 
pipeline.getNodes();
+          final ContainerInfo container = containerManager
+              .getContainer(containerId);
+          final Set<ContainerReplica> replicas =
+              containerManager.getContainerReplicas(containerId);
           // The delete entry can be safely removed from the log if all the
           // corresponding nodes commit the txn. It is required to check that
           // the nodes returned in the pipeline match the replication factor.
-          if (min(containerDnsDetails.size(), dnsWithCommittedTxn.size())
-              >= pipeline.getFactor().getNumber()) {
-            List<UUID> containerDns = containerDnsDetails.stream()
+          if (min(replicas.size(), dnsWithCommittedTxn.size())
+              >= container.getReplicationFactor().getNumber()) {
+            List<UUID> containerDns = replicas.stream()
+                .map(ContainerReplica::getDatanodeDetails)
                 .map(DatanodeDetails::getUuid)
                 .collect(Collectors.toList());
             if (dnsWithCommittedTxn.containsAll(containerDns)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 9b41455..719d763 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -17,9 +17,14 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
@@ -42,78 +47,67 @@ public class CloseContainerEventHandler implements 
EventHandler<ContainerID> {
   public static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerEventHandler.class);
 
-
+  private final PipelineManager pipelineManager;
   private final ContainerManager containerManager;
 
-  public CloseContainerEventHandler(ContainerManager containerManager) {
+  public CloseContainerEventHandler(final PipelineManager pipelineManager,
+      final ContainerManager containerManager) {
+    this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
   }
 
   @Override
   public void onMessage(ContainerID containerID, EventPublisher publisher) {
-
-    LOG.info("Close container Event triggered for container : {}",
-        containerID.getId());
-    ContainerWithPipeline containerWithPipeline;
-    ContainerInfo info;
+    LOG.info("Close container Event triggered for container : {}", 
containerID);
     try {
-      containerWithPipeline =
-          containerManager.getContainerWithPipeline(containerID);
-      info = containerWithPipeline.getContainerInfo();
-      if (info == null) {
-        LOG.error("Failed to update the container state. Container with id : 
{}"
-            + " does not exist", containerID.getId());
-        return;
+      // If the container is in OPEN state, FINALIZE it.
+      if (containerManager.getContainer(containerID).getState()
+          == LifeCycleState.OPEN) {
+        containerManager.updateContainerState(
+            containerID, LifeCycleEvent.FINALIZE);
       }
-    } catch (IOException e) {
-      LOG.error("Failed to update the container state. Container with id : {} "
-          + "does not exist", containerID.getId(), e);
-      return;
-    }
 
-    HddsProtos.LifeCycleState state = info.getState();
-    try {
-      switch (state) {
-      case OPEN:
-        containerManager.updateContainerState(containerID,
-            HddsProtos.LifeCycleEvent.FINALIZE);
-        fireCloseContainerEvents(containerWithPipeline, info, publisher);
-        break;
-      case CLOSING:
-        fireCloseContainerEvents(containerWithPipeline, info, publisher);
-        break;
-      case CLOSED:
-      case DELETING:
-      case DELETED:
-        LOG.info("Cannot close container #{}, it is already in {} state.",
-            containerID.getId(), state);
-        break;
-      default:
-        throw new IOException("Invalid container state for container #"
-            + containerID);
+      // ContainerInfo has to read again after the above state change.
+      final ContainerInfo container = containerManager
+          .getContainer(containerID);
+      // Send close command to datanodes, if the container is in CLOSING state
+      if (container.getState() == LifeCycleState.CLOSING) {
+
+        final CloseContainerCommand closeContainerCommand =
+            new CloseContainerCommand(containerID.getId(),
+                container.getReplicationType(), container.getPipelineID());
+
+        getNodes(container).forEach(node -> publisher.fireEvent(
+            DATANODE_COMMAND,
+            new CommandForDatanode<>(node.getUuid(), closeContainerCommand)));
+      } else {
+        LOG.warn("Cannot close container {}, which is in {} state.",
+            containerID, container.getState());
       }
+
     } catch (IOException ex) {
-      LOG.error("Failed to update the container state for container #{}"
-          + containerID, ex);
+      LOG.error("Failed to close the container {}.", containerID, ex);
     }
   }
 
-  private void fireCloseContainerEvents(
-      ContainerWithPipeline containerWithPipeline, ContainerInfo info,
-      EventPublisher publisher) {
-    ContainerID containerID = info.containerID();
-    // fire events.
-    CloseContainerCommand closeContainerCommand =
-        new CloseContainerCommand(containerID.getId(),
-            info.getReplicationType(), info.getPipelineID());
-
-    Pipeline pipeline = containerWithPipeline.getPipeline();
-    pipeline.getNodes().stream()
-        .map(node ->
-            new CommandForDatanode<>(node.getUuid(), closeContainerCommand))
-        .forEach(command -> publisher.fireEvent(DATANODE_COMMAND, command));
-
-    LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand,
-        pipeline, containerID);
+  /**
+   * Returns the list of Datanodes where this container lives.
+   *
+   * @param container ContainerInfo
+   * @return list of DatanodeDetails
+   * @throws ContainerNotFoundException
+   */
+  private List<DatanodeDetails> getNodes(final ContainerInfo container)
+      throws ContainerNotFoundException {
+    try {
+      return pipelineManager.getPipeline(container.getPipelineID()).getNodes();
+    } catch (PipelineNotFoundException ex) {
+      // Use container replica if the pipeline is not available.
+      return containerManager.getContainerReplicas(container.containerID())
+          .stream()
+          .map(ContainerReplica::getDatanodeDetails)
+          .collect(Collectors.toList());
+    }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 0906ca8..2afd645 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.hdds.scm.container;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -63,16 +61,6 @@ public interface ContainerManager extends Closeable {
       throws ContainerNotFoundException;
 
   /**
-   * Returns the ContainerInfo from the container ID.
-   *
-   * @param containerID - ID of container.
-   * @return - ContainerWithPipeline such as creation state and the pipeline.
-   * @throws IOException
-   */
-  ContainerWithPipeline getContainerWithPipeline(ContainerID containerID)
-      throws ContainerNotFoundException, PipelineNotFoundException;
-
-  /**
    * Returns containers under certain conditions.
    * Search container IDs from start ID(exclusive),
    * The max size of the searching range cannot exceed the
@@ -94,10 +82,10 @@ public interface ContainerManager extends Closeable {
    *
    * @param replicationFactor - replication factor of the container.
    * @param owner
-   * @return - ContainerWithPipeline.
+   * @return - ContainerInfo.
    * @throws IOException
    */
-  ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
+  ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor replicationFactor, String owner)
       throws IOException;
 
@@ -158,10 +146,10 @@ public interface ContainerManager extends Closeable {
       throws IOException;
 
   /**
-   * Returns the ContainerWithPipeline.
+   * Returns the ContainerInfo.
    * @return NodeManager
    */
-  ContainerWithPipeline getMatchingContainerWithPipeline(long size,
+  ContainerInfo getMatchingContainer(long size,
       String owner, ReplicationType type, ReplicationFactor factor,
       LifeCycleState state) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 1d71d4e..f4bb082 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -227,7 +227,9 @@ public class ContainerStateManager {
       final List<Pipeline> pipelines = pipelineManager
           .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
       if (pipelines.isEmpty()) {
-        throw new IOException("Could not allocate container");
+        throw new IOException("Could not allocate container. Cannot get any" +
+            " matching pipeline for Type:" + type +
+            ", Factor:" + replicationFactor + ", State:PipelineState.OPEN");
       }
       pipeline = pipelines.get((int) containerCount.get() % pipelines.size());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 9d0ce7a..6c7031d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -21,15 +21,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -185,42 +181,6 @@ public class SCMContainerManager implements 
ContainerManager {
   }
 
   /**
-   * Returns the ContainerInfo and pipeline from the containerID. If container
-   * has no available replicas in datanodes it returns pipeline with no
-   * datanodes and empty leaderID . Pipeline#isEmpty can be used to check for
-   * an empty pipeline.
-   *
-   * @param containerID - ID of container.
-   * @return - ContainerWithPipeline such as creation state and the pipeline.
-   * @throws IOException
-   */
-  @Override
-  public ContainerWithPipeline getContainerWithPipeline(ContainerID 
containerID)
-      throws ContainerNotFoundException, PipelineNotFoundException {
-    lock.lock();
-    try {
-      final ContainerInfo contInfo = getContainer(containerID);
-      Pipeline pipeline;
-      if (contInfo.isOpen()) {
-        // If pipeline with given pipeline Id already exist return it
-        pipeline = pipelineManager.getPipeline(contInfo.getPipelineID());
-      } else {
-        // For close containers create pipeline from datanodes with replicas
-        Set<ContainerReplica> dnWithReplicas = containerStateManager
-            .getContainerReplicas(contInfo.containerID());
-        List<DatanodeDetails> dns =
-            dnWithReplicas.stream().map(ContainerReplica::getDatanodeDetails)
-                .collect(Collectors.toList());
-        pipeline = pipelineManager.createPipeline(ReplicationType.STAND_ALONE,
-            contInfo.getReplicationFactor(), dns);
-      }
-      return new ContainerWithPipeline(contInfo, pipeline);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -261,16 +221,13 @@ public class SCMContainerManager implements 
ContainerManager {
    * @throws IOException - Exception
    */
   @Override
-  public ContainerWithPipeline allocateContainer(final ReplicationType type,
+  public ContainerInfo allocateContainer(final ReplicationType type,
       final ReplicationFactor replicationFactor, final String owner)
       throws IOException {
     lock.lock();
     try {
       final ContainerInfo containerInfo; containerInfo = containerStateManager
           .allocateContainer(pipelineManager, type, replicationFactor, owner);
-      final Pipeline pipeline = pipelineManager.getPipeline(
-          containerInfo.getPipelineID());
-
       try {
         final byte[] containerIDBytes = Longs.toByteArray(
             containerInfo.getContainerID());
@@ -286,7 +243,7 @@ public class SCMContainerManager implements 
ContainerManager {
         }
         throw ex;
       }
-      return new ContainerWithPipeline(containerInfo, pipeline);
+      return containerInfo;
     } finally {
       lock.unlock();
     }
@@ -366,12 +323,8 @@ public class SCMContainerManager implements 
ContainerManager {
       break;
     case CLOSE:
       break;
-    case UPDATE:
-      break;
     case DELETE:
       break;
-    case TIMEOUT:
-      break;
     case CLEANUP:
       break;
     default:
@@ -434,17 +387,11 @@ public class SCMContainerManager implements 
ContainerManager {
    * @param state - State of the Container-- {Open, Allocated etc.}
    * @return ContainerInfo, null if there is no match found.
    */
-  public ContainerWithPipeline getMatchingContainerWithPipeline(
+  public ContainerInfo getMatchingContainer(
       final long sizeRequired, String owner, ReplicationType type,
       ReplicationFactor factor, LifeCycleState state) throws IOException {
-    ContainerInfo containerInfo = containerStateManager
-        .getMatchingContainer(sizeRequired, owner, type, factor, state);
-    if (containerInfo == null) {
-      return null;
-    }
-    Pipeline pipeline = pipelineManager
-        .getPipeline(containerInfo.getPipelineID());
-    return new ContainerWithPipeline(containerInfo, pipeline);
+    return containerStateManager.getMatchingContainer(
+        sizeRequired, owner, type, factor, state);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 0c9b865..f141ae5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -61,6 +62,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos
@@ -161,11 +163,13 @@ public class SCMClientProtocolServer implements
       replicationType, HddsProtos.ReplicationFactor factor,
       String owner) throws IOException {
     ScmUtils.preCheck(ScmOps.allocateContainer, chillModePrecheck);
-    String remoteUser = getRpcRemoteUsername();
-    getScm().checkAdminAccess(remoteUser);
+    getScm().checkAdminAccess(getRpcRemoteUsername());
 
-    return scm.getContainerManager()
+    final ContainerInfo container = scm.getContainerManager()
         .allocateContainer(replicationType, factor, owner);
+    final Pipeline pipeline = scm.getPipelineManager()
+        .getPipeline(container.getPipelineID());
+    return new ContainerWithPipeline(container, pipeline);
   }
 
   @Override
@@ -191,8 +195,26 @@ public class SCMClientProtocolServer implements
       }
     }
     getScm().checkAdminAccess(null);
-    return scm.getContainerManager()
-        .getContainerWithPipeline(ContainerID.valueof(containerID));
+
+    final ContainerID id = ContainerID.valueof(containerID);
+    final ContainerInfo container = scm.getContainerManager().getContainer(id);
+    final Pipeline pipeline;
+
+    if (container.isOpen()) {
+      // Ratis pipeline
+      pipeline = scm.getPipelineManager()
+          .getPipeline(container.getPipelineID());
+    } else {
+      pipeline = scm.getPipelineManager().createPipeline(
+          HddsProtos.ReplicationType.STAND_ALONE,
+          container.getReplicationFactor(),
+          scm.getContainerManager()
+              .getContainerReplicas(id).stream()
+              .map(ContainerReplica::getDatanodeDetails)
+              .collect(Collectors.toList()));
+    }
+
+    return new ContainerWithPipeline(container, pipeline);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 5f419d3..08ee382 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -207,13 +207,13 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     pipelineManager = new SCMPipelineManager(conf, scmNodeManager, eventQueue);
     containerManager = new SCMContainerManager(
         conf, scmNodeManager, pipelineManager, eventQueue);
-    scmBlockManager = new BlockManagerImpl(
-        conf, scmNodeManager, containerManager, eventQueue);
+    scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
+        pipelineManager, containerManager, eventQueue);
 
     replicationStatus = new ReplicationActivityStatus();
 
     CloseContainerEventHandler closeContainerHandler =
-        new CloseContainerEventHandler(containerManager);
+        new CloseContainerEventHandler(pipelineManager, containerManager);
     NodeReportHandler nodeReportHandler =
         new NodeReportHandler(scmNodeManager);
     PipelineReportHandler pipelineReportHandler =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 66ae682..8fbe7fb 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -413,7 +413,7 @@ public final class TestUtils {
       throws IOException {
     return containerManager
         .allocateContainer(HddsProtos.ReplicationType.STAND_ALONE,
-            HddsProtos.ReplicationFactor.THREE, "root").getContainerInfo();
+            HddsProtos.ReplicationFactor.THREE, "root");
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index aa940df..ce7fa2f 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -91,7 +91,7 @@ public class TestBlockManager implements 
EventHandler<Boolean> {
     mapping = new SCMContainerManager(conf, nodeManager, pipelineManager,
         eventQueue);
     blockManager = new BlockManagerImpl(conf,
-        nodeManager, mapping, eventQueue);
+        nodeManager, pipelineManager, mapping, eventQueue);
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
     eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
     if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 268cf8b..48949be 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hdds.scm.block;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -51,11 +54,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -99,21 +104,23 @@ public class TestDeletedBlockLog {
         DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString())
             .build());
 
-    ContainerInfo containerInfo =
-        new ContainerInfo.Builder().setContainerID(1).build();
-    Pipeline pipeline = Pipeline.newBuilder()
-        .setType(ReplicationType.RATIS)
-        .setFactor(ReplicationFactor.THREE)
-        .setState(Pipeline.PipelineState.CLOSED)
-        .setId(PipelineID.randomId())
-        .setNodes(dnList)
-        .build();
-    ContainerWithPipeline containerWithPipeline =
-        new ContainerWithPipeline(containerInfo, pipeline);
-    when(containerManager.getContainerWithPipeline(anyObject()))
-        .thenReturn(containerWithPipeline);
+    final ContainerInfo container =
+        new ContainerInfo.Builder().setContainerID(1)
+            .setReplicationFactor(ReplicationFactor.THREE)
+            .setState(HddsProtos.LifeCycleState.CLOSED)
+            .build();
+    final Set<ContainerReplica> replicaSet = dnList.stream()
+        .map(datanodeDetails -> ContainerReplica.newBuilder()
+            .setContainerID(container.containerID())
+            .setContainerState(ContainerReplicaProto.State.OPEN)
+            .setDatanodeDetails(datanodeDetails)
+            .build())
+        .collect(Collectors.toSet());
+
+    when(containerManager.getContainerReplicas(anyObject()))
+        .thenReturn(replicaSet);
     when(containerManager.getContainer(anyObject()))
-        .thenReturn(containerInfo);
+        .thenReturn(container);
   }
 
   @After
@@ -383,8 +390,7 @@ public class TestDeletedBlockLog {
 
   private void mockContainerInfo(long containerID, DatanodeDetails dd)
       throws IOException {
-    List<DatanodeDetails> dns = new ArrayList<>();
-    dns.add(dd);
+    List<DatanodeDetails> dns = Collections.singletonList(dd);
     Pipeline pipeline = Pipeline.newBuilder()
             .setType(ReplicationType.STAND_ALONE)
             .setFactor(ReplicationFactor.ONE)
@@ -399,11 +405,18 @@ public class TestDeletedBlockLog {
         .setReplicationFactor(pipeline.getFactor());
 
     ContainerInfo containerInfo = builder.build();
-    ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(
-        containerInfo, pipeline);
     Mockito.doReturn(containerInfo).when(containerManager)
         .getContainer(ContainerID.valueof(containerID));
-    Mockito.doReturn(containerWithPipeline).when(containerManager)
-        .getContainerWithPipeline(ContainerID.valueof(containerID));
+
+    final Set<ContainerReplica> replicaSet = dns.stream()
+        .map(datanodeDetails -> ContainerReplica.newBuilder()
+            .setContainerID(containerInfo.containerID())
+            .setContainerState(ContainerReplicaProto.State.OPEN)
+            .setDatanodeDetails(datanodeDetails)
+            .build())
+        .collect(Collectors.toSet());
+    when(containerManager.getContainerReplicas(
+        ContainerID.valueof(containerID)))
+        .thenReturn(replicaSet);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index fd2dadf..fec3f84 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -51,6 +49,7 @@ public class TestCloseContainerEventHandler {
 
   private static Configuration configuration;
   private static MockNodeManager nodeManager;
+  private static PipelineManager pipelineManager;
   private static SCMContainerManager containerManager;
   private static long size;
   private static File testDir;
@@ -66,14 +65,14 @@ public class TestCloseContainerEventHandler {
     configuration
         .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
-    PipelineManager pipelineManager =
+    pipelineManager =
         new SCMPipelineManager(configuration, nodeManager, eventQueue);
     containerManager = new
         SCMContainerManager(configuration, nodeManager,
         pipelineManager, new EventQueue());
     eventQueue = new EventQueue();
     eventQueue.addHandler(CLOSE_CONTAINER,
-        new CloseContainerEventHandler(containerManager));
+        new CloseContainerEventHandler(pipelineManager, containerManager));
     eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
   }
 
@@ -105,19 +104,18 @@ public class TestCloseContainerEventHandler {
         new ContainerID(id));
     eventQueue.processAll(1000);
     Assert.assertTrue(logCapturer.getOutput()
-        .contains("Failed to update the container state"));
+        .contains("Failed to close the container"));
   }
 
   @Test
   public void testCloseContainerEventWithValidContainers() throws IOException {
 
-    ContainerWithPipeline containerWithPipeline = containerManager
+    ContainerInfo container = containerManager
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE, "ozone");
-    ContainerID id = new ContainerID(
-        containerWithPipeline.getContainerInfo().getContainerID());
-    DatanodeDetails datanode =
-        containerWithPipeline.getPipeline().getFirstNode();
+    ContainerID id = container.containerID();
+    DatanodeDetails datanode = pipelineManager
+        .getPipeline(container.getPipelineID()).getFirstNode();
     int closeCount = nodeManager.getCommandCount(datanode);
     eventQueue.fireEvent(CLOSE_CONTAINER, id);
     eventQueue.processAll(1000);
@@ -132,23 +130,22 @@ public class TestCloseContainerEventHandler {
 
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
         .captureLogs(CloseContainerEventHandler.LOG);
-    ContainerWithPipeline containerWithPipeline = containerManager
+    ContainerInfo container = containerManager
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, "ozone");
-    ContainerID id = new ContainerID(
-        containerWithPipeline.getContainerInfo().getContainerID());
+    ContainerID id = container.containerID();
     int[] closeCount = new int[3];
     eventQueue.fireEvent(CLOSE_CONTAINER, id);
     eventQueue.processAll(1000);
     int i = 0;
-    for (DatanodeDetails details : containerWithPipeline.getPipeline()
-        .getNodes()) {
+    for (DatanodeDetails details : pipelineManager
+        .getPipeline(container.getPipelineID()).getNodes()) {
       closeCount[i] = nodeManager.getCommandCount(details);
       i++;
     }
     i = 0;
-    for (DatanodeDetails details : containerWithPipeline.getPipeline()
-        .getNodes()) {
+    for (DatanodeDetails details : pipelineManager
+        .getPipeline(container.getPipelineID()).getNodes()) {
       Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details));
       i++;
     }
@@ -156,8 +153,8 @@ public class TestCloseContainerEventHandler {
     eventQueue.processAll(1000);
     i = 0;
     // Make sure close is queued for each datanode on the pipeline
-    for (DatanodeDetails details : containerWithPipeline.getPipeline()
-        .getNodes()) {
+    for (DatanodeDetails details : pipelineManager
+        .getPipeline(container.getPipelineID()).getNodes()) {
       Assert.assertEquals(closeCount[i] + 1,
           nodeManager.getCommandCount(details));
       Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index a2eba92..14f516d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -100,14 +100,14 @@ public class TestContainerReportHandler implements 
EventPublisher {
 
     ContainerInfo cont1 = containerManager
         .allocateContainer(ReplicationType.STAND_ALONE,
-            ReplicationFactor.THREE, "root").getContainerInfo();
+            ReplicationFactor.THREE, "root");
     ContainerInfo cont2 = containerManager
         .allocateContainer(ReplicationType.STAND_ALONE,
-            ReplicationFactor.THREE, "root").getContainerInfo();
+            ReplicationFactor.THREE, "root");
     // Open Container
     ContainerInfo cont3 = containerManager
         .allocateContainer(ReplicationType.STAND_ALONE,
-            ReplicationFactor.THREE, "root").getContainerInfo();
+            ReplicationFactor.THREE, "root");
 
     long c1 = cont1.getContainerID();
     long c2 = cont2.getContainerID();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 6e0d85b..f37b447 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -23,7 +23,6 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -44,11 +43,13 @@ import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * Tests for Container ContainerManager.
@@ -109,7 +110,7 @@ public class TestSCMContainerManager {
 
   @Test
   public void testallocateContainer() throws Exception {
-    ContainerWithPipeline containerInfo = containerManager.allocateContainer(
+    ContainerInfo containerInfo = containerManager.allocateContainer(
         xceiverClientManager.getType(),
         xceiverClientManager.getFactor(),
         containerOwner);
@@ -126,14 +127,15 @@ public class TestSCMContainerManager {
      */
     Set<UUID> pipelineList = new TreeSet<>();
     for (int x = 0; x < 30; x++) {
-      ContainerWithPipeline containerInfo = containerManager.allocateContainer(
+      ContainerInfo containerInfo = containerManager.allocateContainer(
           xceiverClientManager.getType(),
           xceiverClientManager.getFactor(),
           containerOwner);
 
       Assert.assertNotNull(containerInfo);
-      Assert.assertNotNull(containerInfo.getPipeline());
-      pipelineList.add(containerInfo.getPipeline().getFirstNode()
+      Assert.assertNotNull(containerInfo.getPipelineID());
+      pipelineList.add(pipelineManager.getPipeline(
+          containerInfo.getPipelineID()).getFirstNode()
           .getUuid());
     }
     Assert.assertTrue(pipelineList.size() > 5);
@@ -141,32 +143,27 @@ public class TestSCMContainerManager {
 
   @Test
   public void testGetContainer() throws IOException {
-    ContainerWithPipeline containerInfo = containerManager.allocateContainer(
+    ContainerInfo containerInfo = containerManager.allocateContainer(
         xceiverClientManager.getType(),
         xceiverClientManager.getFactor(),
         containerOwner);
-    Pipeline pipeline  = containerInfo.getPipeline();
+    Assert.assertNotNull(containerInfo);
+    Pipeline pipeline  = pipelineManager
+        .getPipeline(containerInfo.getPipelineID());
     Assert.assertNotNull(pipeline);
-    Pipeline newPipeline = containerInfo.getPipeline();
-    Assert.assertEquals(pipeline.getFirstNode().getUuid(),
-        newPipeline.getFirstNode().getUuid());
+    Assert.assertEquals(containerInfo,
+        containerManager.getContainer(containerInfo.containerID()));
   }
 
   @Test
   public void testGetContainerWithPipeline() throws Exception {
-    ContainerWithPipeline containerWithPipeline = containerManager
+    ContainerInfo contInfo = containerManager
         .allocateContainer(xceiverClientManager.getType(),
             xceiverClientManager.getFactor(), containerOwner);
-    ContainerInfo contInfo = containerWithPipeline.getContainerInfo();
     // Add dummy replicas for container.
-    DatanodeDetails dn1 = DatanodeDetails.newBuilder()
-        .setHostName("host1")
-        .setIpAddress("1.1.1.1")
-        .setUuid(UUID.randomUUID().toString()).build();
-    DatanodeDetails dn2 = DatanodeDetails.newBuilder()
-        .setHostName("host2")
-        .setIpAddress("2.2.2.2")
-        .setUuid(UUID.randomUUID().toString()).build();
+    Iterator<DatanodeDetails> nodes = pipelineManager
+        .getPipeline(contInfo.getPipelineID()).getNodes().iterator();
+    DatanodeDetails dn1 = nodes.next();
     containerManager.updateContainerState(contInfo.containerID(),
         LifeCycleEvent.FINALIZE);
     containerManager
@@ -180,27 +177,21 @@ public class TestSCMContainerManager {
         ContainerReplica.newBuilder().setContainerID(contInfo.containerID())
             .setContainerState(ContainerReplicaProto.State.CLOSED)
             .setDatanodeDetails(dn1).build());
-    containerManager.updateContainerReplica(contInfo.containerID(),
-        ContainerReplica.newBuilder().setContainerID(contInfo.containerID())
-            .setContainerState(ContainerReplicaProto.State.CLOSED)
-            .setDatanodeDetails(dn2).build());
 
-    Assert.assertEquals(2,
+    Assert.assertEquals(1,
         containerManager.getContainerReplicas(
             finalContInfo.containerID()).size());
 
     contInfo = containerManager.getContainer(contInfo.containerID());
     Assert.assertEquals(contInfo.getState(), LifeCycleState.CLOSED);
-    Pipeline pipeline = containerWithPipeline.getPipeline();
-    pipelineManager.finalizePipeline(pipeline.getId());
-
-    ContainerWithPipeline containerWithPipeline2 = containerManager
-        .getContainerWithPipeline(contInfo.containerID());
-    pipeline = containerWithPipeline2.getPipeline();
-    Assert.assertNotEquals(containerWithPipeline, containerWithPipeline2);
-    Assert.assertNotNull("Pipeline should not be null", pipeline);
-    Assert.assertTrue(pipeline.getNodes().contains(dn1));
-    Assert.assertTrue(pipeline.getNodes().contains(dn2));
+    // After closing the container, we should get the replica and construct
+    // standalone pipeline. No more ratis pipeline.
+
+    Set<DatanodeDetails> replicaNodes = containerManager
+        .getContainerReplicas(contInfo.containerID())
+        .stream().map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toSet());
+    Assert.assertTrue(replicaNodes.contains(dn1));
   }
 
   @Test
@@ -232,11 +223,9 @@ public class TestSCMContainerManager {
   private ContainerInfo createContainer()
       throws IOException {
     nodeManager.setChillmode(false);
-    ContainerWithPipeline containerWithPipeline = containerManager
+    return containerManager
         .allocateContainer(xceiverClientManager.getType(),
             xceiverClientManager.getFactor(), containerOwner);
-    ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
-    return containerInfo;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 7a9dbad..ad24a10 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -25,8 +25,8 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -155,12 +155,13 @@ public class TestContainerPlacement {
       assertEquals(remaining * nodeCount,
           (long) nodeManager.getStats().getRemaining().get());
 
-      ContainerWithPipeline containerWithPipeline = containerManager
+      ContainerInfo container = containerManager
           .allocateContainer(
           xceiverClientManager.getType(),
           xceiverClientManager.getFactor(), "OZONE");
       assertEquals(xceiverClientManager.getFactor().getNumber(),
-          containerWithPipeline.getPipeline().getNodes().size());
+          containerManager.getContainerReplicas(
+              container.containerID()).size());
     } finally {
       IOUtils.closeQuietly(containerManager);
       IOUtils.closeQuietly(nodeManager);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index fd2c973..8d23afd 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
@@ -61,8 +62,11 @@ public class TestNode2PipelineMap {
     cluster.waitForClusterToBeReady();
     scm = cluster.getStorageContainerManager();
     containerManager = scm.getContainerManager();
-    ratisContainer = containerManager.allocateContainer(
+    pipelineManager = scm.getPipelineManager();
+    ContainerInfo containerInfo = containerManager.allocateContainer(
         RATIS, THREE, "testOwner");
+    ratisContainer = new ContainerWithPipeline(containerInfo,
+        pipelineManager.getPipeline(containerInfo.getPipelineID()));
     pipelineManager = scm.getPipelineManager();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
index 618cd8e..3207878 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
@@ -22,14 +22,13 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -48,8 +47,8 @@ public class TestNodeFailure {
 
   private static MiniOzoneCluster cluster;
   private static OzoneConfiguration conf;
-  private static ContainerWithPipeline ratisContainer1;
-  private static ContainerWithPipeline ratisContainer2;
+  private static Pipeline ratisPipelineOne;
+  private static Pipeline ratisPipelineTwo;
   private static ContainerManager containerManager;
   private static PipelineManager pipelineManager;
   private static long timeForFailure;
@@ -76,10 +75,12 @@ public class TestNodeFailure {
     StorageContainerManager scm = cluster.getStorageContainerManager();
     containerManager = scm.getContainerManager();
     pipelineManager = scm.getPipelineManager();
-    ratisContainer1 = containerManager.allocateContainer(
-        RATIS, THREE, "testOwner");
-    ratisContainer2 = containerManager.allocateContainer(
-        RATIS, THREE, "testOwner");
+    ratisPipelineOne = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+        RATIS, THREE, "testOwner").getPipelineID());
+    ratisPipelineTwo = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+        RATIS, THREE, "testOwner").getPipelineID());
     // At this stage, there should be 2 pipeline one with 1 open container 
each.
     // Try closing the both the pipelines, one with a closed container and
     // the other with an open container.
@@ -99,12 +100,15 @@ public class TestNodeFailure {
     }
   }
 
+  @Ignore
+  // Enable this after we implement teardown pipeline logic once a datanode
+  // dies.
   @Test(timeout = 300_000L)
   public void testPipelineFail() throws InterruptedException, IOException,
       TimeoutException {
-    Assert.assertEquals(ratisContainer1.getPipeline().getPipelineState(),
+    Assert.assertEquals(ratisPipelineOne.getPipelineState(),
         Pipeline.PipelineState.OPEN);
-    Pipeline pipelineToFail = ratisContainer1.getPipeline();
+    Pipeline pipelineToFail = ratisPipelineOne;
     DatanodeDetails dnToFail = pipelineToFail.getFirstNode();
     cluster.shutdownHddsDatanode(dnToFail);
 
@@ -112,18 +116,19 @@ public class TestNodeFailure {
     Thread.sleep(3 * timeForFailure);
 
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        pipelineManager.getPipeline(ratisContainer1.getPipeline().getId())
+        pipelineManager.getPipeline(ratisPipelineOne.getId())
             .getPipelineState());
     Assert.assertEquals(Pipeline.PipelineState.OPEN,
-        pipelineManager.getPipeline(ratisContainer2.getPipeline().getId())
+        pipelineManager.getPipeline(ratisPipelineTwo.getId())
             .getPipelineState());
     // Now restart the datanode and make sure that a new pipeline is created.
     cluster.setWaitForClusterToBeReadyTimeout(300000);
     cluster.restartHddsDatanode(dnToFail, true);
-    ContainerWithPipeline ratisContainer3 =
-        containerManager.allocateContainer(RATIS, THREE, "testOwner");
+    Pipeline ratisPipelineThree = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+            RATIS, THREE, "testOwner").getPipelineID());
     //Assert that new container is not created from the ratis 2 pipeline
-    Assert.assertNotEquals(ratisContainer3.getPipeline().getId(),
-        ratisContainer2.getPipeline().getId());
+    Assert.assertNotEquals(ratisPipelineThree.getId(),
+        ratisPipelineTwo.getId());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 211782b..66bdb5b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
@@ -62,10 +63,15 @@ public class TestPipelineClose {
     cluster.waitForClusterToBeReady();
     scm = cluster.getStorageContainerManager();
     containerManager = scm.getContainerManager();
-    ratisContainer1 = containerManager
+    pipelineManager = scm.getPipelineManager();
+    ContainerInfo containerInfo1 = containerManager
         .allocateContainer(RATIS, THREE, "testOwner");
-    ratisContainer2 = containerManager
+    ratisContainer1 = new ContainerWithPipeline(containerInfo1,
+        pipelineManager.getPipeline(containerInfo1.getPipelineID()));
+    ContainerInfo containerInfo2 = containerManager
         .allocateContainer(RATIS, THREE, "testOwner");
+    ratisContainer2 = new ContainerWithPipeline(containerInfo2,
+        pipelineManager.getPipeline(containerInfo2.getPipelineID()));
     pipelineManager = scm.getPipelineManager();
     // At this stage, there should be 2 pipeline one with 1 open container 
each.
     // Try closing the both the pipelines, one with a closed container and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 0fa8649..ab70ed1 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -65,10 +66,12 @@ public class TestSCMRestart {
     StorageContainerManager scm = cluster.getStorageContainerManager();
     containerManager = scm.getContainerManager();
     pipelineManager = scm.getPipelineManager();
-    ratisPipeline1 = containerManager.allocateContainer(
-        RATIS, THREE, "Owner1").getPipeline();
-    ratisPipeline2 = containerManager.allocateContainer(
-        RATIS, ONE, "Owner2").getPipeline();
+    ratisPipeline1 = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+        RATIS, THREE, "Owner1").getPipelineID());
+    ratisPipeline2 = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+        RATIS, ONE, "Owner2").getPipelineID());
     // At this stage, there should be 2 pipeline one with 1 open container
     // each. Try restarting the SCM and then discover that pipeline are in
     // correct state.
@@ -100,10 +103,10 @@ public class TestSCMRestart {
     Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1);
     Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2);
 
-    // Try creating a new ratis pipeline, it should be from the same pipeline
+    // Try creating a new container, it should be from the same pipeline
     // as was before restart
-    Pipeline newRatisPipeline = newContainerManager
-        .allocateContainer(RATIS, THREE, "Owner1").getPipeline();
-    Assert.assertEquals(newRatisPipeline.getId(), ratisPipeline1.getId());
+    ContainerInfo containerInfo = newContainerManager
+        .allocateContainer(RATIS, THREE, "Owner1");
+    Assert.assertEquals(containerInfo.getPipelineID(), ratisPipeline1.getId());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
index d7e5360..9982da4 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
@@ -52,9 +52,8 @@ public class OzoneTestUtils {
             
.updateContainerState(ContainerID.valueof(blockID.getContainerID()),
                 HddsProtos.LifeCycleEvent.CLOSE);
         Assert.assertFalse(scm.getContainerManager()
-            .getContainerWithPipeline(ContainerID.valueof(
-                blockID.getContainerID()))
-            .getContainerInfo().isOpen());
+            .getContainer(ContainerID.valueof(
+                blockID.getContainerID())).isOpen());
       } catch (IOException e) {
         e.printStackTrace();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
index fe060a6..db3a197 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
@@ -446,9 +446,10 @@ public class TestOzoneRestClient {
     // Sum the data size from chunks in Container via containerID
     // and localID, make sure the size equals to the actually value size.
     Pipeline pipeline = cluster.getStorageContainerManager()
-        .getContainerManager().getContainerWithPipeline(
-            ContainerID.valueof(containerID))
-        .getPipeline();
+        .getPipelineManager().getPipeline(
+            cluster.getStorageContainerManager()
+                .getContainerManager().getContainer(
+                    ContainerID.valueof(containerID)).getPipelineID());
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(datanodes.size(), 1);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index df2fd1f..576801d 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
@@ -377,10 +378,12 @@ public class TestCloseContainerHandlingByClient {
       cluster.getStorageContainerManager().getEventQueue()
           .fireEvent(SCMEvents.CLOSE_CONTAINER,
               ContainerID.valueof(containerID));
+      ContainerInfo container = cluster.getStorageContainerManager()
+          .getContainerManager()
+          .getContainer(ContainerID.valueof(containerID));
       Pipeline pipeline =
-          cluster.getStorageContainerManager().getContainerManager()
-              .getContainerWithPipeline(ContainerID.valueof(containerID))
-              .getPipeline();
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
       pipelineList.add(pipeline);
       List<DatanodeDetails> datanodes = pipeline.getNodes();
       for (DatanodeDetails details : datanodes) {
@@ -435,10 +438,13 @@ public class TestCloseContainerHandlingByClient {
     List<OmKeyLocationInfo> locationInfos =
         new ArrayList<>(groupOutputStream.getLocationInfoList());
     long containerID = locationInfos.get(0).getContainerID();
-    List<DatanodeDetails> datanodes =
-        cluster.getStorageContainerManager().getContainerManager()
-            .getContainerWithPipeline(ContainerID.valueof(containerID))
-            .getPipeline().getNodes();
+    ContainerInfo container = cluster.getStorageContainerManager()
+        .getContainerManager()
+        .getContainer(ContainerID.valueof(containerID));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(1, datanodes.size());
     waitForContainerClose(keyName, key, 
HddsProtos.ReplicationType.STAND_ALONE);
     dataString = fixedLengthString(keyString, (1 * blockSize));
@@ -538,10 +544,13 @@ public class TestCloseContainerHandlingByClient {
     List<OmKeyLocationInfo> locationInfos =
         groupOutputStream.getLocationInfoList();
     long containerID = locationInfos.get(0).getContainerID();
-    List<DatanodeDetails> datanodes =
-        cluster.getStorageContainerManager().getContainerManager()
-            .getContainerWithPipeline(ContainerID.valueof(containerID))
-            .getPipeline().getNodes();
+    ContainerInfo container = cluster.getStorageContainerManager()
+        .getContainerManager()
+        .getContainer(ContainerID.valueof(containerID));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(1, datanodes.size());
     // move the container on the datanode to Closing state, this will ensure
     // closing the key will hit BLOCK_NOT_COMMITTED_EXCEPTION while trying

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 0b51bb3..555b895 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.*;
@@ -645,10 +644,10 @@ public class TestOzoneRpcClient {
     Assert
         .assertEquals(value.getBytes().length, 
keyLocations.get(0).getLength());
 
-    ContainerWithPipeline container =
-        cluster.getStorageContainerManager().getContainerManager()
-            .getContainerWithPipeline(new ContainerID(containerID));
-    Pipeline pipeline = container.getPipeline();
+    ContainerInfo container = cluster.getStorageContainerManager()
+        .getContainerManager().getContainer(ContainerID.valueof(containerID));
+    Pipeline pipeline = cluster.getStorageContainerManager()
+        .getPipelineManager().getPipeline(container.getPipelineID());
     List<DatanodeDetails> datanodes = pipeline.getNodes();
 
     DatanodeDetails datanodeDetails = datanodes.get(0);
@@ -662,17 +661,17 @@ public class TestOzoneRpcClient {
     // shutdown the datanode
     cluster.shutdownHddsDatanode(datanodeDetails);
 
-    Assert.assertTrue(container.getContainerInfo().getState()
+    Assert.assertTrue(container.getState()
         == HddsProtos.LifeCycleState.OPEN);
     // try to read, this shouls be successful
     readKey(bucket, keyName, value);
 
-    Assert.assertTrue(container.getContainerInfo().getState()
+    Assert.assertTrue(container.getState()
         == HddsProtos.LifeCycleState.OPEN);
     // shutdown the second datanode
     datanodeDetails = datanodes.get(1);
     cluster.shutdownHddsDatanode(datanodeDetails);
-    Assert.assertTrue(container.getContainerInfo().getState()
+    Assert.assertTrue(container.getState()
         == HddsProtos.LifeCycleState.OPEN);
 
     // the container is open and with loss of 2 nodes we still should be able
@@ -750,10 +749,10 @@ public class TestOzoneRpcClient {
 
     // Second, sum the data size from chunks in Container via containerID
     // and localID, make sure the size equals to the size from keyDetails.
+    ContainerInfo container = cluster.getStorageContainerManager()
+        .getContainerManager().getContainer(ContainerID.valueof(containerID));
     Pipeline pipeline = cluster.getStorageContainerManager()
-        .getContainerManager().getContainerWithPipeline(
-            ContainerID.valueof(containerID))
-        .getPipeline();
+        .getPipelineManager().getPipeline(container.getPipelineID());
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(datanodes.size(), 1);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to