HDDS-567. Rename Mapping to ContainerManager in SCM. Contributed by Nanda kumar.

(cherry picked from commit 095c269620e01ce46832ea25e696c0ab71613ea3)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5d328662
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5d328662
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5d328662

Branch: refs/heads/ozone-0.2
Commit: 5d328662a04da2e21c7765936eed8db00da85a91
Parents: f4009ae
Author: Nanda kumar <na...@apache.org>
Authored: Wed Oct 3 16:00:39 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Oct 3 16:01:16 2018 +0530

----------------------------------------------------------------------
 .../hadoop/hdds/scm/block/BlockManagerImpl.java |   6 +-
 .../block/DatanodeDeletedBlockTransactions.java |  10 +-
 .../hdds/scm/block/DeletedBlockLogImpl.java     |   8 +-
 .../hdds/scm/block/SCMBlockDeletingService.java |  15 +-
 .../container/CloseContainerEventHandler.java   |   4 +-
 .../scm/container/CloseContainerWatcher.java    |   4 +-
 .../hdds/scm/container/ContainerManager.java    | 142 ++++
 .../hdds/scm/container/ContainerMapping.java    | 699 -------------------
 .../scm/container/ContainerReportHandler.java   |  12 +-
 .../scm/container/ContainerStateManager.java    |   8 +-
 .../hadoop/hdds/scm/container/Mapping.java      | 141 ----
 .../hdds/scm/container/SCMContainerManager.java | 699 +++++++++++++++++++
 .../scm/server/SCMClientProtocolServer.java     |  22 +-
 .../scm/server/SCMDatanodeProtocolServer.java   |   2 +-
 .../scm/server/StorageContainerManager.java     |  36 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java |   6 +-
 .../hdds/scm/block/TestDeletedBlockLog.java     |   8 +-
 .../TestCloseContainerEventHandler.java         |   4 +-
 .../scm/container/TestContainerMapping.java     | 380 ----------
 .../container/TestContainerReportHandler.java   |  10 +-
 .../container/TestContainerStateManager.java    |   2 +-
 .../scm/container/TestSCMContainerManager.java  | 378 ++++++++++
 .../hdds/scm/node/TestContainerPlacement.java   |   9 +-
 .../hdds/scm/node/TestDeadNodeHandler.java      |   4 +-
 .../container/TestCloseContainerWatcher.java    |  12 +-
 .../TestContainerStateManagerIntegration.java   |  52 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |  21 +-
 .../hdds/scm/pipeline/TestNodeFailure.java      |  18 +-
 .../hdds/scm/pipeline/TestPipelineClose.java    |  28 +-
 .../hdds/scm/pipeline/TestSCMRestart.java       |  33 +-
 .../org/apache/hadoop/ozone/OzoneTestUtils.java |   6 +-
 .../ozone/client/rest/TestOzoneRestClient.java  |   2 +-
 .../rpc/TestCloseContainerHandlingByClient.java |   6 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java    |   2 +-
 .../commandhandler/TestBlockDeletion.java       |   3 +-
 .../TestCloseContainerByPipeline.java           |   6 +-
 .../TestCloseContainerHandler.java              |   2 +-
 .../hadoop/ozone/om/TestScmChillMode.java       |  12 +-
 .../hadoop/ozone/scm/TestContainerSQLCli.java   |  12 +-
 39 files changed, 1415 insertions(+), 1409 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 4777016..30740c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmUtils;
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
@@ -70,7 +70,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
   // by itself and does not rely on the Block service offered by SCM.
 
   private final NodeManager nodeManager;
-  private final Mapping containerManager;
+  private final ContainerManager containerManager;
 
   private final long containerSize;
 
@@ -92,7 +92,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
    * @throws IOException
    */
   public BlockManagerImpl(final Configuration conf,
-      final NodeManager nodeManager, final Mapping containerManager,
+      final NodeManager nodeManager, final ContainerManager containerManager,
       EventPublisher eventPublisher)
       throws IOException {
     this.nodeManager = nodeManager;
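
With this change, callers wire BlockManagerImpl against the ContainerManager
interface instead of Mapping. A minimal construction sketch (assumptions: conf,
nodeManager and eventPublisher already exist, and SCMContainerManager keeps the
four-argument constructor of the old ContainerMapping shown later in this diff):

    // Sketch only, not part of this commit; the SCMContainerManager
    // constructor signature is assumed from the old ContainerMapping.
    ContainerManager containerManager =
        new SCMContainerManager(conf, nodeManager, cacheSizeMB, eventPublisher);
    BlockManagerImpl blockManager =
        new BlockManagerImpl(conf, nodeManager, containerManager, eventPublisher);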

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 8702a42..c86f9cd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -17,7 +17,7 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import com.google.common.collect.ArrayListMultimap;
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -42,15 +42,15 @@ public class DatanodeDeletedBlockTransactions {
   private int maximumAllowedTXNum;
   // Current counter of inserted TX.
   private int currentTXNum;
-  private Mapping mappingService;
+  private ContainerManager containerManager;
   // A list of TXs mapped to a certain datanode ID.
   private final ArrayListMultimap<UUID, DeletedBlocksTransaction>
       transactions;
 
-  DatanodeDeletedBlockTransactions(Mapping mappingService,
+  DatanodeDeletedBlockTransactions(ContainerManager containerManager,
       int maximumAllowedTXNum, int nodeNum) {
     this.transactions = ArrayListMultimap.create();
-    this.mappingService = mappingService;
+    this.containerManager = containerManager;
     this.maximumAllowedTXNum = maximumAllowedTXNum;
     this.nodeNum = nodeNum;
   }
@@ -60,7 +60,7 @@ public class DatanodeDeletedBlockTransactions {
     Pipeline pipeline = null;
     try {
       ContainerWithPipeline containerWithPipeline =
-          mappingService.getContainerWithPipeline(tx.getContainerID());
+          containerManager.getContainerWithPipeline(tx.getContainerID());
       if (containerWithPipeline.getContainerInfo().isContainerOpen()
           || containerWithPipeline.getPipeline().isEmpty()) {
         return false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 68435d1..5d3afd5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.scm.command
     .CommandStatusReportHandler.DeleteBlockStatus;
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -92,15 +92,15 @@ public class DeletedBlockLogImpl
 
   private final int maxRetry;
   private final MetadataStore deletedStore;
-  private final Mapping containerManager;
+  private final ContainerManager containerManager;
   private final Lock lock;
   // The latest id of deleted blocks in the db.
   private long lastTxID;
   // Maps txId to set of DNs which are successful in committing the transaction
   private Map<Long, Set<UUID>> transactionToDNsCommitMap;
 
-  public DeletedBlockLogImpl(Configuration conf, Mapping containerManager)
-      throws IOException {
+  public DeletedBlockLogImpl(Configuration conf,
+     ContainerManager containerManager) throws IOException {
     maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
         OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index b85d77f..8e07fa2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.block;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -63,7 +63,7 @@ public class SCMBlockDeletingService extends BackgroundService {
   // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
   private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
   private final DeletedBlockLog deletedBlockLog;
-  private final Mapping mappingService;
+  private final ContainerManager containerManager;
   private final NodeManager nodeManager;
   private final EventPublisher eventPublisher;
 
@@ -81,12 +81,13 @@ public class SCMBlockDeletingService extends BackgroundService {
   private int blockDeleteLimitSize;
 
   public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
-      Mapping mapper, NodeManager nodeManager, EventPublisher eventPublisher,
-      long interval, long serviceTimeout, Configuration conf) {
+      ContainerManager containerManager, NodeManager nodeManager,
+      EventPublisher eventPublisher, long interval, long serviceTimeout,
+      Configuration conf) {
     super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
         BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
     this.deletedBlockLog = deletedBlockLog;
-    this.mappingService = mapper;
+    this.containerManager = containerManager;
     this.nodeManager = nodeManager;
     this.eventPublisher = eventPublisher;
 
@@ -139,7 +140,7 @@ public class SCMBlockDeletingService extends BackgroundService {
       List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
       Map<Long, Long> transactionMap = null;
       if (datanodes != null) {
-        transactions = new DatanodeDeletedBlockTransactions(mappingService,
+        transactions = new DatanodeDeletedBlockTransactions(containerManager,
             blockDeleteLimitSize, datanodes.size());
         try {
           transactionMap = deletedBlockLog.getTransactions(transactions);
@@ -174,7 +175,7 @@ public class SCMBlockDeletingService extends BackgroundService {
                     transactions.getTransactionIDList(dnId)));
           }
         }
-        mappingService.updateDeleteTransactionId(transactionMap);
+        containerManager.updateDeleteTransactionId(transactionMap);
       }
 
       if (dnTxCount > 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 7baecc4..8be7803 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -46,9 +46,9 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
       LoggerFactory.getLogger(CloseContainerEventHandler.class);
 
 
-  private final Mapping containerManager;
+  private final ContainerManager containerManager;
 
-  public CloseContainerEventHandler(Mapping containerManager) {
+  public CloseContainerEventHandler(ContainerManager containerManager) {
     this.containerManager = containerManager;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
index 8e277b9..7b94bd2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
@@ -44,11 +44,11 @@ public class CloseContainerWatcher extends
 
   public static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerWatcher.class);
-  private final Mapping containerManager;
+  private final ContainerManager containerManager;
 
   public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent,
       Event<CloseContainerStatus> completionEvent,
-      LeaseManager<Long> leaseManager, Mapping containerManager) {
+      LeaseManager<Long> leaseManager, ContainerManager containerManager) {
     super(startEvent, completionEvent, leaseManager);
     this.containerManager = containerManager;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
new file mode 100644
index 0000000..e586f3e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ContainerManager maintains the mapping from a container to its pipeline.
+ * It is used by SCM when allocating new locations and when looking up a key.
+ */
+public interface ContainerManager extends Closeable {
+  /**
+   * Returns the ContainerInfo from the container ID.
+   *
+   * @param containerID - ID of container.
+   * @return - ContainerInfo such as creation state and the pipeline.
+   * @throws IOException
+   */
+  ContainerInfo getContainer(long containerID) throws IOException;
+
+  /**
+   * Returns the ContainerWithPipeline from the container ID.
+   *
+   * @param containerID - ID of container.
+   * @return - ContainerWithPipeline such as creation state and the pipeline.
+   * @throws IOException
+   */
+  ContainerWithPipeline getContainerWithPipeline(long containerID)
+      throws IOException;
+
+  /**
+   * Returns containers matching the given search criteria: container IDs
+   * are scanned starting from startContainerID (exclusive), and the size
+   * of the search range cannot exceed the value of count.
+   *
+   * @param startContainerID start containerID, >= 0;
+   * searching starts at the head if 0.
+   * @param count count must be >= 0.
+   *              Usually the count is replaced with a very big value
+   *              instead of being unlimited, in case the db is very big.
+   *
+   * @return a list of containers.
+   * @throws IOException
+   */
+  List<ContainerInfo> listContainer(long startContainerID, int count)
+      throws IOException;
+
+  /**
+   * Allocates a new container for a given replication type, replication
+   * factor and owner.
+   *
+   * @param type - replication type of the container.
+   * @param replicationFactor - replication factor of the container.
+   * @param owner - name of the service that owns the container.
+   * @return - ContainerWithPipeline.
+   * @throws IOException
+   */
+  ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor replicationFactor, String owner)
+      throws IOException;
+
+  /**
+   * Deletes a container from SCM.
+   *
+   * @param containerID - Container ID
+   * @throws IOException
+   */
+  void deleteContainer(long containerID) throws IOException;
+
+  /**
+   * Update container state.
+   * @param containerID - Container ID
+   * @param event - container life cycle event
+   * @return - new container state
+   * @throws IOException
+   */
+  HddsProtos.LifeCycleState updateContainerState(long containerID,
+      HddsProtos.LifeCycleEvent event) throws IOException;
+
+  /**
+   * Returns the container State Manager.
+   * @return ContainerStateManager
+   */
+  ContainerStateManager getStateManager();
+
+  /**
+   * Process container report from Datanode.
+   *
+   * @param reports Container report
+   */
+  void processContainerReports(DatanodeDetails datanodeDetails,
+      ContainerReportsProto reports, boolean isRegisterCall)
+      throws IOException;
+
+  /**
+   * Update deleteTransactionId according to deleteTransactionMap.
+   *
+   * @param deleteTransactionMap Maps the containerId to latest delete
+   *                             transaction id for the container.
+   * @throws IOException
+   */
+  void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
+      throws IOException;
+
+  /**
+   * Returns a ContainerWithPipeline that matches the given size, owner,
+   * replication attributes and state.
+   * @return ContainerWithPipeline
+   */
+  ContainerWithPipeline getMatchingContainerWithPipeline(long size,
+      String owner, ReplicationType type, ReplicationFactor factor,
+      LifeCycleState state) throws IOException;
+
+  PipelineSelector getPipelineSelector();
+}
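
A minimal usage sketch of the renamed interface (the owner string and the
specific enum values are illustrative; the method signatures are the ones
declared above):

    // containerManager is assumed to be an already-constructed implementation.
    ContainerWithPipeline cwp = containerManager.allocateContainer(
        HddsProtos.ReplicationType.RATIS,
        HddsProtos.ReplicationFactor.THREE,
        "OZONE");                                 // illustrative owner name
    long id = cwp.getContainerInfo().getContainerID();

    // Look the container up again together with its pipeline.
    ContainerWithPipeline found = containerManager.getContainerWithPipeline(id);

    // Drive a life-cycle transition and observe the resulting state.
    HddsProtos.LifeCycleState state = containerManager.updateContainerState(
        id, HddsProtos.LifeCycleEvent.CREATE);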

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
deleted file mode 100644
index 71e17e9..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ /dev/null
@@ -1,699 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
-import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.lease.Lease;
-import org.apache.hadoop.ozone.lease.LeaseException;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_SIZE;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_CONTAINER_STATE;
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
-
-/**
- * Mapping class contains the mapping from a name to a pipeline mapping. This
- * is used by SCM when
- * allocating new locations and when looking up a key.
- */
-public class ContainerMapping implements Mapping {
-  private static final Logger LOG = LoggerFactory.getLogger(ContainerMapping
-      .class);
-
-  private final NodeManager nodeManager;
-  private final long cacheSize;
-  private final Lock lock;
-  private final Charset encoding = Charset.forName("UTF-8");
-  private final MetadataStore containerStore;
-  private final PipelineSelector pipelineSelector;
-  private final ContainerStateManager containerStateManager;
-  private final LeaseManager<ContainerInfo> containerLeaseManager;
-  private final EventPublisher eventPublisher;
-  private final long size;
-
-  /**
-   * Constructs a mapping class that creates mapping between container names
-   * and pipelines.
-   *
-   * @param nodeManager - NodeManager so that we can get the nodes that are
-   * healthy to place new
-   * containers.
-   * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
-   * its nodes. This is
-   * passed to LevelDB and this memory is allocated in Native code space.
-   * CacheSize is specified
-   * in MB.
-   * @throws IOException on Failure.
-   */
-  @SuppressWarnings("unchecked")
-  public ContainerMapping(
-      final Configuration conf, final NodeManager nodeManager, final int
-      cacheSizeMB, EventPublisher eventPublisher) throws IOException {
-    this.nodeManager = nodeManager;
-    this.cacheSize = cacheSizeMB;
-
-    File metaDir = getOzoneMetaDirPath(conf);
-
-    // Write the container name to pipeline mapping.
-    File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
-    containerStore =
-        MetadataStoreBuilder.newBuilder()
-            .setConf(conf)
-            .setDbFile(containerDBPath)
-            .setCacheSize(this.cacheSize * OzoneConsts.MB)
-            .build();
-
-    this.lock = new ReentrantLock();
-
-    size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
-        OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-
-    this.pipelineSelector = new PipelineSelector(nodeManager,
-            conf, eventPublisher, cacheSizeMB);
-
-    this.containerStateManager =
-        new ContainerStateManager(conf, this, pipelineSelector);
-    LOG.trace("Container State Manager created.");
-
-    this.eventPublisher = eventPublisher;
-
-    long containerCreationLeaseTimeout = conf.getTimeDuration(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    containerLeaseManager = new LeaseManager<>("ContainerCreation",
-        containerCreationLeaseTimeout);
-    containerLeaseManager.start();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public ContainerInfo getContainer(final long containerID) throws
-      IOException {
-    ContainerInfo containerInfo;
-    lock.lock();
-    try {
-      byte[] containerBytes = containerStore.get(
-          Longs.toByteArray(containerID));
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Specified key does not exist. key : " + containerID,
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-
-      HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
-          .parseFrom(containerBytes);
-      containerInfo = ContainerInfo.fromProtobuf(temp);
-      return containerInfo;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Returns the ContainerInfo and pipeline from the containerID. If container
-   * has no available replicas in datanodes it returns pipeline with no
-   * datanodes and empty leaderID . Pipeline#isEmpty can be used to check for
-   * an empty pipeline.
-   *
-   * @param containerID - ID of container.
-   * @return - ContainerWithPipeline such as creation state and the pipeline.
-   * @throws IOException
-   */
-  @Override
-  public ContainerWithPipeline getContainerWithPipeline(long containerID)
-      throws IOException {
-    ContainerInfo contInfo;
-    lock.lock();
-    try {
-      byte[] containerBytes = containerStore.get(
-          Longs.toByteArray(containerID));
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Specified key does not exist. key : " + containerID,
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
-          .parseFrom(containerBytes);
-      contInfo = ContainerInfo.fromProtobuf(temp);
-
-      Pipeline pipeline;
-      String leaderId = "";
-      if (contInfo.isContainerOpen()) {
-        // If pipeline with given pipeline Id already exist return it
-        pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
-      } else {
-        // For close containers create pipeline from datanodes with replicas
-        Set<DatanodeDetails> dnWithReplicas = containerStateManager
-            .getContainerReplicas(contInfo.containerID());
-        if (!dnWithReplicas.isEmpty()) {
-          leaderId = dnWithReplicas.iterator().next().getUuidString();
-        }
-        pipeline = new Pipeline(leaderId, contInfo.getState(),
-            ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(),
-            PipelineID.randomId());
-        dnWithReplicas.forEach(pipeline::addMember);
-      }
-      return new ContainerWithPipeline(contInfo, pipeline);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<ContainerInfo> listContainer(long startContainerID,
-      int count) throws IOException {
-    List<ContainerInfo> containerList = new ArrayList<>();
-    lock.lock();
-    try {
-      if (containerStore.isEmpty()) {
-        throw new IOException("No container exists in current db");
-      }
-      byte[] startKey = startContainerID <= 0 ? null :
-          Longs.toByteArray(startContainerID);
-      List<Map.Entry<byte[], byte[]>> range =
-          containerStore.getSequentialRangeKVs(startKey, count, null);
-
-      // Transform the values into the pipelines.
-      // TODO: filter by container state
-      for (Map.Entry<byte[], byte[]> entry : range) {
-        ContainerInfo containerInfo =
-            ContainerInfo.fromProtobuf(
-                HddsProtos.SCMContainerInfo.PARSER.parseFrom(
-                    entry.getValue()));
-        Preconditions.checkNotNull(containerInfo);
-        containerList.add(containerInfo);
-      }
-    } finally {
-      lock.unlock();
-    }
-    return containerList;
-  }
-
-  /**
-   * Allocates a new container.
-   *
-   * @param replicationFactor - replication factor of the container.
-   * @param owner - The string name of the Service that owns this container.
-   * @return - Pipeline that makes up this container.
-   * @throws IOException - Exception
-   */
-  @Override
-  public ContainerWithPipeline allocateContainer(
-      ReplicationType type,
-      ReplicationFactor replicationFactor,
-      String owner)
-      throws IOException {
-
-    ContainerInfo containerInfo;
-    ContainerWithPipeline containerWithPipeline;
-
-    lock.lock();
-    try {
-      containerWithPipeline = containerStateManager.allocateContainer(
-              pipelineSelector, type, replicationFactor, owner);
-      containerInfo = containerWithPipeline.getContainerInfo();
-
-      byte[] containerIDBytes = Longs.toByteArray(
-          containerInfo.getContainerID());
-      containerStore.put(containerIDBytes, containerInfo.getProtobuf()
-              .toByteArray());
-    } finally {
-      lock.unlock();
-    }
-    return containerWithPipeline;
-  }
-
-  /**
-   * Deletes a container from SCM.
-   *
-   * @param containerID - Container ID
-   * @throws IOException if container doesn't exist or container store failed
-   *                     to delete the
-   *                     specified key.
-   */
-  @Override
-  public void deleteContainer(long containerID) throws IOException {
-    lock.lock();
-    try {
-      byte[] dbKey = Longs.toByteArray(containerID);
-      byte[] containerBytes = containerStore.get(dbKey);
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Failed to delete container " + containerID + ", reason : " +
-                "container doesn't exist.",
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      containerStore.delete(dbKey);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc} Used by client to update container state on SCM.
-   */
-  @Override
-  public HddsProtos.LifeCycleState updateContainerState(
-      long containerID, HddsProtos.LifeCycleEvent event) throws
-      IOException {
-    ContainerInfo containerInfo;
-    lock.lock();
-    try {
-      byte[] dbKey = Longs.toByteArray(containerID);
-      byte[] containerBytes = containerStore.get(dbKey);
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Failed to update container state"
-                + containerID
-                + ", reason : container doesn't exist.",
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      containerInfo =
-          ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER
-              .parseFrom(containerBytes));
-
-      Preconditions.checkNotNull(containerInfo);
-      switch (event) {
-      case CREATE:
-        // Acquire lease on container
-        Lease<ContainerInfo> containerLease =
-            containerLeaseManager.acquire(containerInfo);
-        // Register callback to be executed in case of timeout
-        containerLease.registerCallBack(() -> {
-          updateContainerState(containerID,
-              HddsProtos.LifeCycleEvent.TIMEOUT);
-          return null;
-        });
-        break;
-      case CREATED:
-        // Release the lease on container
-        containerLeaseManager.release(containerInfo);
-        break;
-      case FINALIZE:
-        // TODO: we don't need a lease manager here for closing as the
-        // container report will include the container state after HDFS-13008
-        // If a client failed to update the container close state, DN container
-        // report from 3 DNs will be used to close the container eventually.
-        break;
-      case CLOSE:
-        break;
-      case UPDATE:
-        break;
-      case DELETE:
-        break;
-      case TIMEOUT:
-        break;
-      case CLEANUP:
-        break;
-      default:
-        throw new SCMException("Unsupported container LifeCycleEvent.",
-            FAILED_TO_CHANGE_CONTAINER_STATE);
-      }
-      // If the below updateContainerState call fails, we should revert the
-      // changes made in switch case.
-      // Like releasing the lease in case of BEGIN_CREATE.
-      ContainerInfo updatedContainer = containerStateManager
-          .updateContainerState(containerInfo, event);
-      if (!updatedContainer.isContainerOpen()) {
-        pipelineSelector.removeContainerFromPipeline(
-                containerInfo.getPipelineID(), containerID);
-      }
-      containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
-      return updatedContainer.getState();
-    } catch (LeaseException e) {
-      throw new IOException("Lease Exception.", e);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Update deleteTransactionId according to deleteTransactionMap.
-   *
-   * @param deleteTransactionMap Maps the containerId to latest delete
-   *                             transaction id for the container.
-   * @throws IOException
-   */
-  public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
-      throws IOException {
-    if (deleteTransactionMap == null) {
-      return;
-    }
-
-    lock.lock();
-    try {
-      BatchOperation batch = new BatchOperation();
-      for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
-        long containerID = entry.getKey();
-        byte[] dbKey = Longs.toByteArray(containerID);
-        byte[] containerBytes = containerStore.get(dbKey);
-        if (containerBytes == null) {
-          throw new SCMException(
-              "Failed to increment number of deleted blocks for container "
-                  + containerID + ", reason : " + "container doesn't exist.",
-              SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-        }
-        ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
-            HddsProtos.SCMContainerInfo.parseFrom(containerBytes));
-        containerInfo.updateDeleteTransactionId(entry.getValue());
-        batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
-      }
-      containerStore.writeBatch(batch);
-      containerStateManager
-          .updateDeleteTransactionId(deleteTransactionMap);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Returns the container State Manager.
-   *
-   * @return ContainerStateManager
-   */
-  @Override
-  public ContainerStateManager getStateManager() {
-    return containerStateManager;
-  }
-
-  /**
-   * Return a container matching the attributes specified.
-   *
-   * @param sizeRequired - Space needed in the Container.
-   * @param owner - Owner of the container - A specific nameservice.
-   * @param type - Replication Type {StandAlone, Ratis}
-   * @param factor - Replication Factor {ONE, THREE}
-   * @param state - State of the Container-- {Open, Allocated etc.}
-   * @return ContainerInfo, null if there is no match found.
-   */
-  public ContainerWithPipeline getMatchingContainerWithPipeline(
-      final long sizeRequired, String owner, ReplicationType type,
-      ReplicationFactor factor, LifeCycleState state) throws IOException {
-    ContainerInfo containerInfo = getStateManager()
-        .getMatchingContainer(sizeRequired, owner, type, factor, state);
-    if (containerInfo == null) {
-      return null;
-    }
-    Pipeline pipeline = pipelineSelector
-        .getPipeline(containerInfo.getPipelineID());
-    return new ContainerWithPipeline(containerInfo, pipeline);
-  }
-
-  /**
-   * Process container report from Datanode.
-   * <p>
-   * Processing follows a very simple logic for time being.
-   * <p>
-   * 1. Datanodes report the current State -- denoted by the datanodeState
-   * <p>
-   * 2. We are the older SCM state from the Database -- denoted by
-   * the knownState.
-   * <p>
-   * 3. We copy the usage etc. from currentState to newState and log that
-   * newState to the DB. This allows us SCM to bootup again and read the
-   * state of the world from the DB, and then reconcile the state from
-   * container reports, when they arrive.
-   *
-   * @param reports Container report
-   */
-  @Override
-  public void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports, boolean isRegisterCall)
-      throws IOException {
-    List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
-        containerInfos = reports.getReportsList();
-    PendingDeleteStatusList pendingDeleteStatusList =
-        new PendingDeleteStatusList(datanodeDetails);
-    for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo :
-        containerInfos) {
-      // Update replica info during registration process.
-      if (isRegisterCall) {
-        try {
-          getStateManager().addContainerReplica(ContainerID.
-              valueof(contInfo.getContainerID()), datanodeDetails);
-        } catch (Exception ex) {
-          // Continue to next one after logging the error.
-          LOG.error("Error while adding replica for containerId {}.",
-              contInfo.getContainerID(), ex);
-        }
-      }
-      byte[] dbKey = Longs.toByteArray(contInfo.getContainerID());
-      lock.lock();
-      try {
-        byte[] containerBytes = containerStore.get(dbKey);
-        if (containerBytes != null) {
-          HddsProtos.SCMContainerInfo knownState =
-              HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
-
-          if (knownState.getState() == LifeCycleState.CLOSING
-              && contInfo.getState() == LifeCycleState.CLOSED) {
-
-            updateContainerState(contInfo.getContainerID(),
-                LifeCycleEvent.CLOSE);
-
-            //reread the container
-            knownState =
-                HddsProtos.SCMContainerInfo.PARSER
-                    .parseFrom(containerStore.get(dbKey));
-          }
-
-          HddsProtos.SCMContainerInfo newState =
-              reconcileState(contInfo, knownState, datanodeDetails);
-
-          if (knownState.getDeleteTransactionId() > contInfo
-              .getDeleteTransactionId()) {
-            pendingDeleteStatusList
-                .addPendingDeleteStatus(contInfo.getDeleteTransactionId(),
-                    knownState.getDeleteTransactionId(),
-                    knownState.getContainerID());
-          }
-
-          // FIX ME: This can be optimized, we write twice to memory, where a
-          // single write would work well.
-          //
-          // We need to write this to DB again since the closed only write
-          // the updated State.
-          containerStore.put(dbKey, newState.toByteArray());
-
-        } else {
-          // Container not found in our container db.
-          LOG.error("Error while processing container report from datanode :" +
-                  " {}, for container: {}, reason: container doesn't exist in" +
-                  "container database.", datanodeDetails,
-              contInfo.getContainerID());
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
-      eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
-          pendingDeleteStatusList);
-    }
-
-  }
-
-  /**
-   * Reconciles the state from Datanode with the state in SCM.
-   *
-   * @param datanodeState - State from the Datanode.
-   * @param knownState - State inside SCM.
-   * @param dnDetails
-   * @return new SCM State for this container.
-   */
-  private HddsProtos.SCMContainerInfo reconcileState(
-      StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
-      SCMContainerInfo knownState, DatanodeDetails dnDetails) {
-    HddsProtos.SCMContainerInfo.Builder builder =
-        HddsProtos.SCMContainerInfo.newBuilder();
-    builder.setContainerID(knownState.getContainerID())
-        .setPipelineID(knownState.getPipelineID())
-        .setReplicationType(knownState.getReplicationType())
-        .setReplicationFactor(knownState.getReplicationFactor());
-
-    // TODO: If current state doesn't have this DN in list of DataNodes with
-    // replica then add it in list of replicas.
-
-    // If used size is greater than allocated size, we will be updating
-    // allocated size with used size. This update is done as a fallback
-    // mechanism in case SCM crashes without properly updating allocated
-    // size. Correct allocated value will be updated by
-    // ContainerStateManager during SCM shutdown.
-    long usedSize = datanodeState.getUsed();
-    long allocated = knownState.getAllocatedBytes() > usedSize ?
-        knownState.getAllocatedBytes() : usedSize;
-    builder.setAllocatedBytes(allocated)
-        .setUsedBytes(usedSize)
-        .setNumberOfKeys(datanodeState.getKeyCount())
-        .setState(knownState.getState())
-        .setStateEnterTime(knownState.getStateEnterTime())
-        .setContainerID(knownState.getContainerID())
-        .setDeleteTransactionId(knownState.getDeleteTransactionId());
-    if (knownState.getOwner() != null) {
-      builder.setOwner(knownState.getOwner());
-    }
-    return builder.build();
-  }
-
-
-  /**
-   * In Container is in closed state, if it is in closed, Deleting or Deleted
-   * State.
-   *
-   * @param info - ContainerInfo.
-   * @return true if is in open state, false otherwise
-   */
-  private boolean shouldClose(ContainerInfo info) {
-    return info.getState() == HddsProtos.LifeCycleState.OPEN;
-  }
-
-  private boolean isClosed(ContainerInfo info) {
-    return info.getState() == HddsProtos.LifeCycleState.CLOSED;
-  }
-
-  /**
-   * Closes this stream and releases any system resources associated with it.
-   * If the stream is
-   * already closed then invoking this method has no effect.
-   * <p>
-   * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
-   * fail require careful
-   * attention. It is strongly advised to relinquish the underlying resources
-   * and to internally
-   * <em>mark</em> the {@code Closeable} as closed, prior to throwing the
-   * {@code IOException}.
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  @Override
-  public void close() throws IOException {
-    if (containerLeaseManager != null) {
-      containerLeaseManager.shutdown();
-    }
-    if (containerStateManager != null) {
-      flushContainerInfo();
-      containerStateManager.close();
-    }
-    if (containerStore != null) {
-      containerStore.close();
-    }
-
-    if (pipelineSelector != null) {
-      pipelineSelector.shutdown();
-    }
-  }
-
-  /**
-   * Since allocatedBytes of a container is only in memory, stored in
-   * containerStateManager, when closing ContainerMapping, we need to update
-   * this in the container store.
-   *
-   * @throws IOException on failure.
-   */
-  @VisibleForTesting
-  public void flushContainerInfo() throws IOException {
-    List<ContainerInfo> containers = containerStateManager.getAllContainers();
-    List<Long> failedContainers = new ArrayList<>();
-    for (ContainerInfo info : containers) {
-      // even if some container updated failed, others can still proceed
-      try {
-        byte[] dbKey = Longs.toByteArray(info.getContainerID());
-        byte[] containerBytes = containerStore.get(dbKey);
-        // TODO : looks like when a container is deleted, the container is
-        // removed from containerStore but not containerStateManager, so it can
-        // return info of a deleted container. may revisit this in the future,
-        // for now, just skip a not-found container
-        if (containerBytes != null) {
-          containerStore.put(dbKey, info.getProtobuf().toByteArray());
-        } else {
-          LOG.debug("Container state manager has container {} but not found " +
-                  "in container store, a deleted container?",
-              info.getContainerID());
-        }
-      } catch (IOException ioe) {
-        failedContainers.add(info.getContainerID());
-      }
-    }
-    if (!failedContainers.isEmpty()) {
-      throw new IOException("Error in flushing container info from container " +
-          "state manager: " + failedContainers);
-    }
-  }
-
-  @VisibleForTesting
-  public MetadataStore getContainerStore() {
-    return containerStore;
-  }
-
-  public PipelineSelector getPipelineSelector() {
-    return pipelineSelector;
-  }
-}
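
The CREATE/CREATED handling deleted above (and carried over unchanged into
SCMContainerManager) follows a lease pattern: CREATE acquires a lease whose
timeout callback fires a TIMEOUT transition, and CREATED releases it. Condensed
from the code above, with the surrounding locking and persistence elided:

    // On CREATE: acquire a lease; if the container is not confirmed in
    // time, the registered callback drives the TIMEOUT life-cycle event.
    Lease<ContainerInfo> containerLease =
        containerLeaseManager.acquire(containerInfo);
    containerLease.registerCallBack(() -> {
      updateContainerState(containerID, HddsProtos.LifeCycleEvent.TIMEOUT);
      return null;
    });

    // On CREATED: the datanodes confirmed creation before the lease
    // expired, so release it and no TIMEOUT event will fire.
    containerLeaseManager.release(containerInfo);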

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 71935f0..0f824a0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -50,21 +50,21 @@ public class ContainerReportHandler implements
 
   private final NodeManager nodeManager;
 
-  private final Mapping containerMapping;
+  private final ContainerManager containerManager;
 
   private ContainerStateManager containerStateManager;
 
   private ReplicationActivityStatus replicationStatus;
 
-  public ContainerReportHandler(Mapping containerMapping,
+  public ContainerReportHandler(ContainerManager containerManager,
       NodeManager nodeManager,
       ReplicationActivityStatus replicationActivityStatus) {
-    Preconditions.checkNotNull(containerMapping);
+    Preconditions.checkNotNull(containerManager);
     Preconditions.checkNotNull(nodeManager);
     Preconditions.checkNotNull(replicationActivityStatus);
-    this.containerStateManager = containerMapping.getStateManager();
+    this.containerStateManager = containerManager.getStateManager();
     this.nodeManager = nodeManager;
-    this.containerMapping = containerMapping;
+    this.containerManager = containerManager;
     this.replicationStatus = replicationActivityStatus;
   }
 
@@ -80,7 +80,7 @@ public class ContainerReportHandler implements
     try {
 
       //update state in container db and trigger close container events
-      containerMapping
+      containerManager
           .processContainerReports(datanodeOrigin, containerReport, false);
 
       Set<ContainerID> containerIds = containerReport.getReportsList().stream()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 930c098..b8e4e89 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -136,7 +136,7 @@ public class ContainerStateManager implements Closeable {
    */
   @SuppressWarnings("unchecked")
   public ContainerStateManager(Configuration configuration,
-      Mapping containerMapping, PipelineSelector pipelineSelector) {
+      ContainerManager containerManager, PipelineSelector pipelineSelector) {
 
     // Initialize the container state machine.
     Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
@@ -158,15 +158,15 @@ public class ContainerStateManager implements Closeable {
     lastUsedMap = new ConcurrentHashMap<>();
     containerCount = new AtomicLong(0);
     containers = new ContainerStateMap();
-    loadExistingContainers(containerMapping, pipelineSelector);
+    loadExistingContainers(containerManager, pipelineSelector);
   }
 
-  private void loadExistingContainers(Mapping containerMapping,
+  private void loadExistingContainers(ContainerManager containerManager,
                                       PipelineSelector pipelineSelector) {
 
     List<ContainerInfo> containerList;
     try {
-      containerList = containerMapping.listContainer(0, Integer.MAX_VALUE);
+      containerList = containerManager.listContainer(0, Integer.MAX_VALUE);
 
       // if there are no container to load, let us return.
       if (containerList == null || containerList.size() == 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
deleted file mode 100644
index 5ed80cb..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Mapping class contains the mapping from a name to a pipeline mapping. This is
- * used by SCM when allocating new locations and when looking up a key.
- */
-public interface Mapping extends Closeable {
-  /**
-   * Returns the ContainerInfo from the container ID.
-   *
-   * @param containerID - ID of container.
-   * @return - ContainerInfo such as creation state and the pipeline.
-   * @throws IOException
-   */
-  ContainerInfo getContainer(long containerID) throws IOException;
-
-  /**
-   * Returns the ContainerInfo from the container ID.
-   *
-   * @param containerID - ID of container.
-   * @return - ContainerWithPipeline such as creation state and the pipeline.
-   * @throws IOException
-   */
-  ContainerWithPipeline getContainerWithPipeline(long containerID)
-      throws IOException;
-
-  /**
-   * Returns containers under certain conditions.
-   * Search container IDs from start ID(exclusive),
-   * The max size of the searching range cannot exceed the
-   * value of count.
-   *
-   * @param startContainerID start containerID, >=0,
-   * start searching at the head if 0.
-   * @param count count must be >= 0
-   *              Usually the count will be replace with a very big
-   *              value instead of being unlimited in case the db is very big.
-   *
-   * @return a list of container.
-   * @throws IOException
-   */
-  List<ContainerInfo> listContainer(long startContainerID, int count)
-      throws IOException;
-
-  /**
-   * Allocates a new container for a given keyName and replication factor.
-   *
-   * @param replicationFactor - replication factor of the container.
-   * @param owner
-   * @return - ContainerWithPipeline.
-   * @throws IOException
-   */
-  ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor replicationFactor, String owner)
-      throws IOException;
-
-  /**
-   * Deletes a container from SCM.
-   *
-   * @param containerID - Container ID
-   * @throws IOException
-   */
-  void deleteContainer(long containerID) throws IOException;
-
-  /**
-   * Update container state.
-   * @param containerID - Container ID
-   * @param event - container life cycle event
-   * @return - new container state
-   * @throws IOException
-   */
-  HddsProtos.LifeCycleState updateContainerState(long containerID,
-      HddsProtos.LifeCycleEvent event) throws IOException;
-
-  /**
-   * Returns the container State Manager.
-   * @return ContainerStateManager
-   */
-  ContainerStateManager getStateManager();
-
-  /**
-   * Process container report from Datanode.
-   *
-   * @param reports Container report
-   */
-  void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports, boolean isRegisterCall)
-      throws IOException;
-
-  /**
-   * Update deleteTransactionId according to deleteTransactionMap.
-   *
-   * @param deleteTransactionMap Maps the containerId to latest delete
-   *                             transaction id for the container.
-   * @throws IOException
-   */
-  void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
-      throws IOException;
-
-  /**
-   * Returns a ContainerWithPipeline matching the given attributes.
-   * @return ContainerWithPipeline
-   */
-  ContainerWithPipeline getMatchingContainerWithPipeline(long size,
-      String owner, ReplicationType type, ReplicationFactor factor,
-      LifeCycleState state) throws IOException;
-
-  PipelineSelector getPipelineSelector();
-}
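
For reference, a minimal sketch of how a call site moves from the removed
Mapping interface to the renamed ContainerManager; the conf, nodeManager,
cacheSizeMB and eventQueue objects are assumed to be wired up elsewhere
(for example in a test harness) and are not part of this patch:

    // Hypothetical call site, using only the constructor and methods that
    // appear in this patch: the contract is unchanged, only the type
    // names differ (Mapping -> ContainerManager, ContainerMapping ->
    // SCMContainerManager).
    ContainerManager containerManager =
        new SCMContainerManager(conf, nodeManager, cacheSizeMB, eventQueue);
    ContainerInfo info = containerManager.getContainer(containerID);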

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d328662/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
new file mode 100644
index 0000000..df26e36
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_SIZE;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
+
+/**
+ * ContainerManager implementation that maintains the mapping from a
+ * container name to its pipeline. This is used by SCM when allocating new
+ * locations and when looking up a key.
+ */
+public class SCMContainerManager implements ContainerManager {
+  private static final Logger LOG = LoggerFactory.getLogger(SCMContainerManager
+      .class);
+
+  private final NodeManager nodeManager;
+  private final long cacheSize;
+  private final Lock lock;
+  private final Charset encoding = Charset.forName("UTF-8");
+  private final MetadataStore containerStore;
+  private final PipelineSelector pipelineSelector;
+  private final ContainerStateManager containerStateManager;
+  private final LeaseManager<ContainerInfo> containerLeaseManager;
+  private final EventPublisher eventPublisher;
+  private final long size;
+
+  /**
+   * Constructs a container manager that maintains the mapping between
+   * container names and pipelines.
+   *
+   * @param conf - Configuration used to load SCM settings.
+   * @param nodeManager - NodeManager so that we can get the nodes that are
+   * healthy to place new containers.
+   * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
+   * its nodes. This is passed to LevelDB and this memory is allocated in
+   * Native code space. CacheSize is specified in MB.
+   * @param eventPublisher - Publisher used to fire SCM events.
+   * @throws IOException on Failure.
+   */
+  @SuppressWarnings("unchecked")
+  public SCMContainerManager(
+      final Configuration conf, final NodeManager nodeManager, final int
+      cacheSizeMB, EventPublisher eventPublisher) throws IOException {
+    this.nodeManager = nodeManager;
+    this.cacheSize = cacheSizeMB;
+
+    File metaDir = getOzoneMetaDirPath(conf);
+
+    // Write the container name to pipeline mapping.
+    File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
+    containerStore =
+        MetadataStoreBuilder.newBuilder()
+            .setConf(conf)
+            .setDbFile(containerDBPath)
+            .setCacheSize(this.cacheSize * OzoneConsts.MB)
+            .build();
+
+    this.lock = new ReentrantLock();
+
+    size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
+        OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+    this.pipelineSelector = new PipelineSelector(nodeManager,
+            conf, eventPublisher, cacheSizeMB);
+
+    this.containerStateManager =
+        new ContainerStateManager(conf, this, pipelineSelector);
+    LOG.trace("Container State Manager created.");
+
+    this.eventPublisher = eventPublisher;
+
+    long containerCreationLeaseTimeout = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    containerLeaseManager = new LeaseManager<>("ContainerCreation",
+        containerCreationLeaseTimeout);
+    containerLeaseManager.start();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ContainerInfo getContainer(final long containerID) throws
+      IOException {
+    ContainerInfo containerInfo;
+    lock.lock();
+    try {
+      byte[] containerBytes = containerStore.get(
+          Longs.toByteArray(containerID));
+      if (containerBytes == null) {
+        throw new SCMException(
+            "Specified key does not exist. key : " + containerID,
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+
+      HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
+          .parseFrom(containerBytes);
+      containerInfo = ContainerInfo.fromProtobuf(temp);
+      return containerInfo;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns the ContainerInfo and pipeline from the containerID. If container
+   * has no available replicas in datanodes it returns pipeline with no
+   * datanodes and empty leaderID. Pipeline#isEmpty can be used to check for
+   * an empty pipeline.
+   *
+   * @param containerID - ID of container.
+   * @return - ContainerWithPipeline such as creation state and the pipeline.
+   * @throws IOException
+   */
+  @Override
+  public ContainerWithPipeline getContainerWithPipeline(long containerID)
+      throws IOException {
+    ContainerInfo contInfo;
+    lock.lock();
+    try {
+      byte[] containerBytes = containerStore.get(
+          Longs.toByteArray(containerID));
+      if (containerBytes == null) {
+        throw new SCMException(
+            "Specified key does not exist. key : " + containerID,
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
+          .parseFrom(containerBytes);
+      contInfo = ContainerInfo.fromProtobuf(temp);
+
+      Pipeline pipeline;
+      String leaderId = "";
+      if (contInfo.isContainerOpen()) {
+        // If pipeline with given pipeline Id already exist return it
+        pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
+      } else {
+        // For close containers create pipeline from datanodes with replicas
+        Set<DatanodeDetails> dnWithReplicas = containerStateManager
+            .getContainerReplicas(contInfo.containerID());
+        if (!dnWithReplicas.isEmpty()) {
+          leaderId = dnWithReplicas.iterator().next().getUuidString();
+        }
+        pipeline = new Pipeline(leaderId, contInfo.getState(),
+            ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(),
+            PipelineID.randomId());
+        dnWithReplicas.forEach(pipeline::addMember);
+      }
+      return new ContainerWithPipeline(contInfo, pipeline);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<ContainerInfo> listContainer(long startContainerID,
+      int count) throws IOException {
+    List<ContainerInfo> containerList = new ArrayList<>();
+    lock.lock();
+    try {
+      if (containerStore.isEmpty()) {
+        throw new IOException("No container exists in current db");
+      }
+      byte[] startKey = startContainerID <= 0 ? null :
+          Longs.toByteArray(startContainerID);
+      List<Map.Entry<byte[], byte[]>> range =
+          containerStore.getSequentialRangeKVs(startKey, count, null);
+
+      // Transform the values into the pipelines.
+      // TODO: filter by container state
+      for (Map.Entry<byte[], byte[]> entry : range) {
+        ContainerInfo containerInfo =
+            ContainerInfo.fromProtobuf(
+                HddsProtos.SCMContainerInfo.PARSER.parseFrom(
+                    entry.getValue()));
+        Preconditions.checkNotNull(containerInfo);
+        containerList.add(containerInfo);
+      }
+    } finally {
+      lock.unlock();
+    }
+    return containerList;
+  }
+
+  /**
+   * Allocates a new container.
+   *
+   * @param type - replication type of the container.
+   * @param replicationFactor - replication factor of the container.
+   * @param owner - The string name of the Service that owns this container.
+   * @return - ContainerWithPipeline that makes up this container.
+   * @throws IOException - Exception
+   */
+  @Override
+  public ContainerWithPipeline allocateContainer(
+      ReplicationType type,
+      ReplicationFactor replicationFactor,
+      String owner)
+      throws IOException {
+
+    ContainerInfo containerInfo;
+    ContainerWithPipeline containerWithPipeline;
+
+    lock.lock();
+    try {
+      containerWithPipeline = containerStateManager.allocateContainer(
+              pipelineSelector, type, replicationFactor, owner);
+      containerInfo = containerWithPipeline.getContainerInfo();
+
+      byte[] containerIDBytes = Longs.toByteArray(
+          containerInfo.getContainerID());
+      containerStore.put(containerIDBytes, containerInfo.getProtobuf()
+              .toByteArray());
+    } finally {
+      lock.unlock();
+    }
+    return containerWithPipeline;
+  }
+
+  /**
+   * Deletes a container from SCM.
+   *
+   * @param containerID - Container ID
+   * @throws IOException if container doesn't exist or container store failed
+   *                     to delete the specified key.
+   */
+  @Override
+  public void deleteContainer(long containerID) throws IOException {
+    lock.lock();
+    try {
+      byte[] dbKey = Longs.toByteArray(containerID);
+      byte[] containerBytes = containerStore.get(dbKey);
+      if (containerBytes == null) {
+        throw new SCMException(
+            "Failed to delete container " + containerID + ", reason : " +
+                "container doesn't exist.",
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      containerStore.delete(dbKey);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc} Used by client to update container state on SCM.
+   */
+  @Override
+  public HddsProtos.LifeCycleState updateContainerState(
+      long containerID, HddsProtos.LifeCycleEvent event) throws
+      IOException {
+    ContainerInfo containerInfo;
+    lock.lock();
+    try {
+      byte[] dbKey = Longs.toByteArray(containerID);
+      byte[] containerBytes = containerStore.get(dbKey);
+      if (containerBytes == null) {
+        throw new SCMException(
+            "Failed to update container state"
+                + containerID
+                + ", reason : container doesn't exist.",
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      containerInfo =
+          ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER
+              .parseFrom(containerBytes));
+
+      Preconditions.checkNotNull(containerInfo);
+      switch (event) {
+      case CREATE:
+        // Acquire lease on container
+        Lease<ContainerInfo> containerLease =
+            containerLeaseManager.acquire(containerInfo);
+        // Register callback to be executed in case of timeout
+        containerLease.registerCallBack(() -> {
+          updateContainerState(containerID,
+              HddsProtos.LifeCycleEvent.TIMEOUT);
+          return null;
+        });
+        break;
+      case CREATED:
+        // Release the lease on container
+        containerLeaseManager.release(containerInfo);
+        break;
+      case FINALIZE:
+        // TODO: we don't need a lease manager here for closing as the
+        // container report will include the container state after HDFS-13008
+        // If a client failed to update the container close state, DN container
+        // report from 3 DNs will be used to close the container eventually.
+        break;
+      case CLOSE:
+        break;
+      case UPDATE:
+        break;
+      case DELETE:
+        break;
+      case TIMEOUT:
+        break;
+      case CLEANUP:
+        break;
+      default:
+        throw new SCMException("Unsupported container LifeCycleEvent.",
+            FAILED_TO_CHANGE_CONTAINER_STATE);
+      }
+      // If the below updateContainerState call fails, we should revert the
+      // changes made in the switch case above, e.g. release the lease
+      // acquired for the CREATE event.
+      ContainerInfo updatedContainer = containerStateManager
+          .updateContainerState(containerInfo, event);
+      if (!updatedContainer.isContainerOpen()) {
+        pipelineSelector.removeContainerFromPipeline(
+                containerInfo.getPipelineID(), containerID);
+      }
+      containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
+      return updatedContainer.getState();
+    } catch (LeaseException e) {
+      throw new IOException("Lease Exception.", e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Update deleteTransactionId according to deleteTransactionMap.
+   *
+   * @param deleteTransactionMap Maps the containerId to latest delete
+   *                             transaction id for the container.
+   * @throws IOException
+   */
+  public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
+      throws IOException {
+    if (deleteTransactionMap == null) {
+      return;
+    }
+
+    lock.lock();
+    try {
+      BatchOperation batch = new BatchOperation();
+      for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
+        long containerID = entry.getKey();
+        byte[] dbKey = Longs.toByteArray(containerID);
+        byte[] containerBytes = containerStore.get(dbKey);
+        if (containerBytes == null) {
+          throw new SCMException(
+              "Failed to increment number of deleted blocks for container "
+                  + containerID + ", reason : " + "container doesn't exist.",
+              SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+        }
+        ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
+            HddsProtos.SCMContainerInfo.parseFrom(containerBytes));
+        containerInfo.updateDeleteTransactionId(entry.getValue());
+        batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
+      }
+      containerStore.writeBatch(batch);
+      containerStateManager
+          .updateDeleteTransactionId(deleteTransactionMap);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns the container State Manager.
+   *
+   * @return ContainerStateManager
+   */
+  @Override
+  public ContainerStateManager getStateManager() {
+    return containerStateManager;
+  }
+
+  /**
+   * Return a container matching the attributes specified.
+   *
+   * @param sizeRequired - Space needed in the Container.
+   * @param owner - Owner of the container - A specific nameservice.
+   * @param type - Replication Type {StandAlone, Ratis}
+   * @param factor - Replication Factor {ONE, THREE}
+   * @param state - State of the Container-- {Open, Allocated etc.}
+   * @return ContainerWithPipeline, null if there is no match found.
+   */
+  public ContainerWithPipeline getMatchingContainerWithPipeline(
+      final long sizeRequired, String owner, ReplicationType type,
+      ReplicationFactor factor, LifeCycleState state) throws IOException {
+    ContainerInfo containerInfo = getStateManager()
+        .getMatchingContainer(sizeRequired, owner, type, factor, state);
+    if (containerInfo == null) {
+      return null;
+    }
+    Pipeline pipeline = pipelineSelector
+        .getPipeline(containerInfo.getPipelineID());
+    return new ContainerWithPipeline(containerInfo, pipeline);
+  }
+
+  /**
+   * Process container report from Datanode.
+   * <p>
+   * Processing follows a very simple logic for time being.
+   * <p>
+   * 1. Datanodes report the current State -- denoted by the datanodeState
+   * <p>
+   * 2. We read the older SCM state from the Database -- denoted by
+   * the knownState.
+   * <p>
+   * 3. We copy the usage etc. from currentState to newState and log that
+   * newState to the DB. This allows SCM to boot up again and read the
+   * state of the world from the DB, and then reconcile the state from
+   * container reports, when they arrive.
+   *
+   * @param datanodeDetails - Datanode that sent the report.
+   * @param reports Container report
+   * @param isRegisterCall - true when the report arrives as part of
+   *                         datanode registration.
+   */
+  @Override
+  public void processContainerReports(DatanodeDetails datanodeDetails,
+      ContainerReportsProto reports, boolean isRegisterCall)
+      throws IOException {
+    List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
+        containerInfos = reports.getReportsList();
+    PendingDeleteStatusList pendingDeleteStatusList =
+        new PendingDeleteStatusList(datanodeDetails);
+    for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo :
+        containerInfos) {
+      // Update replica info during registration process.
+      if (isRegisterCall) {
+        try {
+          getStateManager().addContainerReplica(ContainerID.
+              valueof(contInfo.getContainerID()), datanodeDetails);
+        } catch (Exception ex) {
+          // Continue to next one after logging the error.
+          LOG.error("Error while adding replica for containerId {}.",
+              contInfo.getContainerID(), ex);
+        }
+      }
+      byte[] dbKey = Longs.toByteArray(contInfo.getContainerID());
+      lock.lock();
+      try {
+        byte[] containerBytes = containerStore.get(dbKey);
+        if (containerBytes != null) {
+          HddsProtos.SCMContainerInfo knownState =
+              HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
+
+          if (knownState.getState() == LifeCycleState.CLOSING
+              && contInfo.getState() == LifeCycleState.CLOSED) {
+
+            updateContainerState(contInfo.getContainerID(),
+                LifeCycleEvent.CLOSE);
+
+            //reread the container
+            knownState =
+                HddsProtos.SCMContainerInfo.PARSER
+                    .parseFrom(containerStore.get(dbKey));
+          }
+
+          HddsProtos.SCMContainerInfo newState =
+              reconcileState(contInfo, knownState, datanodeDetails);
+
+          if (knownState.getDeleteTransactionId() > contInfo
+              .getDeleteTransactionId()) {
+            pendingDeleteStatusList
+                .addPendingDeleteStatus(contInfo.getDeleteTransactionId(),
+                    knownState.getDeleteTransactionId(),
+                    knownState.getContainerID());
+          }
+
+          // FIX ME: This can be optimized, we write twice to memory, where a
+          // single write would work well.
+          //
+          // We need to write this to DB again since the close above only
+          // wrote the updated state.
+          containerStore.put(dbKey, newState.toByteArray());
+
+        } else {
+          // Container not found in our container db.
+          LOG.error("Error while processing container report from datanode :" +
+                  " {}, for container: {}, reason: container doesn't exist in " +
+                  "container database.", datanodeDetails,
+              contInfo.getContainerID());
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
+      eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
+          pendingDeleteStatusList);
+    }
+
+  }
+
+  /**
+   * Reconciles the state from Datanode with the state in SCM.
+   *
+   * @param datanodeState - State from the Datanode.
+   * @param knownState - State inside SCM.
+   * @param dnDetails - Datanode from which the report was received.
+   * @return new SCM State for this container.
+   */
+  private HddsProtos.SCMContainerInfo reconcileState(
+      StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
+      SCMContainerInfo knownState, DatanodeDetails dnDetails) {
+    HddsProtos.SCMContainerInfo.Builder builder =
+        HddsProtos.SCMContainerInfo.newBuilder();
+    builder.setContainerID(knownState.getContainerID())
+        .setPipelineID(knownState.getPipelineID())
+        .setReplicationType(knownState.getReplicationType())
+        .setReplicationFactor(knownState.getReplicationFactor());
+
+    // TODO: If current state doesn't have this DN in list of DataNodes with
+    // replica then add it in list of replicas.
+
+    // If used size is greater than allocated size, we will be updating
+    // allocated size with used size. This update is done as a fallback
+    // mechanism in case SCM crashes without properly updating allocated
+    // size. Correct allocated value will be updated by
+    // ContainerStateManager during SCM shutdown.
+    long usedSize = datanodeState.getUsed();
+    long allocated = knownState.getAllocatedBytes() > usedSize ?
+        knownState.getAllocatedBytes() : usedSize;
+    builder.setAllocatedBytes(allocated)
+        .setUsedBytes(usedSize)
+        .setNumberOfKeys(datanodeState.getKeyCount())
+        .setState(knownState.getState())
+        .setStateEnterTime(knownState.getStateEnterTime())
+        .setContainerID(knownState.getContainerID())
+        .setDeleteTransactionId(knownState.getDeleteTransactionId());
+    if (knownState.getOwner() != null) {
+      builder.setOwner(knownState.getOwner());
+    }
+    return builder.build();
+  }
+
+
+  /**
+   * Checks whether the container should be closed, i.e. whether it is
+   * still in open state (as opposed to Closed, Deleting or Deleted).
+   *
+   * @param info - ContainerInfo.
+   * @return true if the container is in open state, false otherwise
+   */
+  private boolean shouldClose(ContainerInfo info) {
+    return info.getState() == HddsProtos.LifeCycleState.OPEN;
+  }
+
+  private boolean isClosed(ContainerInfo info) {
+    return info.getState() == HddsProtos.LifeCycleState.CLOSED;
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated with it.
+   * If the stream is already closed then invoking this method has no effect.
+   * <p>
+   * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
+   * fail require careful attention. It is strongly advised to relinquish the
+   * underlying resources and to internally <em>mark</em> the {@code Closeable}
+   * as closed, prior to throwing the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    if (containerLeaseManager != null) {
+      containerLeaseManager.shutdown();
+    }
+    if (containerStateManager != null) {
+      flushContainerInfo();
+      containerStateManager.close();
+    }
+    if (containerStore != null) {
+      containerStore.close();
+    }
+
+    if (pipelineSelector != null) {
+      pipelineSelector.shutdown();
+    }
+  }
+
+  /**
+   * Since allocatedBytes of a container is only in memory, stored in
+   * containerStateManager, when closing SCMContainerManager, we need to update
+   * this in the container store.
+   *
+   * @throws IOException on failure.
+   */
+  @VisibleForTesting
+  public void flushContainerInfo() throws IOException {
+    List<ContainerInfo> containers = containerStateManager.getAllContainers();
+    List<Long> failedContainers = new ArrayList<>();
+    for (ContainerInfo info : containers) {
+      // even if some container updated failed, others can still proceed
+      try {
+        byte[] dbKey = Longs.toByteArray(info.getContainerID());
+        byte[] containerBytes = containerStore.get(dbKey);
+        // TODO : looks like when a container is deleted, the container is
+        // removed from containerStore but not containerStateManager, so it can
+        // return info of a deleted container. may revisit this in the future,
+        // for now, just skip a not-found container
+        if (containerBytes != null) {
+          containerStore.put(dbKey, info.getProtobuf().toByteArray());
+        } else {
+          LOG.debug("Container state manager has container {} but not found " +
+                  "in container store, a deleted container?",
+              info.getContainerID());
+        }
+      } catch (IOException ioe) {
+        failedContainers.add(info.getContainerID());
+      }
+    }
+    if (!failedContainers.isEmpty()) {
+      throw new IOException("Error in flushing container info from container " +
+          "state manager: " + failedContainers);
+    }
+  }
+
+  @VisibleForTesting
+  public MetadataStore getContainerStore() {
+    return containerStore;
+  }
+
+  public PipelineSelector getPipelineSelector() {
+    return pipelineSelector;
+  }
+}
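
To make the renamed class concrete, here is a hedged usage sketch built
only from the constructor and methods shown above; the conf, nodeManager
and eventQueue instances are placeholders that a caller (for example a
test) would construct first, and the owner string "OZONE" is illustrative:

    // Allocate a standalone container, drive it through the CREATE and
    // CREATED lifecycle events, then shut the manager down.
    SCMContainerManager containerManager =
        new SCMContainerManager(conf, nodeManager, 128, eventQueue);
    ContainerWithPipeline cp = containerManager.allocateContainer(
        HddsProtos.ReplicationType.STAND_ALONE,
        HddsProtos.ReplicationFactor.ONE, "OZONE");
    long id = cp.getContainerInfo().getContainerID();
    containerManager.updateContainerState(id,
        HddsProtos.LifeCycleEvent.CREATE);
    containerManager.updateContainerState(id,
        HddsProtos.LifeCycleEvent.CREATED);
    containerManager.close();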

