Repository: hadoop
Updated Branches:
  refs/heads/trunk f89e26590 -> 3f3f72221


HDDS-238. Add Node2Pipeline Map in SCM to track ratis/standalone pipelines. 
Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3f3f7222
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3f3f7222
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3f3f7222

Branch: refs/heads/trunk
Commit: 3f3f72221ffd11cc6bfa0e010e3c5b0e14911102
Parents: f89e265
Author: Xiaoyu Yao <x...@apache.org>
Authored: Thu Jul 12 22:02:57 2018 -0700
Committer: Xiaoyu Yao <x...@apache.org>
Committed: Thu Jul 12 22:14:03 2018 -0700

----------------------------------------------------------------------
 .../container/common/helpers/ContainerInfo.java |  11 ++
 .../hdds/scm/container/ContainerMapping.java    |  11 +-
 .../scm/container/ContainerStateManager.java    |   6 +
 .../scm/container/states/ContainerStateMap.java |  36 +++++-
 .../hdds/scm/pipelines/Node2PipelineMap.java    | 121 +++++++++++++++++++
 .../hdds/scm/pipelines/PipelineManager.java     |  22 ++--
 .../hdds/scm/pipelines/PipelineSelector.java    |  24 +++-
 .../scm/pipelines/ratis/RatisManagerImpl.java   |  11 +-
 .../standalone/StandaloneManagerImpl.java       |   7 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java | 117 ++++++++++++++++++
 10 files changed, 343 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
index 9593717..4074b21 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
@@ -456,4 +456,15 @@ public class ContainerInfo implements 
Comparator<ContainerInfo>,
           replicationFactor, replicationType);
     }
   }
