http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java
deleted file mode 100644
index c44a08c..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-/**
- * This exception represents that the node that is being accessed does not
- * exist in NodeStateMap.
- */
-public class NodeNotFoundException extends NodeException {
-
-
-  /**
-   * Constructs a {@code NodeNotFoundException} with {@code null}
-   * as its error detail message.
-   */
-  public NodeNotFoundException() {
-    super();
-  }
-
-  /**
-   * Constructs a {@code NodeNotFoundException} with the specified
-   * detail message.
-   *
-   * @param message
-   *        The detail message (which is saved for later retrieval
-   *        by the {@link #getMessage()} method)
-   */
-  public NodeNotFoundException(String message) {
-    super(message);
-  }
-
-}
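
For context, a minimal caller sketch (hedged: lookupOrDead is a hypothetical
helper, and it assumes the HddsProtos.NodeState enum from this branch). Since
NodeNotFoundException is a checked exception, every lookup against NodeStateMap
must handle the missing-node case explicitly:

    static NodeState lookupOrDead(NodeStateMap map, UUID id) {
      try {
        return map.getNodeState(id);
      } catch (NodeNotFoundException e) {
        // Unknown datanode: treat it conservatively as DEAD.
        return NodeState.DEAD;
      }
    }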

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
deleted file mode 100644
index 774ced1..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Maintains the state of datanodes in SCM. This class should only be used by
- * NodeStateManager to maintain the state. To change the state of a node,
- * go through NodeStateManager; do not use this class directly.
- */
-public class NodeStateMap {
-
-  /**
-   * Node id to node info map.
-   */
-  private final ConcurrentHashMap<UUID, DatanodeInfo> nodeMap;
-  /**
-   * Maps each node state to the set of ids of nodes in that state.
-   */
-  private final ConcurrentHashMap<NodeState, Set<UUID>> stateMap;
-  /**
-   * Node id to node stats map.
-   */
-  private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
-
-  private final ReadWriteLock lock;
-
-  /**
-   * Creates a new instance of NodeStateMap with no nodes.
-   */
-  public NodeStateMap() {
-    lock = new ReentrantReadWriteLock();
-    nodeMap = new ConcurrentHashMap<>();
-    stateMap = new ConcurrentHashMap<>();
-    nodeStats = new ConcurrentHashMap<>();
-    initStateMap();
-  }
-
-  /**
-   * Initializes the state map with available states.
-   */
-  private void initStateMap() {
-    for (NodeState state : NodeState.values()) {
-      stateMap.put(state, new HashSet<>());
-    }
-  }
-
-  /**
-   * Adds a node to NodeStateMap.
-   *
-   * @param datanodeDetails DatanodeDetails
-   * @param nodeState initial NodeState
-   *
-   * @throws NodeAlreadyExistsException if the node already exists
-   */
-  public void addNode(DatanodeDetails datanodeDetails, NodeState nodeState)
-      throws NodeAlreadyExistsException {
-    lock.writeLock().lock();
-    try {
-      UUID id = datanodeDetails.getUuid();
-      if (nodeMap.containsKey(id)) {
-        throw new NodeAlreadyExistsException("Node UUID: " + id);
-      }
-      nodeMap.put(id, new DatanodeInfo(datanodeDetails));
-      stateMap.get(nodeState).add(id);
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Updates the node state.
-   *
-   * @param nodeId Node Id
-   * @param currentState current state
-   * @param newState new state
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public void updateNodeState(UUID nodeId, NodeState currentState,
-                              NodeState newState) throws NodeNotFoundException {
-    lock.writeLock().lock();
-    try {
-      if (stateMap.get(currentState).remove(nodeId)) {
-        stateMap.get(newState).add(nodeId);
-      } else {
-        throw new NodeNotFoundException("Node UUID: " + nodeId +
-            ", not found in state: " + currentState);
-      }
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Returns DatanodeDetails for the given node id.
-   *
-   * @param uuid Node Id
-   *
-   * @return DatanodeDetails of the node
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public DatanodeDetails getNodeDetails(UUID uuid)
-      throws NodeNotFoundException {
-    return getNodeInfo(uuid);
-  }
-
-  /**
-   * Returns DatanodeInfo for the given node id.
-   *
-   * @param uuid Node Id
-   *
-   * @return DatanodeInfo of the node
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public DatanodeInfo getNodeInfo(UUID uuid) throws NodeNotFoundException {
-    lock.readLock().lock();
-    try {
-      if (nodeMap.containsKey(uuid)) {
-        return nodeMap.get(uuid);
-      }
-      throw new NodeNotFoundException("Node UUID: " + uuid);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-
-  /**
-   * Returns the list of node ids which are in the specified state.
-   *
-   * @param state NodeState
-   *
-   * @return list of node ids
-   */
-  public List<UUID> getNodes(NodeState state) {
-    lock.readLock().lock();
-    try {
-      return new LinkedList<>(stateMap.get(state));
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Returns the list of all the node ids.
-   *
-   * @return list of all the node ids
-   */
-  public List<UUID> getAllNodes() {
-    lock.readLock().lock();
-    try {
-      return new LinkedList<>(nodeMap.keySet());
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Returns the count of nodes in the specified state.
-   *
-   * @param state NodeState
-   *
-   * @return Number of nodes in the specified state
-   */
-  public int getNodeCount(NodeState state) {
-    lock.readLock().lock();
-    try {
-      return stateMap.get(state).size();
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Returns the total node count.
-   *
-   * @return node count
-   */
-  public int getTotalNodeCount() {
-    lock.readLock().lock();
-    try {
-      return nodeMap.size();
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Returns the current state of the node.
-   *
-   * @param uuid node id
-   *
-   * @return NodeState
-   *
-   * @throws NodeNotFoundException if the node is not found
-   */
-  public NodeState getNodeState(UUID uuid) throws NodeNotFoundException {
-    lock.readLock().lock();
-    try {
-      for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) {
-        if (entry.getValue().contains(uuid)) {
-          return entry.getKey();
-        }
-      }
-      throw new NodeNotFoundException("Node UUID: " + uuid);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Removes the node from NodeStateMap.
-   *
-   * @param uuid node id
-   *
-   * @throws NodeNotFoundException if the node is not found
-   */
-  public void removeNode(UUID uuid) throws NodeNotFoundException {
-    lock.writeLock().lock();
-    try {
-      if (nodeMap.containsKey(uuid)) {
-        for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) {
-          if (entry.getValue().remove(uuid)) {
-            break;
-          }
-        }
-        nodeMap.remove(uuid);
-      } else {
-        throw new NodeNotFoundException("Node UUID: " + uuid);
-      }
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Returns the current stats of the node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat of the specified node.
-   *
-   * @throws NodeNotFoundException if the node is not found
-   */
-  public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
-    SCMNodeStat stat = nodeStats.get(uuid);
-    if (stat == null) {
-      throw new NodeNotFoundException("Node UUID: " + uuid);
-    }
-    return stat;
-  }
-
-  /**
-   * Returns an unmodifiable copy of nodeStats.
-   *
-   * @return map with node stats.
-   */
-  public Map<UUID, SCMNodeStat> getNodeStats() {
-    return Collections.unmodifiableMap(nodeStats);
-  }
-
-  /**
-   * Set the current stats of the node.
-   *
-   * @param uuid node id
-   *
-   * @param newstat stat to be set for the specified node.
-   */
-  public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
-    nodeStats.put(uuid, newstat);
-  }
-
-  /**
-   * Remove the current stats of the specified node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat the stat removed from the node.
-   *
-   * @throws NodeNotFoundException if the node is not found
-   */
-  public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException {
-    SCMNodeStat stat = nodeStats.remove(uuid);
-    if (stat == null) {
-      throw new NodeNotFoundException("Node UUID: " + uuid);
-    }
-    return stat;
-  }
-
-  /**
-   * Since we don't hold a global lock while constructing this string, the
-   * result may be inconsistent if the state of a node changes while the
-   * string is being built. Use this only for logging; never parse it for
-   * any critical calculations.
-   *
-   * @return current state of NodeStateMap
-   */
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append("Total number of nodes: ").append(getTotalNodeCount());
-    for (NodeState state : NodeState.values()) {
-      builder.append("Number of nodes in ").append(state).append(" state: ")
-          .append(getNodeCount(state));
-    }
-    return builder.toString();
-  }
-}
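
A short usage sketch of the map above (illustrative only; in SCM all mutations
are supposed to flow through NodeStateManager, per the class comment). It
assumes a DatanodeDetails instance dn and the NodeState enum from this branch:

    NodeStateMap map = new NodeStateMap();
    map.addNode(dn, NodeState.HEALTHY);   // NodeAlreadyExistsException on duplicates
    UUID id = dn.getUuid();
    map.updateNodeState(id, NodeState.HEALTHY, NodeState.STALE);
    List<UUID> stale = map.getNodes(NodeState.STALE);   // now contains id
    map.removeNode(id);                   // NodeNotFoundException if absent

Note the lock discipline: reads take the read lock and mutations the write
lock even though the backing maps are ConcurrentHashMaps; the extra lock is
what keeps nodeMap and stateMap mutually consistent.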

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
deleted file mode 100644
index 0c7610f..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import java.util.Collections;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-
-/**
- * The result of processing a Container/Pipeline report against the
- * Node2Container/Node2Pipeline map.
- */
-public final class ReportResult<T> {
-  private final ReportStatus status;
-  private final Set<T> missingEntries;
-  private final Set<T> newEntries;
-
-  private ReportResult(ReportStatus status,
-      Set<T> missingEntries,
-      Set<T> newEntries) {
-    this.status = status;
-    Preconditions.checkNotNull(missingEntries);
-    Preconditions.checkNotNull(newEntries);
-    this.missingEntries = missingEntries;
-    this.newEntries = newEntries;
-  }
-
-  public ReportStatus getStatus() {
-    return status;
-  }
-
-  public Set<T> getMissingEntries() {
-    return missingEntries;
-  }
-
-  public Set<T> getNewEntries() {
-    return newEntries;
-  }
-
-  /**
-   * Builder for {@link ReportResult}.
-   * @param <T> type of the entries in the report
-   */
-  public static class ReportResultBuilder<T> {
-    private ReportStatus status;
-    private Set<T> missingEntries;
-    private Set<T> newEntries;
-
-    public ReportResultBuilder<T> setStatus(
-        ReportStatus newStatus) {
-      this.status = newStatus;
-      return this;
-    }
-
-    public ReportResultBuilder<T> setMissingEntries(
-        Set<T> missingEntriesList) {
-      this.missingEntries = missingEntriesList;
-      return this;
-    }
-
-    public ReportResultBuilder<T> setNewEntries(
-        Set<T> newEntriesList) {
-      this.newEntries = newEntriesList;
-      return this;
-    }
-
-    public ReportResult<T> build() {
-
-      Set<T> nullSafeMissingEntries = this.missingEntries;
-      Set<T> nullSafeNewEntries = this.newEntries;
-      if (nullSafeNewEntries == null) {
-        nullSafeNewEntries = Collections.emptySet();
-      }
-      if (nullSafeMissingEntries == null) {
-        nullSafeMissingEntries = Collections.emptySet();
-      }
-      return new ReportResult<T>(status, nullSafeMissingEntries,
-              nullSafeNewEntries);
-    }
-  }
-
-  /**
-   * Results possible from processing a report.
-   */
-  public enum ReportStatus {
-    ALL_IS_WELL,
-    MISSING_ENTRIES,
-    NEW_ENTRIES_FOUND,
-    MISSING_AND_NEW_ENTRIES_FOUND,
-    NEW_DATANODE_FOUND,
-  }
-}
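
A usage sketch for the builder above (ContainerID as the type parameter is an
assumption for illustration; any entry type works):

    Set<ContainerID> missing = new HashSet<>();  // entries SCM expected but the DN did not report
    ReportResult<ContainerID> result =
        new ReportResult.ReportResultBuilder<ContainerID>()
            .setStatus(ReportResult.ReportStatus.MISSING_ENTRIES)
            .setMissingEntries(missing)
            .build();   // an unset newEntries defaults to an empty set

The null-safe defaults in build() mean callers only set the fields relevant to
the computed status; status itself is not defaulted, so a builder used without
setStatus() yields a ReportResult with a null status.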

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/package-info.java
deleted file mode 100644
index c429c5c..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-/**
- * Node States package.
- */
-package org.apache.hadoop.hdds.scm.node.states;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java
deleted file mode 100644
index 4669e74..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * This package contains StorageContainerManager classes.
- */
-package org.apache.hadoop.hdds.scm;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
deleted file mode 100644
index 1053149..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineAction;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .PipelineActionsFromDatanode;
-
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles pipeline actions from datanode.
- */
-public class PipelineActionEventHandler implements
-    EventHandler<PipelineActionsFromDatanode> {
-
-  public static final Logger LOG = LoggerFactory.getLogger(
-      PipelineActionEventHandler.class);
-
-  public PipelineActionEventHandler() {
-
-  }
-
-  @Override
-  public void onMessage(PipelineActionsFromDatanode report,
-      EventPublisher publisher) {
-    for (PipelineAction action : report.getReport().getPipelineActionsList()) {
-      switch (action.getAction()) {
-      case CLOSE:
-        PipelineID pipelineID = PipelineID.
-            getFromProtobuf(action.getClosePipeline().getPipelineID());
-        LOG.info("Closing pipeline " + pipelineID + " for reason:" + action
-            .getClosePipeline().getDetailedReason());
-        publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineID);
-        break;
-      default:
-        LOG.error("unknown pipeline action:{}" + action.getAction());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
deleted file mode 100644
index e49678f..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles pipeline close event.
- */
-public class PipelineCloseHandler implements EventHandler<PipelineID> {
-  private static final Logger LOG = LoggerFactory
-          .getLogger(PipelineCloseHandler.class);
-
-  private final PipelineSelector pipelineSelector;
-  public PipelineCloseHandler(PipelineSelector pipelineSelector) {
-    this.pipelineSelector = pipelineSelector;
-  }
-
-  @Override
-  public void onMessage(PipelineID pipelineID, EventPublisher publisher) {
-    Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
-    try {
-      if (pipeline != null) {
-        pipelineSelector.finalizePipeline(pipeline);
-      } else {
-        LOG.debug("pipeline:{} not found", pipelineID);
-      }
-    } catch (Exception e) {
-      LOG.info("failed to close pipeline:{}", pipelineID, e);
-    }
-  }
-}
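
How the two handlers above get wired together (a sketch, assuming the
EventQueue/SCMEvents registration used elsewhere in this branch, and an
existing PipelineSelector instance named pipelineSelector):

    // PIPELINE_ACTIONS carries PipelineActionsFromDatanode; PIPELINE_CLOSE
    // carries the PipelineID fired by PipelineActionEventHandler.
    EventQueue eventQueue = new EventQueue();
    eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
        new PipelineActionEventHandler());
    eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE,
        new PipelineCloseHandler(pipelineSelector));

PipelineActionEventHandler never closes a pipeline itself; it republishes a
PIPELINE_CLOSE event, which PipelineCloseHandler turns into a
finalizePipeline() call on the selector.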

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
deleted file mode 100644
index ca2e878..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Manage Ozone pipelines.
- */
-public abstract class PipelineManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PipelineManager.class);
-  protected final ArrayList<ActivePipelines> activePipelines;
-
-  public PipelineManager() {
-    activePipelines = new ArrayList<>();
-    for (ReplicationFactor factor : ReplicationFactor.values()) {
-      activePipelines.add(factor.ordinal(), new ActivePipelines());
-    }
-  }
-
-  /**
-   * List of active pipelines.
-   */
-  public static class ActivePipelines {
-    private final List<PipelineID> activePipelines;
-    private final AtomicInteger pipelineIndex;
-
-    ActivePipelines() {
-      activePipelines = new LinkedList<>();
-      pipelineIndex = new AtomicInteger(0);
-    }
-
-    void addPipeline(PipelineID pipelineID) {
-      if (!activePipelines.contains(pipelineID)) {
-        activePipelines.add(pipelineID);
-      }
-    }
-
-    public void removePipeline(PipelineID pipelineID) {
-      activePipelines.remove(pipelineID);
-    }
-
-    /**
-     * Find a Pipeline that is operational.
-     *
-     * @return - Pipeline or null
-     */
-    PipelineID findOpenPipeline() {
-      if (activePipelines.size() == 0) {
-        LOG.error("No Operational pipelines found. Returning null.");
-        return null;
-      }
-      return activePipelines.get(getNextIndex());
-    }
-
-    /**
-     * Returns the next pipeline index, in round-robin order.
-     *
-     * @return index into the active pipeline list.
-     */
-    private int getNextIndex() {
-      return pipelineIndex.incrementAndGet() % activePipelines.size();
-    }
-  }
-
-  /**
-   * This function is called by the Container Manager while allocating a new
-   * container. The client specifies what kind of replication pipeline is
-   * needed and, based on the replication type in the request, the
-   * appropriate interface is invoked.
-   *
-   * @param replicationFactor - Replication Factor
-   * @param replicationType - Replication Type
-   * @return the PipelineID of an open pipeline, or null if none is found.
-   */
-  public synchronized final PipelineID getPipeline(
-      ReplicationFactor replicationFactor, ReplicationType replicationType) {
-    PipelineID id =
-        activePipelines.get(replicationFactor.ordinal()).findOpenPipeline();
-    if (id == null) {
-      LOG.error("Get pipeline call failed. We are not able to find an" +
-          " operational pipeline.");
-      return null;
-    }
-    LOG.debug("re-used pipeline:{} for container with " +
-            "replicationType:{} replicationFactor:{}",
-        id, replicationType, replicationFactor);
-    return id;
-  }
-
-  void addOpenPipeline(Pipeline pipeline) {
-    activePipelines.get(pipeline.getFactor().ordinal())
-            .addPipeline(pipeline.getId());
-  }
-
-  public abstract Pipeline allocatePipeline(
-      ReplicationFactor replicationFactor);
-
-  /**
-   * Initialize the pipeline.
-   * TODO: move the initialization to Ozone Client later
-   */
-  public abstract void initializePipeline(Pipeline pipeline)
-      throws IOException;
-
-  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
-    if (pipeline.addMember(dn)
-        && (pipeline.getDatanodes().size() == pipeline.getFactor().getNumber())
-        && pipeline.getLifeCycleState() == HddsProtos.LifeCycleState.OPEN) {
-      addOpenPipeline(pipeline);
-    }
-  }
-
-  /**
-   * Creates a pipeline with a specified replication factor and type.
-   * @param replicationFactor - Replication Factor.
-   * @param replicationType - Replication Type.
-   */
-  public Pipeline createPipeline(ReplicationFactor replicationFactor,
-      ReplicationType replicationType) throws IOException {
-    Pipeline pipeline = allocatePipeline(replicationFactor);
-    if (pipeline != null) {
-      LOG.debug("created new pipeline:{} for container with "
-              + "replicationType:{} replicationFactor:{}",
-          pipeline.getId(), replicationType, replicationFactor);
-    }
-    return pipeline;
-  }
-
-  /**
-   * Remove the pipeline from active allocation.
-   * @param pipeline pipeline to be finalized
-   */
-  public abstract boolean finalizePipeline(Pipeline pipeline);
-
-  /**
-   * Closes the given pipeline on the datanodes.
-   *
-   * @param pipeline pipeline to be closed
-   */
-  public abstract void closePipeline(Pipeline pipeline) throws IOException;
-}
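
The round-robin selection in ActivePipelines hangs off a single AtomicInteger.
A self-contained sketch of the same arithmetic (illustrative, not the deleted
code):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    final class RoundRobin<T> {
      private final AtomicInteger index = new AtomicInteger(0);

      /** Returns the next element in rotation, or null when empty. */
      T next(List<T> items) {
        if (items.isEmpty()) {
          return null;
        }
        // Math.floorMod keeps the result non-negative even after the counter
        // overflows; the plain % in getNextIndex() above can briefly return a
        // negative index once pipelineIndex wraps past Integer.MAX_VALUE.
        return items.get(Math.floorMod(index.incrementAndGet(), items.size()));
      }
    }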

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
deleted file mode 100644
index 933792b..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.server
-        .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles Pipeline Reports from datanode.
- */
-public class PipelineReportHandler implements
-        EventHandler<PipelineReportFromDatanode> {
-
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(PipelineReportHandler.class);
-  private final PipelineSelector pipelineSelector;
-
-  public PipelineReportHandler(PipelineSelector pipelineSelector) {
-    Preconditions.checkNotNull(pipelineSelector);
-    this.pipelineSelector = pipelineSelector;
-  }
-
-  @Override
-  public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
-      EventPublisher publisher) {
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
-    PipelineReportsProto pipelineReport =
-            pipelineReportFromDatanode.getReport();
-    Preconditions.checkNotNull(dn, "Pipeline Report is "
-        + "missing DatanodeDetails.");
-    LOGGER.trace("Processing pipeline report for dn: {}", dn);
-    pipelineSelector.processPipelineReport(dn, pipelineReport);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
deleted file mode 100644
index c8d22ff..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .SCMContainerPlacementRandom;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
-import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.lease.Lease;
-import org.apache.hadoop.ozone.lease.LeaseException;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_PIPELINE_STATE;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_FIND_ACTIVE_PIPELINE;
-import static org.apache.hadoop.hdds.server
-        .ServerUtils.getOzoneMetaDirPath;
-import static org.apache.hadoop.ozone
-        .OzoneConsts.SCM_PIPELINE_DB;
-
-/**
- * Sends the request to the right pipeline manager.
- */
-public class PipelineSelector {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PipelineSelector.class);
-  private final ContainerPlacementPolicy placementPolicy;
-  private final Map<ReplicationType, PipelineManager> pipelineManagerMap;
-  private final Configuration conf;
-  private final EventPublisher eventPublisher;
-  private final long containerSize;
-  private final MetadataStore pipelineStore;
-  private final PipelineStateManager stateManager;
-  private final NodeManager nodeManager;
-  private final Map<PipelineID, HashSet<ContainerID>> pipeline2ContainerMap;
-  private final Map<PipelineID, Pipeline> pipelineMap;
-  private final LeaseManager<Pipeline> pipelineLeaseManager;
-
-  /**
-   * Constructs a pipeline Selector.
-   *
-   * @param nodeManager - node manager
-   * @param conf - Ozone Config
-   */
-  public PipelineSelector(NodeManager nodeManager, Configuration conf,
-      EventPublisher eventPublisher, int cacheSizeMB) throws IOException {
-    this.conf = conf;
-    this.eventPublisher = eventPublisher;
-    this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
-    this.containerSize = (long)this.conf.getStorageSize(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
-        StorageUnit.BYTES);
-    pipelineMap = new ConcurrentHashMap<>();
-    pipelineManagerMap = new HashMap<>();
-
-    pipelineManagerMap.put(ReplicationType.STAND_ALONE,
-            new StandaloneManagerImpl(nodeManager, placementPolicy,
-            containerSize));
-    pipelineManagerMap.put(ReplicationType.RATIS,
-            new RatisManagerImpl(nodeManager, placementPolicy,
-                    containerSize, conf));
-    long pipelineCreationLeaseTimeout = conf.getTimeDuration(
-        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
-        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
-        pipelineCreationLeaseTimeout);
-    pipelineLeaseManager.start();
-
-    stateManager = new PipelineStateManager();
-    this.nodeManager = nodeManager;
-    pipeline2ContainerMap = new HashMap<>();
-
-    // Write the container name to pipeline mapping.
-    File metaDir = getOzoneMetaDirPath(conf);
-    File containerDBPath = new File(metaDir, SCM_PIPELINE_DB);
-    pipelineStore = MetadataStoreBuilder.newBuilder()
-            .setConf(conf)
-            .setDbFile(containerDBPath)
-            .setCacheSize(cacheSizeMB * OzoneConsts.MB)
-            .build();
-
-    reloadExistingPipelines();
-  }
-
-  private void reloadExistingPipelines() throws IOException {
-    if (pipelineStore.isEmpty()) {
-      // Nothing to do just return
-      return;
-    }
-
-    List<Map.Entry<byte[], byte[]>> range =
-            pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
-
-    // Transform the values into the pipelines.
-    // TODO: filter by pipeline state
-    for (Map.Entry<byte[], byte[]> entry : range) {
-      Pipeline pipeline = Pipeline.getFromProtoBuf(
-                HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
-      Preconditions.checkNotNull(pipeline);
-      addExistingPipeline(pipeline);
-    }
-  }
-
-  @VisibleForTesting
-  public Set<ContainerID> getOpenContainerIDsByPipeline(
-      PipelineID pipelineID) {
-    return pipeline2ContainerMap.get(pipelineID);
-  }
-
-  public void addContainerToPipeline(PipelineID pipelineID, long containerID) {
-    pipeline2ContainerMap.get(pipelineID)
-            .add(ContainerID.valueof(containerID));
-  }
-
-  public void removeContainerFromPipeline(PipelineID pipelineID,
-      long containerID) throws IOException {
-    pipeline2ContainerMap.get(pipelineID)
-            .remove(ContainerID.valueof(containerID));
-    closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID));
-  }
-
-  /**
-   * Translates a list of nodes, ordered such that the first is the leader,
-   * into a corresponding {@link Pipeline} object.
-   *
-   * @param nodes - list of datanodes on which we will allocate the container.
-   * The first of the list will be the leader node.
-   * @return pipeline corresponding to nodes
-   */
-  public static Pipeline newPipelineFromNodes(
-      List<DatanodeDetails> nodes, ReplicationType replicationType,
-      ReplicationFactor replicationFactor, PipelineID id) {
-    Preconditions.checkNotNull(nodes);
-    Preconditions.checkArgument(nodes.size() > 0);
-    String leaderId = nodes.get(0).getUuidString();
-    // A new pipeline always starts in allocated state
-    Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
-        replicationType, replicationFactor, id);
-    for (DatanodeDetails node : nodes) {
-      pipeline.addMember(node);
-    }
-    return pipeline;
-  }
-
-  /**
-   * Create pluggable container placement policy implementation instance.
-   *
-   * @param nodeManager - SCM node manager.
-   * @param conf - configuration.
-   * @return SCM container placement policy implementation instance.
-   */
-  @SuppressWarnings("unchecked")
-  private static ContainerPlacementPolicy createContainerPlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
-    Class<? extends ContainerPlacementPolicy> implClass =
-        (Class<? extends ContainerPlacementPolicy>) conf.getClass(
-            ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-            SCMContainerPlacementRandom.class);
-
-    try {
-      Constructor<? extends ContainerPlacementPolicy> ctor =
-          implClass.getDeclaredConstructor(NodeManager.class,
-              Configuration.class);
-      return ctor.newInstance(nodeManager, conf);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (InvocationTargetException e) {
-      throw new RuntimeException(implClass.getName()
-          + " could not be constructed.", e.getCause());
-    } catch (Exception e) {
-      LOG.error("Unhandled exception occurred, Placement policy will not be " +
-          "functional.");
-      throw new IllegalArgumentException("Unable to load " +
-          "ContainerPlacementPolicy", e);
-    }
-  }
-
-  /**
-   * This function is called by the Container Manager while allocating a new
-   * container. The client specifies what kind of replication pipeline is
-   * needed and, based on the replication type in the request, the
-   * appropriate interface is invoked.
-   */
-  public Pipeline getReplicationPipeline(ReplicationType replicationType,
-      HddsProtos.ReplicationFactor replicationFactor)
-      throws IOException {
-    PipelineManager manager = pipelineManagerMap.get(replicationType);
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Getting replication pipeline forReplicationType {} :" +
-            " ReplicationFactor {}", replicationType.toString(),
-        replicationFactor.toString());
-
-    /**
-     * In the Ozone world, we have a very simple policy.
-     *
-     * 1. Try to create a pipeline if there are enough free nodes.
-     *
-     * 2. This allows all nodes to be part of a pipeline quickly.
-     *
-     * 3. if there are not enough free nodes, return already allocated pipeline
-     * in a round-robin fashion.
-     *
-     * TODO: Might have to come up with a better algorithm than this.
-     * Create a new placement policy that returns pipelines in round robin
-     * fashion.
-     */
-    Pipeline pipeline =
-        manager.createPipeline(replicationFactor, replicationType);
-    if (pipeline == null) {
-      // try to return a pipeline from already allocated pipelines
-      PipelineID pipelineId =
-              manager.getPipeline(replicationFactor, replicationType);
-      if (pipelineId == null) {
-        throw new SCMException(FAILED_TO_FIND_ACTIVE_PIPELINE);
-      }
-      pipeline = pipelineMap.get(pipelineId);
-      Preconditions.checkArgument(pipeline.getLifeCycleState() ==
-              LifeCycleState.OPEN);
-    } else {
-      pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
-              pipeline.getProtobufMessage().toByteArray());
-      // if a new pipeline is created, initialize its state machine
-      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);
-
-      //TODO: move the initialization of pipeline to Ozone Client
-      manager.initializePipeline(pipeline);
-      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
-    }
-    return pipeline;
-  }
-
-  /**
-   * Returns the pipeline for the given pipeline id.
-   */
-  public Pipeline getPipeline(PipelineID pipelineID) {
-    return pipelineMap.get(pipelineID);
-  }
-
-  /**
-   * Finalize a given pipeline.
-   */
-  public void finalizePipeline(Pipeline pipeline) throws IOException {
-    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING ||
-        pipeline.getLifeCycleState() == LifeCycleState.CLOSED) {
-      LOG.debug("pipeline:{} already in closing state, skipping",
-          pipeline.getId());
-      // already in closing/closed state
-      return;
-    }
-
-    // Remove the pipeline from active allocation
-    if (manager.finalizePipeline(pipeline)) {
-      LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId());
-      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
-      closePipelineIfNoOpenContainers(pipeline);
-    }
-  }
-
-  /**
-   * Close a given pipeline.
-   */
-  private void closePipelineIfNoOpenContainers(Pipeline pipeline)
-      throws IOException {
-    if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
-      return;
-    }
-    HashSet<ContainerID> containerIDS =
-            pipeline2ContainerMap.get(pipeline.getId());
-    if (containerIDS.size() == 0) {
-      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);
-      LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId());
-    }
-  }
-
-  /**
-   * Close a given pipeline.
-   */
-  private void closePipeline(Pipeline pipeline) throws IOException {
-    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
-    HashSet<ContainerID> containers =
-            pipeline2ContainerMap.get(pipeline.getId());
-    Preconditions.checkArgument(containers.size() == 0);
-    manager.closePipeline(pipeline);
-  }
-
-  /**
-   * Add to a given pipeline.
-   */
-  private void addOpenPipeline(Pipeline pipeline) {
-    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Adding Open pipeline. pipelineID: {}", pipeline.getId());
-    manager.addOpenPipeline(pipeline);
-  }
-
-  private void closeContainersByPipeline(Pipeline pipeline) {
-    HashSet<ContainerID> containers =
-            pipeline2ContainerMap.get(pipeline.getId());
-    for (ContainerID id : containers) {
-      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
-    }
-  }
-
-  private void addExistingPipeline(Pipeline pipeline) throws IOException {
-    LifeCycleState state = pipeline.getLifeCycleState();
-    switch (state) {
-    case ALLOCATED:
-      // a pipeline in allocated state is only present in SCM and does not
-      // exist on the datanodes; on SCM restart, this pipeline can be ignored.
-      break;
-    case CREATING:
-    case OPEN:
-    case CLOSING:
-      //TODO: process pipeline report and move pipeline to active queue
-      // when all the nodes have reported.
-      pipelineMap.put(pipeline.getId(), pipeline);
-      pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
-      nodeManager.addPipeline(pipeline);
-      // reset the datanodes in the pipeline
-      // they will be reset on
-      pipeline.resetPipeline();
-      break;
-    case CLOSED:
-      // if the pipeline is in closed state, nothing to do.
-      break;
-    default:
-      throw new IOException("invalid pipeline state:" + state);
-    }
-  }
-
-  public void handleStaleNode(DatanodeDetails dn) {
-    Set<PipelineID> pipelineIDs = nodeManager.getPipelineByDnID(dn.getUuid());
-    for (PipelineID id : pipelineIDs) {
-      LOG.info("closing pipeline {}.", id);
-      eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
-    }
-  }
-
-  void processPipelineReport(DatanodeDetails dn,
-                                    PipelineReportsProto pipelineReport) {
-    Set<PipelineID> reportedPipelines = new HashSet<>();
-    pipelineReport.getPipelineReportList()
-        .forEach(p -> reportedPipelines.add(
-            processPipelineReport(p.getPipelineID(), dn)));
-
-    //TODO: handle missing pipelines and new pipelines later
-  }
-
-  private PipelineID processPipelineReport(
-          HddsProtos.PipelineID id, DatanodeDetails dn) {
-    PipelineID pipelineID = PipelineID.getFromProtobuf(id);
-    Pipeline pipeline = pipelineMap.get(pipelineID);
-    if (pipeline != null) {
-      pipelineManagerMap.get(pipeline.getType())
-              .processPipelineReport(pipeline, dn);
-    }
-    return pipelineID;
-  }
-
-  /**
-   * Update the Pipeline State to the next state.
-   *
-   * @param pipeline - Pipeline
-   * @param event - LifeCycle Event
-   * @throws SCMException on failure.
-   */
-  public void updatePipelineState(Pipeline pipeline,
-      HddsProtos.LifeCycleEvent event) throws IOException {
-    try {
-      switch (event) {
-      case CREATE:
-        pipelineMap.put(pipeline.getId(), pipeline);
-        pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
-        nodeManager.addPipeline(pipeline);
-        // Acquire lease on pipeline
-        Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
-        // Register callback to be executed in case of timeout
-        pipelineLease.registerCallBack(() -> {
-          updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
-          return null;
-        });
-        break;
-      case CREATED:
-        // Release the lease on pipeline
-        pipelineLeaseManager.release(pipeline);
-        addOpenPipeline(pipeline);
-        break;
-
-      case FINALIZE:
-        closeContainersByPipeline(pipeline);
-        break;
-
-      case CLOSE:
-      case TIMEOUT:
-        closePipeline(pipeline);
-        pipeline2ContainerMap.remove(pipeline.getId());
-        nodeManager.removePipeline(pipeline);
-        pipelineMap.remove(pipeline.getId());
-        break;
-      default:
-        throw new SCMException("Unsupported pipeline LifeCycleEvent.",
-            FAILED_TO_CHANGE_PIPELINE_STATE);
-      }
-
-      stateManager.updatePipelineState(pipeline, event);
-      pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
-              pipeline.getProtobufMessage().toByteArray());
-    } catch (LeaseException e) {
-      throw new IOException("Lease Exception.", e);
-    }
-  }
-
-  public void shutdown() throws IOException {
-    if (pipelineLeaseManager != null) {
-      pipelineLeaseManager.shutdown();
-    }
-
-    if (pipelineStore != null) {
-      pipelineStore.close();
-    }
-  }
-}
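
createContainerPlacementPolicy() above is the usual pluggable-implementation
pattern: resolve a class from configuration, then invoke a constructor with a
known signature reflectively. A generic, standalone version under the same
assumptions (PluginLoader is hypothetical; the explicit parameter-type array
mirrors the NodeManager.class/Configuration.class contract a policy must meet):

    import java.lang.reflect.Constructor;
    import java.lang.reflect.InvocationTargetException;

    final class PluginLoader {
      /** Instantiates impl via the constructor matching paramTypes. */
      static <T> T newInstance(Class<? extends T> impl, Class<?>[] paramTypes,
          Object... args) {
        try {
          Constructor<? extends T> ctor = impl.getDeclaredConstructor(paramTypes);
          return ctor.newInstance(args);
        } catch (InvocationTargetException e) {
          // Surface the real failure thrown inside the plugin's constructor.
          throw new RuntimeException(impl.getName()
              + " could not be constructed.", e.getCause());
        } catch (ReflectiveOperationException e) {
          throw new IllegalArgumentException("Unable to load "
              + impl.getName(), e);
        }
      }
    }

Passing the parameter types explicitly matters: matching on the runtime
classes of the arguments would fail whenever a caller hands in a subclass of
the declared parameter type.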

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
deleted file mode 100644
index 6054f16..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.ozone.common.statemachine
-    .InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_PIPELINE_STATE;
-
-/**
- * Manages Pipeline states.
- */
-public class PipelineStateManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PipelineStateManager.class);
-
-  private final StateMachine<HddsProtos.LifeCycleState,
-      HddsProtos.LifeCycleEvent> stateMachine;
-
-  PipelineStateManager() {
-    // Initialize the pipeline state machine.
-    Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();
-    // These are the steady states of a pipeline.
-    finalStates.add(HddsProtos.LifeCycleState.OPEN);
-    finalStates.add(HddsProtos.LifeCycleState.CLOSED);
-
-    this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
-        finalStates);
-    initializeStateMachine();
-  }
-
-  /**
-   * Event and State Transition Mapping.
-   *
-   * State: ALLOCATED ---------------> CREATING
-   * Event:                CREATE
-   *
-   * State: CREATING  ---------------> OPEN
-   * Event:               CREATED
-   *
-   * State: OPEN      ---------------> CLOSING
-   * Event:               FINALIZE
-   *
-   * State: CLOSING   ---------------> CLOSED
-   * Event:                CLOSE
-   *
-   * State: CREATING  ---------------> CLOSED
-   * Event:               TIMEOUT
-   *
-   *
- * Pipeline State Flow:
-   *
-   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
-   *            (CREATE)     | (CREATED)     (FINALIZE)   |
-   *                         |                            |
-   *                         |                            |
-   *                         |(TIMEOUT)                   |(CLOSE)
-   *                         |                            |
-   *                         +--------> [CLOSED] <--------+
-   */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
-        HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleEvent.CREATE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleState.OPEN,
-        HddsProtos.LifeCycleEvent.CREATED);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
-        HddsProtos.LifeCycleState.CLOSING,
-        HddsProtos.LifeCycleEvent.FINALIZE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
-        HddsProtos.LifeCycleState.CLOSED,
-        HddsProtos.LifeCycleEvent.CLOSE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleState.CLOSED,
-        HddsProtos.LifeCycleEvent.TIMEOUT);
-  }
-
-
-  /**
-   * Update the Pipeline State to the next state.
-   *
-   * @param pipeline - Pipeline
-   * @param event - LifeCycle Event
-   * @throws SCMException  on Failure.
-   */
-  public void updatePipelineState(Pipeline pipeline,
-      HddsProtos.LifeCycleEvent event) throws IOException {
-    HddsProtos.LifeCycleState newState;
-    try {
-      newState =
-          stateMachine.getNextState(pipeline.getLifeCycleState(), event);
-    } catch (InvalidStateTransitionException ex) {
-      String error = String.format("Failed to update the state of pipeline " +
-              "%s, reason: invalid state transition from state: %s upon " +
-              "event: %s.",
-          pipeline.getId(), pipeline.getLifeCycleState(), event);
-      LOG.error(error);
-      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
-    }
-
-    // This is a post condition after executing getNextState.
-    Preconditions.checkNotNull(newState);
-    Preconditions.checkNotNull(pipeline);
-    pipeline.setLifeCycleState(newState);
-  }
-}
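
The transition table documented above maps directly onto a nested lookup. The following self-contained sketch models the same five transitions with plain enums in place of the HddsProtos types; it illustrates the behaviour of the removed StateMachine usage and is not the Hadoop StateMachine API itself:

    import java.util.EnumMap;
    import java.util.Map;

    public class LifeCycleSketch {
      enum State { ALLOCATED, CREATING, OPEN, CLOSING, CLOSED }
      enum Event { CREATE, CREATED, FINALIZE, CLOSE, TIMEOUT }

      private static final Map<State, Map<Event, State>> TRANSITIONS =
          new EnumMap<>(State.class);

      static {
        add(State.ALLOCATED, Event.CREATE, State.CREATING);
        add(State.CREATING, Event.CREATED, State.OPEN);
        add(State.OPEN, Event.FINALIZE, State.CLOSING);
        add(State.CLOSING, Event.CLOSE, State.CLOSED);
        add(State.CREATING, Event.TIMEOUT, State.CLOSED);
      }

      private static void add(State from, Event on, State to) {
        TRANSITIONS.computeIfAbsent(from, s -> new EnumMap<>(Event.class))
            .put(on, to);
      }

      // Returns the next state, or throws on an invalid transition,
      // matching the InvalidStateTransitionException handling above.
      static State next(State from, Event on) {
        State to = TRANSITIONS.getOrDefault(from, Map.of()).get(on);
        if (to == null) {
          throw new IllegalStateException(
              "No transition from " + from + " on " + on);
        }
        return to;
      }

      public static void main(String[] args) {
        State s = State.ALLOCATED;
        s = next(s, Event.CREATE);   // CREATING
        s = next(s, Event.CREATED);  // OPEN
        s = next(s, Event.FINALIZE); // CLOSING
        s = next(s, Event.CLOSE);    // CLOSED
        System.out.println(s);
      }
    }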

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
deleted file mode 100644
index ea24c58..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines;
-/**
- Ozone supports the notion of different kinds of pipelines. That means a
- replication pipeline can be built on Ratis, Standalone, or some other
- protocol. All pipeline managers, the entities in charge of pipelines,
- reside in this package.
-
- Here is the high-level architecture:
-
- 1. A pipeline selector class is instantiated in the container manager class.
-
- 2. A client, when creating a container, specifies the replication type it
- wants to use. Two types are supported today: Ratis and Standalone.
-
- 3. Based on the replication type, the pipeline selector asks the
- corresponding pipeline manager for a pipeline, as sketched below.
-
- 4. Clients may specify the set of datanodes for the pipeline, or rely on
- the pipeline manager to select the datanodes if none are specified.
- */
\ No newline at end of file
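
A minimal sketch of step 3 from the package description (dispatch from the selector to the per-type manager). The Manager interface and names here are hypothetical stand-ins, not the real PipelineSelector and PipelineManager classes:

    public class SelectorSketch {
      enum ReplicationType { RATIS, STAND_ALONE }

      // Stand-in for the per-protocol pipeline managers in this package.
      interface Manager { String allocatePipeline(int factor); }

      static String getPipeline(ReplicationType type, int factor,
          Manager ratis, Manager standalone) {
        // Dispatch on the client's requested replication type.
        switch (type) {
          case RATIS:       return ratis.allocatePipeline(factor);
          case STAND_ALONE: return standalone.allocatePipeline(factor);
          default: throw new IllegalArgumentException("Unknown type: " + type);
        }
      }

      public static void main(String[] args) {
        Manager ratis = f -> "ratis-pipeline-of-" + f;
        Manager standalone = f -> "standalone-pipeline-of-" + f;
        System.out.println(
            getPipeline(ReplicationType.RATIS, 3, ratis, standalone));
      }
    }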

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
deleted file mode 100644
index 905a5b5..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines.ratis;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.XceiverClientRatis;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Implementation of {@link PipelineManager}.
- *
- * TODO: Introduce a state machine.
- */
-public class RatisManagerImpl extends PipelineManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RatisManagerImpl.class);
-  private final Configuration conf;
-  private final NodeManager nodeManager;
-  private final Set<DatanodeDetails> ratisMembers;
-
-  /**
-   * Constructs a Ratis Pipeline Manager.
-   *
-   * @param nodeManager - Node Manager.
-   * @param placementPolicy - Placement policy (unused in this constructor).
-   * @param size - Container size (unused in this constructor).
-   * @param conf - Configuration.
-   */
-  public RatisManagerImpl(NodeManager nodeManager,
-      ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
-    super();
-    this.conf = conf;
-    this.nodeManager = nodeManager;
-    ratisMembers = new HashSet<>();
-  }
-
-  /**
-   * Allocates a new ratis Pipeline from the free nodes.
-   *
-   * @param factor - One or Three
-   * @return Pipeline.
-   */
-  public Pipeline allocatePipeline(ReplicationFactor factor) {
-    List<DatanodeDetails> newNodesList = new LinkedList<>();
-    List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
-    //TODO: Add Raft state to the nodes so we can query the datanodes
-    // directly and skip them, instead of maintaining a set here.
-    for (DatanodeDetails datanode : datanodes) {
-      Preconditions.checkNotNull(datanode);
-      if (!ratisMembers.contains(datanode)) {
-        newNodesList.add(datanode);
-        if (newNodesList.size() == factor.getNumber()) {
-          // once a datanode has been added to a pipeline, exclude it from
-          // further allocations
-          ratisMembers.addAll(newNodesList);
-          PipelineID pipelineID = PipelineID.randomId();
-          LOG.info("Allocating a new ratis pipeline of size: {} id: {}",
-                  factor.getNumber(), pipelineID);
-          return PipelineSelector.newPipelineFromNodes(newNodesList,
-              ReplicationType.RATIS, factor, pipelineID);
-        }
-      }
-    }
-    return null;
-  }
-
-  public void initializePipeline(Pipeline pipeline) throws IOException {
-    //TODO: move the initialization from SCM to client
-    try (XceiverClientRatis client =
-        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-      client.createPipeline();
-    }
-  }
-
-  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
-    super.processPipelineReport(pipeline, dn);
-    ratisMembers.add(dn);
-  }
-
-  public synchronized boolean finalizePipeline(Pipeline pipeline) {
-    activePipelines.get(pipeline.getFactor().ordinal())
-            .removePipeline(pipeline.getId());
-    return true;
-  }
-
-  /**
-   * Close the pipeline.
-   */
-  public void closePipeline(Pipeline pipeline) throws IOException {
-    try (XceiverClientRatis client =
-        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-      client.destroyPipeline();
-    }
-    for (DatanodeDetails node : pipeline.getMachines()) {
-      // A node should always be in the ratis members list.
-      Preconditions.checkArgument(ratisMembers.remove(node));
-    }
-  }
-}
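
The allocation loop in the removed allocatePipeline above picks the first factor-many healthy datanodes not yet committed to a pipeline, then marks them used. That logic reduces to the following self-contained sketch, with strings standing in for DatanodeDetails:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class AllocationSketch {
      // Nodes already committed to a pipeline; mirrors ratisMembers above.
      private final Set<String> members = new HashSet<>();

      // Picks `factor` free healthy nodes, or returns null if there are
      // not enough, matching the removed method's null return.
      List<String> allocate(List<String> healthyNodes, int factor) {
        List<String> picked = new ArrayList<>();
        for (String node : healthyNodes) {
          if (!members.contains(node)) {
            picked.add(node);
            if (picked.size() == factor) {
              members.addAll(picked); // exclude from further allocations
              return picked;
            }
          }
        }
        return null;
      }

      public static void main(String[] args) {
        AllocationSketch a = new AllocationSketch();
        List<String> nodes = List.of("dn1", "dn2", "dn3", "dn4");
        System.out.println(a.allocate(nodes, 3)); // [dn1, dn2, dn3]
        System.out.println(a.allocate(nodes, 3)); // null (only dn4 is free)
      }
    }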

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
deleted file mode 100644
index 2970fb3..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines.ratis;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
deleted file mode 100644
index 045afb6..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines.standalone;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Standalone manager implementation, used to prove that the pluggable
- * pipeline manager interface works with the current tests.
- */
-public class StandaloneManagerImpl extends PipelineManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StandaloneManagerImpl.class);
-  private final NodeManager nodeManager;
-  private final ContainerPlacementPolicy placementPolicy;
-  private final long containerSize;
-  private final Set<DatanodeDetails> standAloneMembers;
-
-  /**
-   * Constructor for Standalone Node Manager Impl.
-   * @param nodeManager - Node Manager.
-   * @param placementPolicy - Placement Policy
-   * @param containerSize - Container Size.
-   */
-  public StandaloneManagerImpl(NodeManager nodeManager,
-      ContainerPlacementPolicy placementPolicy, long containerSize) {
-    super();
-    this.nodeManager = nodeManager;
-    this.placementPolicy = placementPolicy;
-    this.containerSize =  containerSize;
-    this.standAloneMembers = new HashSet<>();
-  }
-
-
-  /**
-   * Allocates a new standalone Pipeline from the free nodes.
-   *
-   * @param factor - One
-   * @return Pipeline.
-   */
-  public Pipeline allocatePipeline(ReplicationFactor factor) {
-    List<DatanodeDetails> newNodesList = new LinkedList<>();
-    List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
-    for (DatanodeDetails datanode : datanodes) {
-      Preconditions.checkNotNull(datanode);
-      if (!standAloneMembers.contains(datanode)) {
-        newNodesList.add(datanode);
-        if (newNodesList.size() == factor.getNumber()) {
-          // once a datanode has been added to a pipeline, exclude it from
-          // further allocations
-          standAloneMembers.addAll(newNodesList);
-          // Standalone pipelines use the first node's id as the pipeline id.
-          PipelineID pipelineID =
-                  PipelineID.valueOf(newNodesList.get(0).getUuid());
-          LOG.info("Allocating a new standalone pipeline of size: {} id: {}",
-              factor.getNumber(), pipelineID);
-          return PipelineSelector.newPipelineFromNodes(newNodesList,
-              ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineID);
-        }
-      }
-    }
-    return null;
-  }
-
-  public void initializePipeline(Pipeline pipeline) {
-    // Nothing to be done for standalone pipeline
-  }
-
-  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
-    super.processPipelineReport(pipeline, dn);
-    standAloneMembers.add(dn);
-  }
-
-  public synchronized boolean finalizePipeline(Pipeline pipeline) {
-    activePipelines.get(pipeline.getFactor().ordinal())
-            .removePipeline(pipeline.getId());
-    return false;
-  }
-
-  /**
-   * Close the pipeline.
-   */
-  public void closePipeline(Pipeline pipeline) throws IOException {
-    for (DatanodeDetails node : pipeline.getMachines()) {
-      // A node should always be in the standalone members list.
-      Preconditions.checkArgument(standAloneMembers.remove(node));
-    }
-  }
-}
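
One behavioural difference between the two removed managers is id generation: RatisManagerImpl calls PipelineID.randomId(), while the standalone manager derives the pipeline id from the first node's UUID, as the comment above notes. A tiny sketch of that derivation, with java.util.UUID standing in for the real DatanodeDetails and PipelineID types:

    import java.util.List;
    import java.util.UUID;

    public class StandalonePipelineIdSketch {
      // The standalone pipeline id is simply the first datanode's UUID,
      // so the same single-node pipeline always maps to the same id.
      static UUID pipelineIdFor(List<UUID> nodeUuids) {
        return nodeUuids.get(0);
      }

      public static void main(String[] args) {
        UUID dn = UUID.randomUUID();
        System.out.println("pipeline id: " + pipelineIdFor(List.of(dn)));
      }
    }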

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
deleted file mode 100644
index b2c3ca40..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipelines.standalone;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
deleted file mode 100644
index 4944017..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.ratis;
-
-/**
- * This package contains classes related to Apache Ratis for SCM.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ChillModePrecheck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ChillModePrecheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ChillModePrecheck.java
deleted file mode 100644
index b92413e..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ChillModePrecheck.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.server;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
-import org.apache.hadoop.hdds.scm.server.SCMChillModeManager.ChillModeRestrictedOps;
-
-/**
- * Chill mode pre-check for SCM operations.
- */
-public class ChillModePrecheck implements Precheck<ScmOps> {
-
-  private final AtomicBoolean inChillMode = new AtomicBoolean(true);
-  public static final String PRECHECK_TYPE = "ChillModePrecheck";
-
-  public boolean check(ScmOps op) throws SCMException {
-    if (inChillMode.get() && ChillModeRestrictedOps
-        .isRestrictedInChillMode(op)) {
-      throw new SCMException("ChillModePrecheck failed for " + op,
-          ResultCodes.CHILL_MODE_EXCEPTION);
-    }
-    return inChillMode.get();
-  }
-
-  @Override
-  public String type() {
-    return PRECHECK_TYPE;
-  }
-
-  public boolean isInChillMode() {
-    return inChillMode.get();
-  }
-
-  public void setInChillMode(boolean inChillMode) {
-    this.inChillMode.set(inChillMode);
-  }
-}
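
The precheck above gates restricted SCM operations while the cluster is in chill mode and lets the rest pass through. A self-contained sketch of the same guard, with a hypothetical op enum in place of ScmOps/ChillModeRestrictedOps and an unchecked exception in place of SCMException:

    public class ChillModeSketch {
      // Hypothetical ops; the real code consults ChillModeRestrictedOps.
      enum Op { ALLOCATE_CONTAINER, QUERY_NODE }

      private volatile boolean inChillMode = true;

      // Throws for restricted ops during chill mode; otherwise returns
      // whether chill mode is still active, as the removed check() does.
      boolean check(Op op) {
        if (inChillMode && op == Op.ALLOCATE_CONTAINER) {
          throw new IllegalStateException("ChillModePrecheck failed for " + op);
        }
        return inChillMode;
      }

      void exitChillMode() { inChillMode = false; }

      public static void main(String[] args) {
        ChillModeSketch p = new ChillModeSketch();
        System.out.println(p.check(Op.QUERY_NODE));         // true
        p.exitChillMode();
        System.out.println(p.check(Op.ALLOCATE_CONTAINER)); // false
      }
    }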

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/Precheck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/Precheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/Precheck.java
deleted file mode 100644
index 1654990..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/Precheck.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.server;
-
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-
-/**
- * Precheck for SCM operations.
- */
-public interface Precheck<T> {
-  boolean check(T t) throws SCMException;
-  String type();
-}

