This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new bf6f323109 HDDS-11413. PipelineManagerImpl lock optimization reduces AllocateBlock latency (#7160)
bf6f323109 is described below

commit bf6f323109dc48c7017aacd5d5bfcd4f52991b04
Author: hao guo <[email protected]>
AuthorDate: Fri Dec 13 08:36:33 2024 +0800

    HDDS-11413. PipelineManagerImpl lock optimization reduces AllocateBlock latency (#7160)
---
 .../hdds/scm/container/ContainerManagerImpl.java   |  8 +--
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  8 +--
 .../hdds/scm/pipeline/PipelineManagerImpl.java     | 81 +++++++++++++++-------
 .../pipeline/WritableRatisContainerProvider.java   |  4 +-
 .../hdds/scm/pipeline/MockPipelineManager.java     |  8 +--
 .../ozone/recon/scm/ReconPipelineManager.java      | 65 +++++++++++------
 6 files changed, 111 insertions(+), 63 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index d61f9ee366..b48e52dafe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -182,7 +182,7 @@ public class ContainerManagerImpl implements ContainerManager {
     // Acquire pipeline manager lock, to avoid any updates to pipeline
     // while allocate container happens. This is to avoid scenario like
     // mentioned in HDDS-5655.
-    pipelineManager.acquireReadLock();
+    pipelineManager.acquireReadLock(replicationConfig);
     lock.lock();
     List<Pipeline> pipelines;
     Pipeline pipeline;
@@ -196,7 +196,7 @@ public class ContainerManagerImpl implements ContainerManager {
       }
     } finally {
       lock.unlock();
-      pipelineManager.releaseReadLock();
+      pipelineManager.releaseReadLock(replicationConfig);
     }
 
     if (pipelines.isEmpty()) {
@@ -209,7 +209,7 @@ public class ContainerManagerImpl implements ContainerManager {
             " matching pipeline for replicationConfig: " + replicationConfig
             + ", State:PipelineState.OPEN", e);
       }
-      pipelineManager.acquireReadLock();
+      pipelineManager.acquireReadLock(replicationConfig);
       lock.lock();
       try {
         pipelines = pipelineManager
@@ -224,7 +224,7 @@ public class ContainerManagerImpl implements ContainerManager {
         }
       } finally {
         lock.unlock();
-        pipelineManager.releaseReadLock();
+        pipelineManager.releaseReadLock(replicationConfig);
       }
     }
     return containerInfo;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 15b0f408c5..77353adc7b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -207,20 +207,20 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
   /**
    * Acquire read lock.
    */
-  void acquireReadLock();
+  void acquireReadLock(ReplicationConfig replicationConfig);
 
   /**
    * Release read lock.
    */
-  void releaseReadLock();
+  void releaseReadLock(ReplicationConfig replicationConfig);
 
   /**
    * Acquire write lock.
    */
-  void acquireWriteLock();
+  void acquireWriteLock(ReplicationConfig replicationConfig);
 
   /**
    * Release write lock.
    */
-  void releaseWriteLock();
+  void releaseWriteLock(ReplicationConfig replicationConfig);
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 000d3e7363..d3dd42cdca 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -77,6 +77,7 @@ public class PipelineManagerImpl implements PipelineManager {
 
   // Limit the number of on-going ratis operation to be 1.
   private final ReentrantReadWriteLock lock;
+  private final ReentrantReadWriteLock ecPipelineLock;
   private PipelineFactory pipelineFactory;
   private PipelineStateManager stateManager;
   private BackgroundPipelineCreator backgroundPipelineCreator;
@@ -105,6 +106,7 @@ public class PipelineManagerImpl implements PipelineManager {
                                 SCMContext scmContext,
                                 Clock clock) {
     this.lock = new ReentrantReadWriteLock();
+    this.ecPipelineLock = new ReentrantReadWriteLock();
     this.pipelineFactory = pipelineFactory;
     this.stateManager = pipelineStateManager;
     this.conf = conf;
@@ -248,7 +250,7 @@ public class PipelineManagerImpl implements PipelineManager {
       throws IOException {
     checkIfPipelineCreationIsAllowed(replicationConfig);
 
-    acquireWriteLock();
+    acquireWriteLock(replicationConfig);
     final Pipeline pipeline;
     try {
       try {
@@ -261,7 +263,7 @@ public class PipelineManagerImpl implements PipelineManager {
       addPipelineToManager(pipeline);
       return pipeline;
     } finally {
-      releaseWriteLock();
+      releaseWriteLock(replicationConfig);
     }
   }
 
@@ -286,7 +288,8 @@ public class PipelineManagerImpl implements PipelineManager {
       throws IOException {
     HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
         ClientVersion.CURRENT_VERSION);
-    acquireWriteLock();
+    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+    acquireWriteLock(replicationConfig);
     try {
       stateManager.addPipeline(pipelineProto);
     } catch (IOException ex) {
@@ -294,7 +297,7 @@ public class PipelineManagerImpl implements PipelineManager {
       metrics.incNumPipelineCreationFailed();
       throw ex;
     } finally {
-      releaseWriteLock();
+      releaseWriteLock(replicationConfig);
     }
     recordMetricsForPipeline(pipeline);
   }
@@ -419,19 +422,23 @@ public class PipelineManagerImpl implements PipelineManager {
   public void openPipeline(PipelineID pipelineId)
       throws IOException {
     HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf();
-    acquireWriteLock();
-    final Pipeline pipeline;
+
+    final Pipeline pipeline = getPipeline(pipelineId);
+    ReplicationConfig replicationConfig = null;
     try {
-      pipeline = stateManager.getPipeline(pipelineId);
       if (pipeline.isClosed()) {
         throw new IOException("Closed pipeline can not be opened");
       }
+      replicationConfig = pipeline.getReplicationConfig();
+      acquireWriteLock(replicationConfig);
       if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
         stateManager.updatePipelineState(pipelineIdProtobuf,
             HddsProtos.PipelineState.PIPELINE_OPEN);
       }
     } finally {
-      releaseWriteLock();
+      if (replicationConfig != null) {
+        releaseWriteLock(replicationConfig);
+      }
     }
     metrics.incNumPipelineCreated();
     metrics.createPerPipelineMetrics(pipeline);
@@ -447,14 +454,15 @@ public class PipelineManagerImpl implements PipelineManager {
       throws IOException {
     pipelineFactory.close(pipeline.getType(), pipeline);
     HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
-    acquireWriteLock();
+    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+    acquireWriteLock(replicationConfig);
     try {
       stateManager.removePipeline(pipelineID);
     } catch (IOException ex) {
       metrics.incNumPipelineDestroyFailed();
       throw ex;
     } finally {
-      releaseWriteLock();
+      releaseWriteLock(replicationConfig);
     }
     LOG.info("Pipeline {} removed.", pipeline);
     metrics.incNumPipelineDestroyed();
@@ -507,19 +515,20 @@ public class PipelineManagerImpl implements PipelineManager {
     HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
     // close containers.
     closeContainersForPipeline(pipelineID);
-    if (!getPipeline(pipelineID).isClosed()) {
-      acquireWriteLock();
+    Pipeline pipeline = getPipeline(pipelineID);
+    if (!pipeline.isClosed()) {
+      ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+      acquireWriteLock(replicationConfig);
       try {
         stateManager.updatePipelineState(pipelineIDProtobuf,
             HddsProtos.PipelineState.PIPELINE_CLOSED);
       } finally {
-        releaseWriteLock();
+        releaseWriteLock(replicationConfig);
       }
       LOG.info("Pipeline {} moved to CLOSED state", pipelineID);
     }
 
     metrics.removePipelineMetrics(pipelineID);
-
   }
 
   /**
@@ -684,12 +693,14 @@ public class PipelineManagerImpl implements PipelineManager {
   public void activatePipeline(PipelineID pipelineID)
       throws IOException {
     HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
-    acquireWriteLock();
+    Pipeline pipeline = getPipeline(pipelineID);
+    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+    acquireWriteLock(replicationConfig);
     try {
       stateManager.updatePipelineState(pipelineIDProtobuf,
           HddsProtos.PipelineState.PIPELINE_OPEN);
     } finally {
-      releaseWriteLock();
+      releaseWriteLock(replicationConfig);
     }
   }
 
@@ -703,12 +714,14 @@ public class PipelineManagerImpl implements PipelineManager {
   public void deactivatePipeline(PipelineID pipelineID)
       throws IOException {
     HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
-    acquireWriteLock();
+    Pipeline pipeline = getPipeline(pipelineID);
+    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+    acquireWriteLock(replicationConfig);
     try {
       stateManager.updatePipelineState(pipelineIDProtobuf,
           HddsProtos.PipelineState.PIPELINE_DORMANT);
     } finally {
-      releaseWriteLock();
+      releaseWriteLock(replicationConfig);
     }
   }
 
@@ -931,22 +944,38 @@ public class PipelineManagerImpl implements PipelineManager {
   }
 
   @Override
-  public void acquireReadLock() {
-    lock.readLock().lock();
+  public void acquireReadLock(ReplicationConfig replicationConfig) {
+    if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
+      ecPipelineLock.readLock().lock();
+    } else {
+      lock.readLock().lock();
+    }
   }
 
   @Override
-  public void releaseReadLock() {
-    lock.readLock().unlock();
+  public void releaseReadLock(ReplicationConfig replicationConfig) {
+    if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
+      ecPipelineLock.readLock().unlock();
+    } else {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
-  public void acquireWriteLock() {
-    lock.writeLock().lock();
+  public void acquireWriteLock(ReplicationConfig replicationConfig) {
+    if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
+      ecPipelineLock.writeLock().lock();
+    } else {
+      lock.writeLock().lock();
+    }
   }
 
   @Override
-  public void releaseWriteLock() {
-    lock.writeLock().unlock();
+  public void releaseWriteLock(ReplicationConfig replicationConfig) {
+    if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
+      ecPipelineLock.writeLock().unlock();
+    } else {
+      lock.writeLock().unlock();
+    }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
index 99a58f690c..a1b2a28493 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
@@ -156,13 +156,13 @@ public class WritableRatisContainerProvider
     // Acquire pipeline manager lock, to avoid any updates to pipeline
     // while allocate container happens. This is to avoid scenario like
     // mentioned in HDDS-5655.
-    pipelineManager.acquireReadLock();
+    pipelineManager.acquireReadLock(repConfig);
     try {
       List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
           excludeList, Pipeline.PipelineState.OPEN);
       return selectContainer(availablePipelines, req, owner, excludeList);
     } finally {
-      pipelineManager.releaseReadLock();
+      pipelineManager.releaseReadLock(repConfig);
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 6ece2ecb88..952dc1f010 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -332,22 +332,22 @@ public class MockPipelineManager implements PipelineManager {
   }
 
   @Override
-  public void acquireReadLock() {
+  public void acquireReadLock(ReplicationConfig replicationConfig) {
 
   }
 
   @Override
-  public void releaseReadLock() {
+  public void releaseReadLock(ReplicationConfig replicationConfig) {
 
   }
 
   @Override
-  public void acquireWriteLock() {
+  public void acquireWriteLock(ReplicationConfig replicationConfig) {
 
   }
 
   @Override
-  public void releaseWriteLock() {
+  public void releaseWriteLock(ReplicationConfig replicationConfig) {
 
   }
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index 6bc54f3e6e..8e5e514f8c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -24,6 +24,7 @@ import java.time.ZoneOffset;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -98,29 +99,46 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
    */
   void initializePipelines(List<Pipeline> pipelinesFromScm)
       throws IOException {
-
-    acquireWriteLock();
-    try {
-      List<Pipeline> pipelinesInHouse = getPipelines();
-      LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
-      for (Pipeline pipeline : pipelinesFromScm) {
-        // New pipeline got from SCM. Validate If it doesn't exist at Recon, try adding it.
-        if (addPipeline(pipeline)) {
-          LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
-        } else {
-          LOG.warn("Pipeline {} already exists in Recon pipeline metadata.", pipeline.getId());
-          // Recon already has this pipeline. Just update state and creation
-          // time.
-          getStateManager().updatePipelineState(
-              pipeline.getId().getProtobuf(),
-              Pipeline.PipelineState.getProtobuf(pipeline.getPipelineState()));
-          getPipeline(pipeline.getId()).setCreationTimestamp(
-              pipeline.getCreationTimestamp());
+    HddsProtos.ReplicationType[] replicationTypes =
+        HddsProtos.ReplicationType.values();
+    for (HddsProtos.ReplicationType replicationType : replicationTypes) {
+      List<Pipeline> pipelines = pipelinesFromScm.stream().filter(
+          p -> p.getReplicationConfig().getReplicationType()
+              .equals(replicationType)).collect(Collectors.toList());
+
+      if (!pipelines.isEmpty()) {
+        ReplicationConfig replicationConfig =
+            pipelines.iterator().next().getReplicationConfig();
+
+        acquireWriteLock(replicationConfig);
+        try {
+          List<Pipeline> pipelinesInHouse = getPipelines().stream().filter(
+              p -> p.getReplicationConfig().getReplicationType()
+                  .equals(replicationType)).collect(Collectors.toList());
+
+          LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
+          for (Pipeline pipeline : pipelines) {
+            // New pipeline got from SCM. Validate If it doesn't exist at Recon, try adding it.
+            if (addPipeline(pipeline)) {
+              LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
+            } else {
+              LOG.warn("Pipeline {} already exists in Recon pipeline metadata.",
+                  pipeline.getId());
+              // Recon already has this pipeline. Just update state and creation
+              // time.
+              getStateManager().updatePipelineState(
+                  pipeline.getId().getProtobuf(),
+                  Pipeline.PipelineState.getProtobuf(
+                      pipeline.getPipelineState()));
+              getPipeline(pipeline.getId()).setCreationTimestamp(
+                  pipeline.getCreationTimestamp());
+            }
+          }
+          removeInvalidPipelines(pipelines);
+        } finally {
+          releaseWriteLock(replicationConfig);
         }
       }
-      removeInvalidPipelines(pipelinesFromScm);
-    } finally {
-      releaseWriteLock();
     }
   }
 
@@ -163,7 +181,8 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
    */
   @VisibleForTesting
   public boolean addPipeline(Pipeline pipeline) throws IOException {
-    acquireWriteLock();
+    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+    acquireWriteLock(replicationConfig);
     try {
       // Check if the pipeline already exists
       if (containsPipeline(pipeline.getId())) {
@@ -172,7 +191,7 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
       getStateManager().addPipeline(pipeline.getProtobufMessage(ClientVersion.CURRENT_VERSION));
       return true;
     } finally {
-      releaseWriteLock();
+      releaseWriteLock(replicationConfig);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to