+
+  /**
+   * Check if a container is in open state, this will check if the
+   * container is either open or allocated or creating. Any containers in
+   * these states is managed as an open container by SCM.
+   */
+  public boolean isContainerOpen() {
+    return state == HddsProtos.LifeCycleState.ALLOCATED ||
+        state == HddsProtos.LifeCycleState.CREATING ||
+        state == HddsProtos.LifeCycleState.OPEN;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index abad32c..26f4d86 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -477,7 +477,7 @@ public class ContainerMapping implements Mapping {
     List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
         containerInfos = reports.getReportsList();
 
-     for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
+    for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
         containerInfos) {
       byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
       lock.lock();
@@ -498,7 +498,9 @@ public class ContainerMapping implements Mapping {
           containerStore.put(dbKey, newState.toByteArray());
 
           // If the container is closed, then state is already written to SCM
-          Pipeline pipeline = 
pipelineSelector.getPipeline(newState.getPipelineName(), 
newState.getReplicationType());
+          Pipeline pipeline =
+              pipelineSelector.getPipeline(newState.getPipelineName(),
+                  newState.getReplicationType());
           if(pipeline == null) {
             pipeline = pipelineSelector
                 .getReplicationPipeline(newState.getReplicationType(),
@@ -713,4 +715,9 @@ public class ContainerMapping implements Mapping {
   public MetadataStore getContainerStore() {
     return containerStore;
   }
+
+  @VisibleForTesting
+  public PipelineSelector getPipelineSelector() {
+    return pipelineSelector;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 223deac..b2431dc 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.hdds.scm.container;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -522,4 +523,9 @@ public class ContainerStateManager implements Closeable {
       DatanodeDetails dn) throws SCMException {
     return containers.removeContainerReplica(containerID, dn);
   }
+  
+  @VisibleForTesting
+  public ContainerStateMap getContainerStateMap() {
+    return containers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 1c92861..46fe2ab 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -51,7 +51,7 @@ import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
  * Container State Map acts like a unified map for various attributes that are
  * used to select containers when we need allocated blocks.
  * <p>
- * This class provides the ability to query 4 classes of attributes. They are
+ * This class provides the ability to query 5 classes of attributes. They are
  * <p>
  * 1. LifeCycleStates - LifeCycle States of container describe in which state
  * a container is. For example, a container needs to be in Open State for a
@@ -72,6 +72,9 @@ import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
  * Replica and THREE Replica. User can specify how many copies should be made
  * for a ozone key.
  * <p>
+ * 5.Pipeline - The pipeline constitute the set of Datanodes on which the
+ * open container resides physically.
+ * <p>
  * The most common access pattern of this class is to select a container based
  * on all these parameters, for example, when allocating a block we will
  * select a container that belongs to user1, with Ratis replication which can
@@ -86,6 +89,14 @@ public class ContainerStateMap {
   private final ContainerAttribute<String> ownerMap;
   private final ContainerAttribute<ReplicationFactor> factorMap;
   private final ContainerAttribute<ReplicationType> typeMap;
+  // This map constitutes the pipeline to open container mappings.
+  // This map will be queried for the list of open containers on a particular
+  // pipeline and issue a close on corresponding containers in case of
+  // following events:
+  //1. Dead datanode.
+  //2. Datanode out of space.
+  //3. Volume loss or volume out of space.
+  private final ContainerAttribute<String> openPipelineMap;
 
   private final Map<ContainerID, ContainerInfo> containerMap;
   // Map to hold replicas of given container.
@@ -106,6 +117,7 @@ public class ContainerStateMap {
     ownerMap = new ContainerAttribute<>();
     factorMap = new ContainerAttribute<>();
     typeMap = new ContainerAttribute<>();
+    openPipelineMap = new ContainerAttribute<>();
     containerMap = new HashMap<>();
     autoLock = new AutoCloseableLock();
     contReplicaMap = new HashMap<>();
@@ -140,6 +152,9 @@ public class ContainerStateMap {
       ownerMap.insert(info.getOwner(), id);
       factorMap.insert(info.getReplicationFactor(), id);
       typeMap.insert(info.getReplicationType(), id);
+      if (info.isContainerOpen()) {
+        openPipelineMap.insert(info.getPipelineName(), id);
+      }
       LOG.trace("Created container with {} successfully.", id);
     }
   }
@@ -329,6 +344,11 @@ public class ContainerStateMap {
       throw new SCMException("Updating the container map failed.", ex,
           FAILED_TO_CHANGE_CONTAINER_STATE);
     }
+    // In case the container is set to closed state, it needs to be removed 
from
+    // the pipeline Map.
+    if (newState == LifeCycleState.CLOSED) {
+      openPipelineMap.remove(info.getPipelineName(), id);
+    }
   }
 
   /**
@@ -360,6 +380,20 @@ public class ContainerStateMap {
   }
 
   /**
+   * Returns Open containers in the SCM by the Pipeline
+   *
+   * @param pipeline - Pipeline name.
+   * @return NavigableSet<ContainerID>
+   */
+  public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(String 
pipeline) {
+    Preconditions.checkNotNull(pipeline);
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return openPipelineMap.getCollection(pipeline);
+    }
+  }
+
+  /**
    * Returns Containers by replication factor.
    *
    * @param factor - Replication Factor.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
new file mode 100644
index 0000000..2e89616
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Collections;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .DUPLICATE_DATANODE;
+
+
+/**
+ * This data structure maintains the list of pipelines which the given datanode
+ * is a part of.
+ * This information will be added whenever a new pipeline allocation happens.
+ *
+ * TODO: this information needs to be regenerated from pipeline reports on
+ * SCM restart
+ */
+public class Node2PipelineMap {
+  private final Map<UUID, Set<Pipeline>> dn2PipelineMap;
+
+  /**
+   * Constructs a Node2PipelineMap Object.
+   */
+  public Node2PipelineMap() {
+    dn2PipelineMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns true if this a datanode that is already tracked by
+   * Node2PipelineMap.
+   *
+   * @param datanodeID - UUID of the Datanode.
+   * @return True if this is tracked, false if this map does not know about it.
+   */
+  private boolean isKnownDatanode(UUID datanodeID) {
+    Preconditions.checkNotNull(datanodeID);
+    return dn2PipelineMap.containsKey(datanodeID);
+  }
+
+  /**
+   * Insert a new datanode into Node2Pipeline Map.
+   *
+   * @param datanodeID -- Datanode UUID
+   * @param pipelines - set of pipelines.
+   */
+  private void insertNewDatanode(UUID datanodeID, Set<Pipeline> pipelines)
+      throws SCMException {
+    Preconditions.checkNotNull(pipelines);
+    Preconditions.checkNotNull(datanodeID);
+    if(dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) {
+      throw new SCMException("Node already exists in the map",
+          DUPLICATE_DATANODE);
+    }
+  }
+
+  /**
+   * Removes datanode Entry from the map.
+   * @param datanodeID - Datanode ID.
+   */
+  public synchronized void removeDatanode(UUID datanodeID) {
+    Preconditions.checkNotNull(datanodeID);
+    dn2PipelineMap.computeIfPresent(datanodeID, (k, v) -> null);
+  }
+
+  /**
+   * Returns null if there no pipelines associated with this datanode ID.
+   *
+   * @param datanode - UUID
+   * @return Set of pipelines or Null.
+   */
+  public Set<Pipeline> getPipelines(UUID datanode) {
+    Preconditions.checkNotNull(datanode);
+    return dn2PipelineMap.computeIfPresent(datanode, (k, v) ->
+        Collections.unmodifiableSet(v));
+  }
+
+/**
+ * Adds a pipeline entry to a given dataNode in the map.
+ * @param pipeline Pipeline to be added
+ */
+ public synchronized void addPipeline(Pipeline pipeline) throws SCMException {
+   for (DatanodeDetails details : pipeline.getDatanodes().values()) {
+     UUID dnId = details.getUuid();
+     dn2PipelineMap
+         .computeIfAbsent(dnId,k->Collections.synchronizedSet(new HashSet<>()))
+         .add(pipeline);
+   }
+ }
+
+  public Map<UUID, Set<Pipeline>> getDn2PipelineMap() {
+    return Collections.unmodifiableMap(dn2PipelineMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index a1fbce6..a041973 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -40,11 +40,13 @@ public abstract class PipelineManager {
   private final List<Pipeline> activePipelines;
   private final Map<String, Pipeline> activePipelineMap;
   private final AtomicInteger pipelineIndex;
+  private final Node2PipelineMap node2PipelineMap;
 
-  public PipelineManager() {
+  public PipelineManager(Node2PipelineMap map) {
     activePipelines = new LinkedList<>();
     pipelineIndex = new AtomicInteger(0);
     activePipelineMap = new WeakHashMap<>();
+    node2PipelineMap = map;
   }
 
   /**
@@ -66,24 +68,23 @@ public abstract class PipelineManager {
      *
      * 2. This allows all nodes to part of a pipeline quickly.
      *
-     * 3. if there are not enough free nodes, return conduits in a
+     * 3. if there are not enough free nodes, return pipeline in a
      * round-robin fashion.
      *
      * TODO: Might have to come up with a better algorithm than this.
-     * Create a new placement policy that returns conduits in round robin
+     * Create a new placement policy that returns pipelines in round robin
      * fashion.
      */
-    Pipeline pipeline =
-        allocatePipeline(replicationFactor);
+    Pipeline pipeline = allocatePipeline(replicationFactor);
     if (pipeline != null) {
       LOG.debug("created new pipeline:{} for container with " +
               "replicationType:{} replicationFactor:{}",
           pipeline.getPipelineName(), replicationType, replicationFactor);
       activePipelines.add(pipeline);
       activePipelineMap.put(pipeline.getPipelineName(), pipeline);
+      node2PipelineMap.addPipeline(pipeline);
     } else {
-      pipeline =
-          findOpenPipeline(replicationType, replicationFactor);
+      pipeline = findOpenPipeline(replicationType, replicationFactor);
       if (pipeline != null) {
         LOG.debug("re-used pipeline:{} for container with " +
                 "replicationType:{} replicationFactor:{}",
@@ -133,6 +134,11 @@ public abstract class PipelineManager {
   public abstract Pipeline allocatePipeline(
       ReplicationFactor replicationFactor) throws IOException;
 
+  public void removePipeline(Pipeline pipeline) {
+    activePipelines.remove(pipeline);
+    activePipelineMap.remove(pipeline.getPipelineName());
+  }
+
   /**
    * Find a Pipeline that is operational.
    *
@@ -143,7 +149,7 @@ public abstract class PipelineManager {
     Pipeline pipeline = null;
     final int sentinal = -1;
     if (activePipelines.size() == 0) {
-      LOG.error("No Operational conduits found. Returning null.");
+      LOG.error("No Operational pipelines found. Returning null.");
       return null;
     }
     int startIndex = getNextIndex();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 3846a84..2955af5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdds.scm.pipelines;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
@@ -41,6 +40,8 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -55,7 +56,7 @@ public class PipelineSelector {
   private final RatisManagerImpl ratisManager;
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
-
+  private final Node2PipelineMap node2PipelineMap;
   /**
    * Constructs a pipeline Selector.
    *
@@ -69,12 +70,13 @@ public class PipelineSelector {
     this.containerSize = OzoneConsts.GB * this.conf.getInt(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
+    node2PipelineMap = new Node2PipelineMap();
     this.standaloneManager =
         new StandaloneManagerImpl(this.nodeManager, placementPolicy,
-            containerSize);
+            containerSize, node2PipelineMap);
     this.ratisManager =
         new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
-            conf);
+            conf, node2PipelineMap);
   }
 
   /**
@@ -243,4 +245,18 @@ public class PipelineSelector {
             .collect(Collectors.joining(",")));
     manager.updatePipeline(pipelineID, newDatanodes);
   }
+
+  public Node2PipelineMap getNode2PipelineMap() {
+    return node2PipelineMap;
+  }
+
+  public void removePipeline(UUID dnId) {
+    Set<Pipeline> pipelineChannelSet =
+        node2PipelineMap.getPipelines(dnId);
+    for (Pipeline pipelineChannel : pipelineChannelSet) {
+      getPipelineManager(pipelineChannel.getType())
+          .removePipeline(pipelineChannel);
+    }
+    node2PipelineMap.removeDatanode(dnId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index 189060e..a8f8b20 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -19,11 +19,11 @@ package org.apache.hadoop.hdds.scm.pipelines.ratis;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -60,8 +60,9 @@ public class RatisManagerImpl extends PipelineManager {
    * @param nodeManager
    */
   public RatisManagerImpl(NodeManager nodeManager,
-      ContainerPlacementPolicy placementPolicy, long size, Configuration conf) 
{
-    super();
+      ContainerPlacementPolicy placementPolicy, long size, Configuration conf,
+      Node2PipelineMap map) {
+    super(map);
     this.conf = conf;
     this.nodeManager = nodeManager;
     ratisMembers = new HashSet<>();
@@ -89,11 +90,11 @@ public class RatisManagerImpl extends PipelineManager {
           ratisMembers.addAll(newNodesList);
           LOG.info("Allocating a new ratis pipeline of size: {}", count);
           // Start all channel names with "Ratis", easy to grep the logs.
-          String conduitName = PREFIX +
+          String pipelineName = PREFIX +
               UUID.randomUUID().toString().substring(PREFIX.length());
           Pipeline pipeline=
               PipelineSelector.newPipelineFromNodes(newNodesList,
-              LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName);
+              LifeCycleState.OPEN, ReplicationType.RATIS, factor, 
pipelineName);
           try (XceiverClientRatis client =
               XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
             client.createPipeline(pipeline.getPipelineName(), newNodesList);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index 579a3a2..cf691bf 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -17,11 +17,11 @@
 package org.apache.hadoop.hdds.scm.pipelines.standalone;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -58,8 +58,9 @@ public class StandaloneManagerImpl extends PipelineManager {
    * @param containerSize - Container Size.
    */
   public StandaloneManagerImpl(NodeManager nodeManager,
-      ContainerPlacementPolicy placementPolicy, long containerSize) {
-    super();
+      ContainerPlacementPolicy placementPolicy, long containerSize,
+      Node2PipelineMap map) {
+    super(map);
     this.nodeManager = nodeManager;
     this.placementPolicy = placementPolicy;
     this.containerSize =  containerSize;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
new file mode 100644
index 0000000..bc3505f
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .ReplicationType.RATIS;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .ReplicationFactor.THREE;
+
+public class TestNode2PipelineMap {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static StorageContainerManager scm;
+  private static ContainerWithPipeline ratisContainer;
+  private static ContainerStateMap stateMap;
+  private static ContainerMapping mapping;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
+    cluster.waitForClusterToBeReady();
+    scm = cluster.getStorageContainerManager();
+    mapping = (ContainerMapping)scm.getScmContainerManager();
+    stateMap = mapping.getStateManager().getContainerStateMap();
+    ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+
+  @Test
+  public void testPipelineMap() throws IOException {
+
+    NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
+        ratisContainer.getPipeline().getPipelineName());
+
+    long cId = ratisContainer.getContainerInfo().getContainerID();
+    Assert.assertEquals(1, set.size());
+    Assert.assertEquals(cId, set.first().getId());
+
+    List<DatanodeDetails> dns = ratisContainer.getPipeline().getMachines();
+    Assert.assertEquals(3, dns.size());
+
+    // get pipeline details by dnid
+    Set<Pipeline> pipelines = mapping.getPipelineSelector()
+        .getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
+    Assert.assertEquals(1, pipelines.size());
+    pipelines.forEach(p -> Assert.assertEquals(p.getPipelineName(),
+        ratisContainer.getPipeline().getPipelineName()));
+
+
+    // Now close the container and it should not show up while fetching
+    // containers by pipeline
+    mapping
+        .updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATE);
+    mapping
+        .updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATED);
+    mapping
+        .updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
+    mapping
+        .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
+    NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
+        ratisContainer.getPipeline().getPipelineName());
+    Assert.assertEquals(0, set2.size());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to