This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 91eb342  HDDS-4560. Add ReadWriteLock into PipelineStateManagerV2Impl to protect contentions between RaftServer and PipelineManager. (#1676)
91eb342 is described below

commit 91eb342c0509ae54283d6a2ad2e2638380812b34
Author: GlenGeng <[email protected]>
AuthorDate: Mon Dec 14 01:06:16 2020 +0800

    HDDS-4560. Add ReadWriteLock into PipelineStateManagerV2Impl to protect contentions between RaftServer and PipelineManager. (#1676)
---
 .../scm/pipeline/PipelineStateManagerV2Impl.java   | 129 ++++++++++++++++-----
 1 file changed, 102 insertions(+), 27 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index 703cdec..8f5ebc4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -35,6 +35,8 @@ import java.lang.reflect.Proxy;
 import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Implementation of pipeline state manager.
@@ -49,7 +51,11 @@ public class PipelineStateManagerV2Impl implements StateManager {
 
   private final PipelineStateMap pipelineStateMap;
   private final NodeManager nodeManager;
-  private Table<PipelineID, Pipeline> pipelineStore;
+  private final Table<PipelineID, Pipeline> pipelineStore;
+
+  // Protect potential contentions between RaftServer and PipelineManager.
+  // See https://issues.apache.org/jira/browse/HDDS-4560
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
   public PipelineStateManagerV2Impl(
       Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager)
@@ -79,47 +85,82 @@ public class PipelineStateManagerV2Impl implements StateManager {
   @Override
   public void addPipeline(HddsProtos.Pipeline pipelineProto)
       throws IOException {
-    Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
-    pipelineStore.put(pipeline.getId(), pipeline);
-    pipelineStateMap.addPipeline(pipeline);
-    nodeManager.addPipeline(pipeline);
-    LOG.info("Created pipeline {}.", pipeline);
+    lock.writeLock().lock();
+    try {
+      Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
+      pipelineStore.put(pipeline.getId(), pipeline);
+      pipelineStateMap.addPipeline(pipeline);
+      nodeManager.addPipeline(pipeline);
+      LOG.info("Created pipeline {}.", pipeline);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
   public void addContainerToPipeline(
       PipelineID pipelineId, ContainerID containerID)
       throws IOException {
-    pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
+    lock.writeLock().lock();
+    try {
+      pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
   public Pipeline getPipeline(PipelineID pipelineID)
       throws PipelineNotFoundException {
-    return pipelineStateMap.getPipeline(pipelineID);
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap.getPipeline(pipelineID);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public List<Pipeline> getPipelines() {
-    return pipelineStateMap.getPipelines();
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap.getPipelines();
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
-    return pipelineStateMap.getPipelines(type);
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap.getPipelines(type);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public List<Pipeline> getPipelines(
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) {
-    return pipelineStateMap.getPipelines(type, factor);
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap.getPipelines(type, factor);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public List<Pipeline> getPipelines(
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
                               Pipeline.PipelineState state) {
-    return pipelineStateMap.getPipelines(type, factor, state);
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap.getPipelines(type, factor, state);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
@@ -127,52 +168,86 @@ public class PipelineStateManagerV2Impl implements StateManager {
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
       Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
       Collection<PipelineID> excludePipelines) {
-    return pipelineStateMap
-        .getPipelines(type, factor, state, excludeDns, excludePipelines);
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap
+          .getPipelines(type, factor, state, excludeDns, excludePipelines);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
                                      Pipeline.PipelineState... states) {
-    return pipelineStateMap.getPipelines(type, states);
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap.getPipelines(type, states);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
       throws IOException {
-    return pipelineStateMap.getContainers(pipelineID);
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap.getContainers(pipelineID);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
-    return pipelineStateMap.getNumberOfContainers(pipelineID);
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap.getNumberOfContainers(pipelineID);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public void removePipeline(HddsProtos.PipelineID pipelineIDProto)
       throws IOException {
-    PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
-    pipelineStore.delete(pipelineID);
-    Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
-    nodeManager.removePipeline(pipeline);
-    LOG.info("Pipeline {} removed.", pipeline);
-    return;
+    lock.writeLock().lock();
+    try {
+      PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
+      pipelineStore.delete(pipelineID);
+      Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
+      nodeManager.removePipeline(pipeline);
+      LOG.info("Pipeline {} removed.", pipeline);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
 
   @Override
   public void removeContainerFromPipeline(
       PipelineID pipelineID, ContainerID containerID) throws IOException {
-    pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
+    lock.writeLock().lock();
+    try {
+      pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
   public void updatePipelineState(
       HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
       throws IOException {
-    pipelineStateMap.updatePipelineState(
-        PipelineID.getFromProtobuf(pipelineIDProto),
-        Pipeline.PipelineState.fromProtobuf(newState));
+    lock.writeLock().lock();
+    try {
+      pipelineStateMap.updatePipelineState(
+          PipelineID.getFromProtobuf(pipelineIDProto),
+          Pipeline.PipelineState.fromProtobuf(newState));
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to