This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new a52e8ef  HDDS-4630: Solve deadlock triggered by PipelineActionHandler. (#1743)
a52e8ef is described below

commit a52e8efd7dcd2b3039cc6bf303cd03fdb0de6b95
Author: GlenGeng <[email protected]>
AuthorDate: Thu Jan 7 15:33:21 2021 +0800

    HDDS-4630: Solve deadlock triggered by PipelineActionHandler. (#1743)
---
 .../hdds/scm/container/ContainerManagerImpl.java   | 117 +++-----
 .../scm/container/ContainerStateManagerImpl.java   | 205 ++++++++-----
 .../scm/container/states/ContainerStateMap.java    | 330 ++++++++-------------
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   | 117 ++------
 4 files changed, 331 insertions(+), 438 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index 5ce85f6..91684ce 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -28,8 +28,8 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
@@ -70,8 +70,8 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
   /**
    *
    */
-  //Can we move this lock to ContainerStateManager?
-  private final ReadWriteLock lock;
+  // Limit the number of on-going ratis operations.
+  private final Lock lock;
 
   /**
    *
@@ -102,7 +102,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
       final Table<ContainerID, ContainerInfo> containerStore)
       throws IOException {
     // Introduce builder for this class?
-    this.lock = new ReentrantReadWriteLock();
+    this.lock = new ReentrantLock();
     this.pipelineManager = pipelineManager;
     this.haManager = scmHaManager;
     this.containerStateManager = ContainerStateManagerImpl.newBuilder()
@@ -124,56 +124,41 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
   @Override
   public ContainerInfo getContainer(final ContainerID id)
       throws ContainerNotFoundException {
-    lock.readLock().lock();
-    try {
-      return Optional.ofNullable(containerStateManager
-          .getContainer(id.getProtobuf()))
-          .orElseThrow(() -> new ContainerNotFoundException("ID " + id));
-    } finally {
-      lock.readLock().unlock();
-    }
+    return Optional.ofNullable(containerStateManager
+        .getContainer(id.getProtobuf()))
+        .orElseThrow(() -> new ContainerNotFoundException("ID " + id));
   }
 
   @Override
   public List<ContainerInfo> getContainers(final ContainerID startID,
                                            final int count) {
-    lock.readLock().lock();
     scmContainerManagerMetrics.incNumListContainersOps();
-    try {
-      // TODO: Remove the null check, startID should not be null. Fix the unit
-      //  test before removing the check.
-      final long start = startID == null ? 0 : startID.getId();
-      final List<ContainerID> containersIds =
-          new ArrayList<>(containerStateManager.getContainerIDs());
-      Collections.sort(containersIds);
-      return containersIds.stream()
-          .filter(id -> id.getId() > start).limit(count)
-          .map(ContainerID::getProtobuf)
-          .map(containerStateManager::getContainer)
-          .collect(Collectors.toList());
-    } finally {
-      lock.readLock().unlock();
-    }
+    // TODO: Remove the null check, startID should not be null. Fix the unit
+    //  test before removing the check.
+    final long start = startID == null ? 0 : startID.getId();
+    final List<ContainerID> containersIds =
+        new ArrayList<>(containerStateManager.getContainerIDs());
+    Collections.sort(containersIds);
+    return containersIds.stream()
+        .filter(id -> id.getId() > start).limit(count)
+        .map(ContainerID::getProtobuf)
+        .map(containerStateManager::getContainer)
+        .collect(Collectors.toList());
   }
 
   @Override
   public List<ContainerInfo> getContainers(final LifeCycleState state) {
-    lock.readLock().lock();
-    try {
-      return containerStateManager.getContainerIDs(state).stream()
-          .map(ContainerID::getProtobuf)
-          .map(containerStateManager::getContainer)
-          .filter(Objects::nonNull).collect(Collectors.toList());
-    } finally {
-      lock.readLock().unlock();
-    }
+    return containerStateManager.getContainerIDs(state).stream()
+        .map(ContainerID::getProtobuf)
+        .map(containerStateManager::getContainer)
+        .filter(Objects::nonNull).collect(Collectors.toList());
   }
 
   @Override
   public ContainerInfo allocateContainer(final ReplicationType type,
       final ReplicationFactor replicationFactor, final String owner)
       throws IOException {
-    lock.writeLock().lock();
+    lock.lock();
     try {
       final List<Pipeline> pipelines = pipelineManager
           .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
@@ -198,7 +183,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
       }
       return containerInfo;
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
@@ -233,7 +218,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
                                    final LifeCycleEvent event)
       throws IOException, InvalidStateTransitionException {
     final HddsProtos.ContainerID cid = id.getProtobuf();
-    lock.writeLock().lock();
+    lock.lock();
     try {
       if (containerExist(cid)) {
         containerStateManager.updateContainerState(cid, event);
@@ -241,21 +226,16 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
         throwContainerNotFoundException(cid);
       }
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
   @Override
   public Set<ContainerReplica> getContainerReplicas(final ContainerID id)
       throws ContainerNotFoundException {
-    lock.readLock().lock();
-    try {
-      return Optional.ofNullable(containerStateManager
-          .getContainerReplicas(id.getProtobuf()))
-          .orElseThrow(() -> new ContainerNotFoundException("ID " + id));
-    } finally {
-      lock.readLock().unlock();
-    }
+    return Optional.ofNullable(containerStateManager
+        .getContainerReplicas(id.getProtobuf()))
+        .orElseThrow(() -> new ContainerNotFoundException("ID " + id));
   }
 
   @Override
@@ -263,15 +243,10 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
                                      final ContainerReplica replica)
       throws ContainerNotFoundException {
     final HddsProtos.ContainerID cid = id.getProtobuf();
-    lock.writeLock().lock();
-    try {
-      if (containerExist(cid)) {
-        containerStateManager.updateContainerReplica(cid, replica);
-      } else {
-        throwContainerNotFoundException(cid);
-      }
-    } finally {
-      lock.writeLock().unlock();
+    if (containerExist(cid)) {
+      containerStateManager.updateContainerReplica(cid, replica);
+    } else {
+      throwContainerNotFoundException(cid);
     }
   }
 
@@ -280,27 +255,17 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
                                      final ContainerReplica replica)
       throws ContainerNotFoundException, ContainerReplicaNotFoundException {
     final HddsProtos.ContainerID cid = id.getProtobuf();
-    lock.writeLock().lock();
-    try {
-      if (containerExist(cid)) {
-        containerStateManager.removeContainerReplica(cid, replica);
-      } else {
-        throwContainerNotFoundException(cid);
-      }
-    } finally {
-      lock.writeLock().unlock();
+    if (containerExist(cid)) {
+      containerStateManager.removeContainerReplica(cid, replica);
+    } else {
+      throwContainerNotFoundException(cid);
     }
   }
 
   @Override
   public void updateDeleteTransactionId(
       final Map<ContainerID, Long> deleteTransactionMap) throws IOException {
-    lock.writeLock().lock();
-    try {
-      containerStateManager.updateDeleteTransactionId(deleteTransactionMap);
-    } finally {
-      lock.writeLock().unlock();
-    }
+    containerStateManager.updateDeleteTransactionId(deleteTransactionMap);
   }
 
   @Override
@@ -384,7 +349,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
   public void deleteContainer(final ContainerID id)
       throws IOException {
     final HddsProtos.ContainerID cid = id.getProtobuf();
-    lock.writeLock().lock();
+    lock.lock();
     try {
       if (containerExist(cid)) {
         containerStateManager.removeContainer(cid);
@@ -394,7 +359,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
         throwContainerNotFoundException(cid);
       }
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index 7807ae5..68e4d35 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Preconditions;
 
@@ -97,7 +99,7 @@ public final class ContainerStateManagerImpl
   /**
    * Persistent store for Container States.
    */
-  private Table<ContainerID, ContainerInfo> containerStore;
+  private final Table<ContainerID, ContainerInfo> containerStore;
 
   /**
    * PipelineManager instance.
@@ -117,6 +119,11 @@ public final class ContainerStateManagerImpl
 
   private final Map<LifeCycleEvent, CheckedConsumer<ContainerInfo, IOException>>
       containerStateChangeActions;
+
+  // Protect containers and containerStore against the potential
+  // contentions between RaftServer and ContainerManager.
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
   /**
    * constructs ContainerStateManagerImpl instance and loads the containers
    * form the persistent storage.
@@ -246,18 +253,32 @@ public final class ContainerStateManagerImpl
 
   @Override
   public Set<ContainerID> getContainerIDs() {
-    return containers.getAllContainerIDs();
+    lock.readLock().lock();
+    try {
+      return containers.getAllContainerIDs();
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public Set<ContainerID> getContainerIDs(final LifeCycleState state) {
-    return containers.getContainerIDsByState(state);
+    lock.readLock().lock();
+    try {
+      return containers.getContainerIDsByState(state);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public ContainerInfo getContainer(final HddsProtos.ContainerID id) {
-    return containers.getContainerInfo(
-        ContainerID.getFromProtobuf(id));
+    lock.readLock().lock();
+    try {
+      return containers.getContainerInfo(ContainerID.getFromProtobuf(id));
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
@@ -273,45 +294,62 @@ public final class ContainerStateManagerImpl
     final ContainerID containerID = container.containerID();
     final PipelineID pipelineID = container.getPipelineID();
 
-    if (!containers.contains(containerID)) {
-      ExecutionUtil.create(() -> {
-        containerStore.put(containerID, container);
-        containers.addContainer(container);
-        pipelineManager.addContainerToPipeline(pipelineID, containerID);
-      }).onException(() -> {
-        containers.removeContainer(containerID);
-        containerStore.delete(containerID);
-      }).execute();
+    lock.writeLock().lock();
+    try {
+      if (!containers.contains(containerID)) {
+        ExecutionUtil.create(() -> {
+          containerStore.put(containerID, container);
+          containers.addContainer(container);
+          pipelineManager.addContainerToPipeline(pipelineID, containerID);
+        }).onException(() -> {
+          containers.removeContainer(containerID);
+          containerStore.delete(containerID);
+        }).execute();
+      }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
   @Override
   public boolean contains(final HddsProtos.ContainerID id) {
-    // TODO: Remove the protobuf conversion after fixing ContainerStateMap.
-    return containers.contains(ContainerID.getFromProtobuf(id));
+    lock.readLock().lock();
+    try {
+      // TODO: Remove the protobuf conversion after fixing ContainerStateMap.
+      return containers.contains(ContainerID.getFromProtobuf(id));
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
+  @Override
   public void updateContainerState(final HddsProtos.ContainerID containerID,
                                    final LifeCycleEvent event)
       throws IOException, InvalidStateTransitionException {
     // TODO: Remove the protobuf conversion after fixing ContainerStateMap.
     final ContainerID id = ContainerID.getFromProtobuf(containerID);
-    if (containers.contains(id)) {
-      final ContainerInfo oldInfo = containers.getContainerInfo(id);
-      final LifeCycleState oldState = oldInfo.getState();
-      final LifeCycleState newState = stateMachine.getNextState(
-          oldInfo.getState(), event);
-      if (newState.getNumber() > oldState.getNumber()) {
-        ExecutionUtil.create(() -> {
-          containers.updateState(id, oldState, newState);
-          containerStore.put(id, containers.getContainerInfo(id));
-        }).onException(() -> {
-          containerStore.put(id, oldInfo);
-          containers.updateState(id, newState, oldState);
-        }).execute();
-        containerStateChangeActions.getOrDefault(event, info -> {})
-            .execute(oldInfo);
+
+    lock.writeLock().lock();
+    try {
+      if (containers.contains(id)) {
+        final ContainerInfo oldInfo = containers.getContainerInfo(id);
+        final LifeCycleState oldState = oldInfo.getState();
+        final LifeCycleState newState = stateMachine.getNextState(
+            oldInfo.getState(), event);
+        if (newState.getNumber() > oldState.getNumber()) {
+          ExecutionUtil.create(() -> {
+            containers.updateState(id, oldState, newState);
+            containerStore.put(id, containers.getContainerInfo(id));
+          }).onException(() -> {
+            containerStore.put(id, oldInfo);
+            containers.updateState(id, newState, oldState);
+          }).execute();
+          containerStateChangeActions.getOrDefault(event, info -> {})
+              .execute(oldInfo);
+        }
       }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -319,35 +357,54 @@ public final class ContainerStateManagerImpl
   @Override
   public Set<ContainerReplica> getContainerReplicas(
       final HddsProtos.ContainerID id) {
-    return containers.getContainerReplicas(
-        ContainerID.getFromProtobuf(id));
+    lock.readLock().lock();
+    try {
+      return containers.getContainerReplicas(
+          ContainerID.getFromProtobuf(id));
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public void updateContainerReplica(final HddsProtos.ContainerID id,
                                      final ContainerReplica replica) {
-    containers.updateContainerReplica(ContainerID.getFromProtobuf(id),
-        replica);
+    lock.writeLock().lock();
+    try {
+      containers.updateContainerReplica(ContainerID.getFromProtobuf(id),
+          replica);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
   public void removeContainerReplica(final HddsProtos.ContainerID id,
                                      final ContainerReplica replica) {
-    containers.removeContainerReplica(ContainerID.getFromProtobuf(id),
-        replica);
-
+    lock.writeLock().lock();
+    try {
+      containers.removeContainerReplica(ContainerID.getFromProtobuf(id),
+          replica);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
   public void updateDeleteTransactionId(
       final Map<ContainerID, Long> deleteTransactionMap) throws IOException {
-    // TODO: Refactor this. Error handling is not done.
-    for (Map.Entry<ContainerID, Long> transaction :
-        deleteTransactionMap.entrySet()) {
-      final ContainerInfo info = containers.getContainerInfo(
-          transaction.getKey());
-      info.updateDeleteTransactionId(transaction.getValue());
-      containerStore.put(info.containerID(), info);
+    lock.writeLock().lock();
+    try {
+      // TODO: Refactor this. Error handling is not done.
+      for (Map.Entry<ContainerID, Long> transaction :
+          deleteTransactionMap.entrySet()) {
+        final ContainerInfo info = containers.getContainerInfo(
+            transaction.getKey());
+        info.updateDeleteTransactionId(transaction.getValue());
+        containerStore.put(info.containerID(), info);
+      }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -370,26 +427,31 @@ public final class ContainerStateManagerImpl
       resultSet = containerIDs;
     }
 
-    ContainerInfo selectedContainer = findContainerWithSpace(size, resultSet);
-    if (selectedContainer == null) {
-
-      // If we did not find any space in the tailSet, we need to look for
-      // space in the headset, we need to pass true to deal with the
-      // situation that we have a lone container that has space. That is we
-      // ignored the last used container under the assumption we can find
-      // other containers with space, but if have a single container that is
-      // not true. Hence we need to include the last used container as the
-      // last element in the sorted set.
-
-      resultSet = containerIDs.headSet(lastID, true);
-      selectedContainer = findContainerWithSpace(size, resultSet);
-    }
+    lock.readLock().lock();
+    try {
+      ContainerInfo selectedContainer = findContainerWithSpace(size, resultSet);
+      if (selectedContainer == null) {
+
+        // If we did not find any space in the tailSet, we need to look for
+        // space in the headset, we need to pass true to deal with the
+        // situation that we have a lone container that has space. That is we
+        // ignored the last used container under the assumption we can find
+        // other containers with space, but if have a single container that is
+        // not true. Hence we need to include the last used container as the
+        // last element in the sorted set.
+
+        resultSet = containerIDs.headSet(lastID, true);
+        selectedContainer = findContainerWithSpace(size, resultSet);
+      }
 
-    // TODO: cleanup entries in lastUsedMap
-    if (selectedContainer != null) {
-      lastUsedMap.put(key, selectedContainer.containerID());
+      // TODO: cleanup entries in lastUsedMap
+      if (selectedContainer != null) {
+        lastUsedMap.put(key, selectedContainer.containerID());
+      }
+      return selectedContainer;
+    } finally {
+      lock.readLock().unlock();
     }
-    return selectedContainer;
   }
 
   private ContainerInfo findContainerWithSpace(final long size,
@@ -408,12 +470,17 @@ public final class ContainerStateManagerImpl
 
   public void removeContainer(final HddsProtos.ContainerID id)
       throws IOException {
-    final ContainerID cid = ContainerID.getFromProtobuf(id);
-    final ContainerInfo containerInfo = containers.getContainerInfo(cid);
-    ExecutionUtil.create(() -> {
-      containerStore.delete(cid);
-      containers.removeContainer(cid);
-    }).onException(() -> containerStore.put(cid, containerInfo)).execute();
+    lock.writeLock().lock();
+    try {
+      final ContainerID cid = ContainerID.getFromProtobuf(id);
+      final ContainerInfo containerInfo = containers.getContainerInfo(cid);
+      ExecutionUtil.create(() -> {
+        containerStore.delete(cid);
+        containers.removeContainer(cid);
+      }).onException(() -> containerStore.put(cid, containerInfo)).execute();
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 4d143e0..4d7860d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -23,8 +23,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.TreeSet;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Preconditions;
@@ -92,12 +90,6 @@ public class ContainerStateMap {
   private final Map<ContainerID, Set<ContainerReplica>> replicaMap;
   private final Map<ContainerQueryKey, NavigableSet<ContainerID>> resultCache;
 
-  // Container State Map lock should be held before calling into
-  // Update ContainerAttributes. The consistency of ContainerAttributes is
-  // protected by this lock.
-  // Can we remove this lock?
-  private final ReadWriteLock lock;
-
   /**
    * Create a ContainerStateMap.
    */
@@ -107,7 +99,6 @@ public class ContainerStateMap {
     this.factorMap = new ContainerAttribute<>();
     this.typeMap = new ContainerAttribute<>();
     this.containerMap = new ConcurrentHashMap<>();
-    this.lock = new ReentrantReadWriteLock();
     this.replicaMap = new ConcurrentHashMap<>();
     this.resultCache = new ConcurrentHashMap<>();
   }
@@ -121,34 +112,24 @@ public class ContainerStateMap {
   public void addContainer(final ContainerInfo info)
       throws SCMException {
     Preconditions.checkNotNull(info, "Container Info cannot be null");
-    lock.writeLock().lock();
-    try {
-      final ContainerID id = info.containerID();
-      if (!contains(id)) {
-        containerMap.put(id, info);
-        lifeCycleStateMap.insert(info.getState(), id);
-        ownerMap.insert(info.getOwner(), id);
-        factorMap.insert(info.getReplicationFactor(), id);
-        typeMap.insert(info.getReplicationType(), id);
-        replicaMap.put(id, ConcurrentHashMap.newKeySet());
-
-        // Flush the cache of this container type, will be added later when
-        // get container queries are executed.
-        flushCache(info);
-        LOG.trace("Container {} added to ContainerStateMap.", id);
-      }
-    } finally {
-      lock.writeLock().unlock();
+    final ContainerID id = info.containerID();
+    if (!contains(id)) {
+      containerMap.put(id, info);
+      lifeCycleStateMap.insert(info.getState(), id);
+      ownerMap.insert(info.getOwner(), id);
+      factorMap.insert(info.getReplicationFactor(), id);
+      typeMap.insert(info.getReplicationType(), id);
+      replicaMap.put(id, ConcurrentHashMap.newKeySet());
+
+      // Flush the cache of this container type, will be added later when
+      // get container queries are executed.
+      flushCache(info);
+      LOG.trace("Container {} added to ContainerStateMap.", id);
     }
   }
 
   public boolean contains(final ContainerID id) {
-    lock.readLock().lock();
-    try {
-      return containerMap.containsKey(id);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return containerMap.containsKey(id);
   }
 
   /**
@@ -158,22 +139,17 @@ public class ContainerStateMap {
    */
   public void removeContainer(final ContainerID id) {
     Preconditions.checkNotNull(id, "ContainerID cannot be null");
-    lock.writeLock().lock();
-    try {
-      if (contains(id)) {
-        // Should we revert back to the original state if any of the below
-        // remove operation fails?
-        final ContainerInfo info = containerMap.remove(id);
-        lifeCycleStateMap.remove(info.getState(), id);
-        ownerMap.remove(info.getOwner(), id);
-        factorMap.remove(info.getReplicationFactor(), id);
-        typeMap.remove(info.getReplicationType(), id);
-        // Flush the cache of this container type.
-        flushCache(info);
-        LOG.trace("Container {} removed from ContainerStateMap.", id);
-      }
-    } finally {
-      lock.writeLock().unlock();
+    if (contains(id)) {
+      // Should we revert back to the original state if any of the below
+      // remove operation fails?
+      final ContainerInfo info = containerMap.remove(id);
+      lifeCycleStateMap.remove(info.getState(), id);
+      ownerMap.remove(info.getOwner(), id);
+      factorMap.remove(info.getReplicationFactor(), id);
+      typeMap.remove(info.getReplicationType(), id);
+      // Flush the cache of this container type.
+      flushCache(info);
+      LOG.trace("Container {} removed from ContainerStateMap.", id);
     }
   }
 
@@ -184,12 +160,7 @@ public class ContainerStateMap {
    * @return container info, if found else null.
    */
   public ContainerInfo getContainerInfo(final ContainerID containerID) {
-    lock.readLock().lock();
-    try {
-      return containerMap.get(containerID);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return containerMap.get(containerID);
   }
 
   /**
@@ -202,13 +173,8 @@ public class ContainerStateMap {
   public Set<ContainerReplica> getContainerReplicas(
       final ContainerID containerID) {
     Preconditions.checkNotNull(containerID);
-    lock.readLock().lock();
-    try {
-      final Set<ContainerReplica> replicas = replicaMap.get(containerID);
-      return replicas == null ? null : Collections.unmodifiableSet(replicas);
-    } finally {
-      lock.readLock().unlock();
-    }
+    final Set<ContainerReplica> replicas = replicaMap.get(containerID);
+    return replicas == null ? null : Collections.unmodifiableSet(replicas);
   }
 
   /**
@@ -222,15 +188,10 @@ public class ContainerStateMap {
   public void updateContainerReplica(final ContainerID containerID,
       final ContainerReplica replica) {
     Preconditions.checkNotNull(containerID);
-    lock.writeLock().lock();
-    try {
-      if (contains(containerID)) {
-        final Set<ContainerReplica> replicas = replicaMap.get(containerID);
-        replicas.remove(replica);
-        replicas.add(replica);
-      }
-    } finally {
-      lock.writeLock().unlock();
+    if (contains(containerID)) {
+      final Set<ContainerReplica> replicas = replicaMap.get(containerID);
+      replicas.remove(replica);
+      replicas.add(replica);
     }
   }
 
@@ -245,13 +206,8 @@ public class ContainerStateMap {
       final ContainerReplica replica) {
     Preconditions.checkNotNull(containerID);
     Preconditions.checkNotNull(replica);
-    lock.writeLock().lock();
-    try {
-      if (contains(containerID)) {
-        replicaMap.get(containerID).remove(replica);
-      }
-    } finally {
-      lock.writeLock().unlock();
+    if (contains(containerID)) {
+      replicaMap.get(containerID).remove(replica);
     }
   }
 
@@ -262,15 +218,10 @@ public class ContainerStateMap {
   public void updateContainerInfo(final ContainerInfo info) {
     Preconditions.checkNotNull(info);
     final ContainerID id = info.containerID();
-    lock.writeLock().lock();
-    try {
-      if (contains(id)) {
-        final ContainerInfo currentInfo = containerMap.get(id);
-        flushCache(info, currentInfo);
-        containerMap.put(id, info);
-      }
-    } finally {
-      lock.writeLock().unlock();
+    if (contains(id)) {
+      final ContainerInfo currentInfo = containerMap.get(id);
+      flushCache(info, currentInfo);
+      containerMap.put(id, info);
     }
   }
 
@@ -286,66 +237,56 @@ public class ContainerStateMap {
       LifeCycleState newState) throws SCMException {
     Preconditions.checkNotNull(currentState);
     Preconditions.checkNotNull(newState);
-    lock.writeLock().lock();
+    if (!contains(containerID)) {
+      return;
+    }
+
+    // TODO: Simplify this logic.
+    final ContainerInfo currentInfo = containerMap.get(containerID);
     try {
-      if (!contains(containerID)) {
-        return;
+      currentInfo.setState(newState);
+
+      // We are updating two places before this update is done, these can
+      // fail independently, since the code needs to handle it.
+
+      // We update the attribute map, if that fails it will throw an
+      // exception, so no issues, if we are successful, we keep track of the
+      // fact that we have updated the lifecycle state in the map, and update
+      // the container state. If this second update fails, we will attempt to
+      // roll back the earlier change we did. If the rollback fails, we can
+      // be in an inconsistent state,
+
+      lifeCycleStateMap.update(currentState, newState, containerID);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Updated the container {} to new state. Old = {}, new = " +
+            "{}", containerID, currentState, newState);
       }
 
-      // TODO: Simplify this logic.
-      final ContainerInfo currentInfo = containerMap.get(containerID);
-      try {
-        currentInfo.setState(newState);
-
-        // We are updating two places before this update is done, these can
-        // fail independently, since the code needs to handle it.
-
-        // We update the attribute map, if that fails it will throw an
-        // exception, so no issues, if we are successful, we keep track of the
-        // fact that we have updated the lifecycle state in the map, and update
-        // the container state. If this second update fails, we will attempt to
-        // roll back the earlier change we did. If the rollback fails, we can
-        // be in an inconsistent state,
-
-        lifeCycleStateMap.update(currentState, newState, containerID);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Updated the container {} to new state. Old = {}, new = " +
-              "{}", containerID, currentState, newState);
-        }
-
-        // Just flush both old and new data sets from the result cache.
-        flushCache(currentInfo);
-      } catch (SCMException ex) {
-        LOG.error("Unable to update the container state.", ex);
-        // we need to revert the change in this attribute since we are not
-        // able to update the hash table.
-        LOG.info("Reverting the update to lifecycle state. Moving back to " +
-                "old state. Old = {}, Attempted state = {}", currentState,
-            newState);
-
-        currentInfo.setState(currentState);
-
-        // if this line throws, the state map can be in an inconsistent
-        // state, since we will have modified the attribute by the
-        // container state will not in sync since we were not able to put
-        // that into the hash table.
-        lifeCycleStateMap.update(newState, currentState, containerID);
-
-        throw new SCMException("Updating the container map failed.", ex,
-            FAILED_TO_CHANGE_CONTAINER_STATE);
-      }
-    } finally {
-      lock.writeLock().unlock();
+      // Just flush both old and new data sets from the result cache.
+      flushCache(currentInfo);
+    } catch (SCMException ex) {
+      LOG.error("Unable to update the container state.", ex);
+      // we need to revert the change in this attribute since we are not
+      // able to update the hash table.
+      LOG.info("Reverting the update to lifecycle state. Moving back to " +
+              "old state. Old = {}, Attempted state = {}", currentState,
+          newState);
+
+      currentInfo.setState(currentState);
+
+      // if this line throws, the state map can be in an inconsistent
+      // state, since we will have modified the attribute by the
+      // container state will not in sync since we were not able to put
+      // that into the hash table.
+      lifeCycleStateMap.update(newState, currentState, containerID);
+
+      throw new SCMException("Updating the container map failed.", ex,
+          FAILED_TO_CHANGE_CONTAINER_STATE);
     }
   }
 
   public Set<ContainerID> getAllContainerIDs() {
-    lock.readLock().lock();
-    try {
-      return Collections.unmodifiableSet(containerMap.keySet());
-    } finally {
-      lock.readLock().unlock();
-    }
+    return Collections.unmodifiableSet(containerMap.keySet());
   }
 
   /**
@@ -356,12 +297,7 @@ public class ContainerStateMap {
    */
   NavigableSet<ContainerID> getContainerIDsByOwner(final String ownerName) {
     Preconditions.checkNotNull(ownerName);
-    lock.readLock().lock();
-    try {
-      return ownerMap.getCollection(ownerName);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return ownerMap.getCollection(ownerName);
   }
 
   /**
@@ -372,12 +308,7 @@ public class ContainerStateMap {
    */
   NavigableSet<ContainerID> getContainerIDsByType(final ReplicationType type) {
     Preconditions.checkNotNull(type);
-    lock.readLock().lock();
-    try {
-      return typeMap.getCollection(type);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return typeMap.getCollection(type);
   }
 
   /**
@@ -389,12 +320,7 @@ public class ContainerStateMap {
   NavigableSet<ContainerID> getContainerIDsByFactor(
       final ReplicationFactor factor) {
     Preconditions.checkNotNull(factor);
-    lock.readLock().lock();
-    try {
-      return factorMap.getCollection(factor);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return factorMap.getCollection(factor);
   }
 
   /**
@@ -406,12 +332,7 @@ public class ContainerStateMap {
   public NavigableSet<ContainerID> getContainerIDsByState(
       final LifeCycleState state) {
     Preconditions.checkNotNull(state);
-    lock.readLock().lock();
-    try {
-      return lifeCycleStateMap.getCollection(state);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return lifeCycleStateMap.getCollection(state);
   }
 
   /**
@@ -432,57 +353,52 @@ public class ContainerStateMap {
     Preconditions.checkNotNull(factor, "Factor cannot be null");
     Preconditions.checkNotNull(type, "Type cannot be null");
 
-    lock.readLock().lock();
-    try {
-      final ContainerQueryKey queryKey =
-          new ContainerQueryKey(state, owner, factor, type);
-      if(resultCache.containsKey(queryKey)){
-        return resultCache.get(queryKey);
-      }
+    final ContainerQueryKey queryKey =
+        new ContainerQueryKey(state, owner, factor, type);
+    if(resultCache.containsKey(queryKey)){
+      return resultCache.get(queryKey);
+    }
 
-      // If we cannot meet any one condition we return EMPTY_SET immediately.
-      // Since when we intersect these sets, the result will be empty if any
-      // one is empty.
-      final NavigableSet<ContainerID> stateSet =
-          lifeCycleStateMap.getCollection(state);
-      if (stateSet.size() == 0) {
-        return EMPTY_SET;
-      }
+    // If we cannot meet any one condition we return EMPTY_SET immediately.
+    // Since when we intersect these sets, the result will be empty if any
+    // one is empty.
+    final NavigableSet<ContainerID> stateSet =
+        lifeCycleStateMap.getCollection(state);
+    if (stateSet.size() == 0) {
+      return EMPTY_SET;
+    }
 
-      final NavigableSet<ContainerID> ownerSet =
-          ownerMap.getCollection(owner);
-      if (ownerSet.size() == 0) {
-        return EMPTY_SET;
-      }
+    final NavigableSet<ContainerID> ownerSet =
+        ownerMap.getCollection(owner);
+    if (ownerSet.size() == 0) {
+      return EMPTY_SET;
+    }
 
-      final NavigableSet<ContainerID> factorSet =
-          factorMap.getCollection(factor);
-      if (factorSet.size() == 0) {
-        return EMPTY_SET;
-      }
+    final NavigableSet<ContainerID> factorSet =
+        factorMap.getCollection(factor);
+    if (factorSet.size() == 0) {
+      return EMPTY_SET;
+    }
 
-      final NavigableSet<ContainerID> typeSet =
-          typeMap.getCollection(type);
-      if (typeSet.size() == 0) {
-        return EMPTY_SET;
-      }
+    final NavigableSet<ContainerID> typeSet =
+        typeMap.getCollection(type);
+    if (typeSet.size() == 0) {
+      return EMPTY_SET;
+    }
 
 
-      // if we add more constraints we will just add those sets here..
-      final NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
-          ownerSet, factorSet, typeSet);
+    // if we add more constraints we will just add those sets here..
+    final NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
+        ownerSet, factorSet, typeSet);
 
-      NavigableSet<ContainerID> currentSet = sets[0];
-      // We take the smallest set and intersect against the larger sets. This
-      // allows us to reduce the lookups to the least possible number.
-      for (int x = 1; x < sets.length; x++) {
-        currentSet = intersectSets(currentSet, sets[x]);
-      }
-      resultCache.put(queryKey, currentSet);
-      return currentSet;
-    } finally {
-      lock.readLock().unlock();
+    NavigableSet<ContainerID> currentSet = sets[0];
+    // We take the smallest set and intersect against the larger sets. This
+    // allows us to reduce the lookups to the least possible number.
+    for (int x = 1; x < sets.length; x++) {
+      currentSet = intersectSets(currentSet, sets[x]);
     }
+    resultCache.put(queryKey, currentSet);
+    return currentSet;
   }
 
   /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 8b7d849..7d6c88a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -52,8 +52,8 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * SCM Pipeline Manager implementation.
@@ -64,7 +64,8 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMPipelineManager.class);
 
-  private final ReadWriteLock lock;
+  // Limit the number of on-going ratis operation to be 1.
+  private final Lock lock;
   private PipelineFactory pipelineFactory;
   private StateManager stateManager;
   private Scheduler scheduler;
@@ -88,7 +89,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
                                 StateManager pipelineStateManager,
                                 PipelineFactory pipelineFactory,
                                 EventPublisher eventPublisher) {
-    this.lock = new ReentrantReadWriteLock();
+    this.lock = new ReentrantLock();
     this.pipelineFactory = pipelineFactory;
     this.stateManager = pipelineStateManager;
     this.conf = conf;
@@ -149,7 +150,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
       throw new IOException("Pipeline creation is not allowed as safe mode " +
           "prechecks have not yet passed");
     }
-    lock.writeLock().lock();
+    lock.lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
       stateManager.addPipeline(pipeline.getProtobufMessage());
@@ -161,7 +162,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
       metrics.incNumPipelineCreationFailed();
       throw ex;
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
@@ -170,90 +171,52 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
                                  List<DatanodeDetails> nodes) {
     // This will mostly be used to create dummy pipeline for SimplePipelines.
     // We don't update the metrics for SimplePipelines.
-    lock.writeLock().lock();
-    try {
-      return pipelineFactory.create(type, factor, nodes);
-    } finally {
-      lock.writeLock().unlock();
-    }
+    return pipelineFactory.create(type, factor, nodes);
   }
 
   @Override
   public Pipeline getPipeline(PipelineID pipelineID)
       throws PipelineNotFoundException {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipeline(pipelineID);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return stateManager.getPipeline(pipelineID);
   }
 
   @Override
   public boolean containsPipeline(PipelineID pipelineID) {
-    lock.readLock().lock();
     try {
       getPipeline(pipelineID);
       return true;
     } catch (PipelineNotFoundException e) {
       return false;
-    } finally {
-      lock.readLock().unlock();
     }
   }
 
   @Override
   public List<Pipeline> getPipelines() {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipelines();
-    } finally {
-      lock.readLock().unlock();
-    }
+    return stateManager.getPipelines();
   }
 
   @Override
   public List<Pipeline> getPipelines(ReplicationType type) {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipelines(type);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return stateManager.getPipelines(type);
   }
 
   @Override
   public List<Pipeline> getPipelines(ReplicationType type,
-                              ReplicationFactor factor) {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipelines(type, factor);
-    } finally {
-      lock.readLock().unlock();
-    }
+                                     ReplicationFactor factor) {
+    return stateManager.getPipelines(type, factor);
   }
 
   @Override
   public List<Pipeline> getPipelines(ReplicationType type,
-                              Pipeline.PipelineState state) {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipelines(type, state);
-    } finally {
-      lock.readLock().unlock();
-    }
+                                     Pipeline.PipelineState state) {
+    return stateManager.getPipelines(type, state);
   }
 
   @Override
   public List<Pipeline> getPipelines(ReplicationType type,
-                              ReplicationFactor factor,
-                              Pipeline.PipelineState state) {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipelines(type, factor, state);
-    } finally {
-      lock.readLock().unlock();
-    }
+                                     ReplicationFactor factor,
+                                     Pipeline.PipelineState state) {
+    return stateManager.getPipelines(type, factor, state);
   }
 
   @Override
@@ -261,46 +224,28 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
       ReplicationType type, ReplicationFactor factor,
       Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
       Collection<PipelineID> excludePipelines) {
-    lock.readLock().lock();
-    try {
-      return stateManager
-          .getPipelines(type, factor, state, excludeDns, excludePipelines);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return stateManager
+        .getPipelines(type, factor, state, excludeDns, excludePipelines);
   }
 
   @Override
   public void addContainerToPipeline(
       PipelineID pipelineID, ContainerID containerID) throws IOException {
-    lock.writeLock().lock();
-    try {
-      stateManager.addContainerToPipeline(pipelineID, containerID);
-    } finally {
-      lock.writeLock().unlock();
-    }
+    // should not lock here, since no ratis operation happens.
+    stateManager.addContainerToPipeline(pipelineID, containerID);
   }
 
   @Override
   public void removeContainerFromPipeline(
       PipelineID pipelineID, ContainerID containerID) throws IOException {
-    lock.writeLock().lock();
-    try {
-      stateManager.removeContainerFromPipeline(pipelineID, containerID);
-    } finally {
-      lock.writeLock().unlock();
-    }
+    // should not lock here, since no ratis operation happens.
+    stateManager.removeContainerFromPipeline(pipelineID, containerID);
   }
 
   @Override
   public NavigableSet<ContainerID> getContainersInPipeline(
       PipelineID pipelineID) throws IOException {
-    lock.readLock().lock();
-    try {
-      return stateManager.getContainers(pipelineID);
-    } finally {
-      lock.readLock().unlock();
-    }
+    return stateManager.getContainers(pipelineID);
   }
 
   @Override
@@ -310,7 +255,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
 
   @Override
   public void openPipeline(PipelineID pipelineId) throws IOException {
-    lock.writeLock().lock();
+    lock.lock();
     try {
       Pipeline pipeline = stateManager.getPipeline(pipelineId);
       if (pipeline.isClosed()) {
@@ -324,7 +269,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
       metrics.incNumPipelineCreated();
       metrics.createPerPipelineMetrics(pipeline);
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
@@ -337,7 +282,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
   protected void removePipeline(Pipeline pipeline) throws IOException {
     pipelineFactory.close(pipeline.getType(), pipeline);
     PipelineID pipelineID = pipeline.getId();
-    lock.writeLock().lock();
+    lock.lock();
     try {
       stateManager.removePipeline(pipelineID.getProtobuf());
       metrics.incNumPipelineDestroyed();
@@ -345,7 +290,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
       metrics.incNumPipelineDestroyFailed();
       throw ex;
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
@@ -372,7 +317,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
   public void closePipeline(Pipeline pipeline, boolean onTimeout)
       throws IOException {
     PipelineID pipelineID = pipeline.getId();
-    lock.writeLock().lock();
+    lock.lock();
     try {
       if (!pipeline.isClosed()) {
         stateManager.updatePipelineState(pipelineID.getProtobuf(),
@@ -381,7 +326,7 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
       }
       metrics.removePipelineMetrics(pipelineID);
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
     // close containers.
     closeContainersForPipeline(pipelineID);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to