This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 869654957d HDDS-8905. PipelineManager metrics should not be 
synchronized (#4959)
869654957d is described below

commit 869654957d54a258d6bc37dde92749e79c191d18
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Fri Jun 23 12:15:52 2023 +0200

    HDDS-8905. PipelineManager metrics should not be synchronized (#4959)
---
 .../hdds/scm/pipeline/PipelineManagerImpl.java     | 70 +++++++++++++---------
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      | 23 ++++---
 2 files changed, 57 insertions(+), 36 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 33c587995a..cbfe9a6a07 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -221,13 +221,12 @@ public class PipelineManagerImpl implements 
PipelineManager {
     }
 
     acquireWriteLock();
+    final Pipeline pipeline;
     try {
-      Pipeline pipeline = pipelineFactory.create(replicationConfig,
+      pipeline = pipelineFactory.create(replicationConfig,
           excludedNodes, favoredNodes);
       stateManager.addPipeline(pipeline.getProtobufMessage(
           ClientVersion.CURRENT_VERSION));
-      recordMetricsForPipeline(pipeline);
-      return pipeline;
     } catch (IOException | TimeoutException ex) {
       LOG.debug("Failed to create pipeline with replicationConfig {}.",
           replicationConfig, ex);
@@ -236,6 +235,9 @@ public class PipelineManagerImpl implements PipelineManager 
{
     } finally {
       releaseWriteLock();
     }
+
+    recordMetricsForPipeline(pipeline);
+    return pipeline;
   }
 
   private boolean factorOne(ReplicationConfig replicationConfig) {
@@ -309,14 +311,14 @@ public class PipelineManagerImpl implements 
PipelineManager {
         .getPipelines(replicationConfig, state, excludeDns, excludePipelines);
   }
 
-  @Override
   /**
    * Returns the count of pipelines meeting the given ReplicationConfig and
    * state.
-   * @param replicationConfig The ReplicationConfig of the pipelines to count
+   * @param config The ReplicationConfig of the pipelines to count
    * @param state The current state of the pipelines to count
    * @return The count of pipelines meeting the above criteria
    */
+  @Override
   public int getPipelineCount(ReplicationConfig config,
                                      Pipeline.PipelineState state) {
     return stateManager.getPipelineCount(config, state);
@@ -357,22 +359,29 @@ public class PipelineManagerImpl implements 
PipelineManager {
   @Override
   public void openPipeline(PipelineID pipelineId)
       throws IOException, TimeoutException {
+    HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf();
     acquireWriteLock();
+    final Pipeline pipeline;
+    boolean opened = false;
     try {
-      Pipeline pipeline = stateManager.getPipeline(pipelineId);
+      pipeline = stateManager.getPipeline(pipelineId);
       if (pipeline.isClosed()) {
         throw new IOException("Closed pipeline can not be opened");
       }
       if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
-        LOG.info("Pipeline {} moved to OPEN state", pipeline);
-        stateManager.updatePipelineState(
-            pipelineId.getProtobuf(), HddsProtos.PipelineState.PIPELINE_OPEN);
+        stateManager.updatePipelineState(pipelineIdProtobuf,
+            HddsProtos.PipelineState.PIPELINE_OPEN);
+        opened = true;
       }
-      metrics.incNumPipelineCreated();
-      metrics.createPerPipelineMetrics(pipeline);
     } finally {
       releaseWriteLock();
     }
+
+    if (opened) {
+      LOG.info("Pipeline {} moved to OPEN state", pipeline);
+    }
+    metrics.incNumPipelineCreated();
+    metrics.createPerPipelineMetrics(pipeline);
   }
 
   /**
@@ -384,17 +393,18 @@ public class PipelineManagerImpl implements 
PipelineManager {
   protected void removePipeline(Pipeline pipeline)
       throws IOException, TimeoutException {
     pipelineFactory.close(pipeline.getType(), pipeline);
-    PipelineID pipelineID = pipeline.getId();
+    HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
     acquireWriteLock();
     try {
-      stateManager.removePipeline(pipelineID.getProtobuf());
-      metrics.incNumPipelineDestroyed();
+      stateManager.removePipeline(pipelineID);
     } catch (IOException ex) {
       metrics.incNumPipelineDestroyFailed();
       throw ex;
     } finally {
       releaseWriteLock();
     }
+
+    metrics.incNumPipelineDestroyed();
   }
 
   /**
@@ -402,7 +412,7 @@ public class PipelineManagerImpl implements PipelineManager 
{
    * @param pipelineId - ID of the pipeline.
    * @throws IOException
    */
-  protected void closeContainersForPipeline(final PipelineID pipelineId)
+  private void closeContainersForPipeline(final PipelineID pipelineId)
       throws IOException, TimeoutException {
     Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
     ContainerManager containerManager = scmContext.getScm()
@@ -432,19 +442,23 @@ public class PipelineManagerImpl implements 
PipelineManager {
   public void closePipeline(Pipeline pipeline, boolean onTimeout)
       throws IOException, TimeoutException {
     PipelineID pipelineID = pipeline.getId();
+    HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
     // close containers.
     closeContainersForPipeline(pipelineID);
-    acquireWriteLock();
-    try {
-      if (!pipeline.isClosed()) {
-        stateManager.updatePipelineState(pipelineID.getProtobuf(),
+
+    if (!pipeline.isClosed()) {
+      acquireWriteLock();
+      try {
+        stateManager.updatePipelineState(pipelineIDProtobuf,
             HddsProtos.PipelineState.PIPELINE_CLOSED);
-        LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+      } finally {
+        releaseWriteLock();
       }
-      metrics.removePipelineMetrics(pipelineID);
-    } finally {
-      releaseWriteLock();
+      LOG.info("Pipeline {} moved to CLOSED state", pipeline);
     }
+
+    metrics.removePipelineMetrics(pipelineID);
+
     if (!onTimeout) {
       // close pipeline right away.
       removePipeline(pipeline);
@@ -497,7 +511,7 @@ public class PipelineManagerImpl implements PipelineManager 
{
   @Override
   public void scrubPipelines() throws IOException, TimeoutException {
     Instant currentTime = clock.instant();
-    Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
+    long pipelineScrubTimeoutInMills = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
         TimeUnit.MILLISECONDS);
@@ -595,10 +609,11 @@ public class PipelineManagerImpl implements 
PipelineManager {
   @Override
   public void activatePipeline(PipelineID pipelineID)
       throws IOException, TimeoutException {
+    HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
     acquireWriteLock();
     try {
-      stateManager.updatePipelineState(pipelineID.getProtobuf(),
-              HddsProtos.PipelineState.PIPELINE_OPEN);
+      stateManager.updatePipelineState(pipelineIDProtobuf,
+          HddsProtos.PipelineState.PIPELINE_OPEN);
     } finally {
       releaseWriteLock();
     }
@@ -613,9 +628,10 @@ public class PipelineManagerImpl implements 
PipelineManager {
   @Override
   public void deactivatePipeline(PipelineID pipelineID)
       throws IOException, TimeoutException {
+    HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
     acquireWriteLock();
     try {
-      stateManager.updatePipelineState(pipelineID.getProtobuf(),
+      stateManager.updatePipelineState(pipelineIDProtobuf,
           HddsProtos.PipelineState.PIPELINE_DORMANT);
     } finally {
       releaseWriteLock();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index af3c7b2c41..a697e2e293 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -56,8 +56,8 @@ public final class SCMPipelineMetrics implements 
MetricsSource {
   private @Metric MutableCounterLong numPipelineReportProcessed;
   private @Metric MutableCounterLong numPipelineReportProcessingFailed;
   private @Metric MutableCounterLong numPipelineContainSameDatanodes;
-  private Map<PipelineID, MutableCounterLong> numBlocksAllocated;
-  private Map<PipelineID, MutableCounterLong> numBytesWritten;
+  private final Map<PipelineID, MutableCounterLong> numBlocksAllocated;
+  private final Map<PipelineID, MutableCounterLong> numBytesWritten;
 
   /** Private constructor. */
   private SCMPipelineMetrics() {
@@ -84,7 +84,7 @@ public final class SCMPipelineMetrics implements 
MetricsSource {
   /**
    * Unregister the metrics instance.
    */
-  public static void unRegister() {
+  public static synchronized void unRegister() {
     instance = null;
     MetricsSystem ms = DefaultMetricsSystem.instance();
     ms.unregisterSource(SOURCE_NAME);
@@ -112,9 +112,7 @@ public final class SCMPipelineMetrics implements 
MetricsSource {
     numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns
         .info(getBlockAllocationMetricName(pipeline),
             "Number of blocks allocated in pipeline " + pipeline.getId()), 
0L));
-    numBytesWritten.put(pipeline.getId(), new MutableCounterLong(Interns
-        .info(getBytesWrittenMetricName(pipeline),
-            "Number of bytes written into pipeline " + pipeline.getId()), 0L));
+    numBytesWritten.put(pipeline.getId(), bytesWrittenCounter(pipeline, 0L));
   }
 
   public static String getBlockAllocationMetricName(Pipeline pipeline) {
@@ -159,9 +157,16 @@ public final class SCMPipelineMetrics implements 
MetricsSource {
    * Increments the number of total bytes that write into the pipeline.
    */
   void incNumPipelineBytesWritten(Pipeline pipeline, long bytes) {
-    numBytesWritten.put(pipeline.getId(), new MutableCounterLong(
-        Interns.info(getBytesWrittenMetricName(pipeline), "Number of" +
-            " bytes written into pipeline " + pipeline.getId()), bytes));
+    numBytesWritten.computeIfPresent(pipeline.getId(),
+        (k, v) -> bytesWrittenCounter(pipeline, bytes));
+  }
+
+  private static MutableCounterLong bytesWrittenCounter(
+      Pipeline pipeline, long bytes) {
+    return new MutableCounterLong(
+        Interns.info(getBytesWrittenMetricName(pipeline),
+            "Number of bytes written into pipeline " + pipeline.getId()),
+        bytes);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to