This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 869654957d HDDS-8905. PipelineManager metrics should not be synchronized (#4959)
869654957d is described below
commit 869654957d54a258d6bc37dde92749e79c191d18
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Fri Jun 23 12:15:52 2023 +0200
HDDS-8905. PipelineManager metrics should not be synchronized (#4959)
---
.../hdds/scm/pipeline/PipelineManagerImpl.java | 70 +++++++++++++---------
.../hdds/scm/pipeline/SCMPipelineMetrics.java | 23 ++++---
2 files changed, 57 insertions(+), 36 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 33c587995a..cbfe9a6a07 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -221,13 +221,12 @@ public class PipelineManagerImpl implements PipelineManager {
}
acquireWriteLock();
+ final Pipeline pipeline;
try {
- Pipeline pipeline = pipelineFactory.create(replicationConfig,
+ pipeline = pipelineFactory.create(replicationConfig,
excludedNodes, favoredNodes);
stateManager.addPipeline(pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION));
- recordMetricsForPipeline(pipeline);
- return pipeline;
} catch (IOException | TimeoutException ex) {
LOG.debug("Failed to create pipeline with replicationConfig {}.",
replicationConfig, ex);
@@ -236,6 +235,9 @@ public class PipelineManagerImpl implements PipelineManager {
} finally {
releaseWriteLock();
}
+
+ recordMetricsForPipeline(pipeline);
+ return pipeline;
}
private boolean factorOne(ReplicationConfig replicationConfig) {
@@ -309,14 +311,14 @@ public class PipelineManagerImpl implements PipelineManager {
.getPipelines(replicationConfig, state, excludeDns, excludePipelines);
}
- @Override
/**
* Returns the count of pipelines meeting the given ReplicationConfig and
* state.
- * @param replicationConfig The ReplicationConfig of the pipelines to count
+ * @param config The ReplicationConfig of the pipelines to count
* @param state The current state of the pipelines to count
* @return The count of pipelines meeting the above criteria
*/
+ @Override
public int getPipelineCount(ReplicationConfig config,
Pipeline.PipelineState state) {
return stateManager.getPipelineCount(config, state);
@@ -357,22 +359,29 @@ public class PipelineManagerImpl implements PipelineManager {
@Override
public void openPipeline(PipelineID pipelineId)
throws IOException, TimeoutException {
+ HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf();
acquireWriteLock();
+ final Pipeline pipeline;
+ boolean opened = false;
try {
- Pipeline pipeline = stateManager.getPipeline(pipelineId);
+ pipeline = stateManager.getPipeline(pipelineId);
if (pipeline.isClosed()) {
throw new IOException("Closed pipeline can not be opened");
}
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
- LOG.info("Pipeline {} moved to OPEN state", pipeline);
- stateManager.updatePipelineState(
- pipelineId.getProtobuf(), HddsProtos.PipelineState.PIPELINE_OPEN);
+ stateManager.updatePipelineState(pipelineIdProtobuf,
+ HddsProtos.PipelineState.PIPELINE_OPEN);
+ opened = true;
}
- metrics.incNumPipelineCreated();
- metrics.createPerPipelineMetrics(pipeline);
} finally {
releaseWriteLock();
}
+
+ if (opened) {
+ LOG.info("Pipeline {} moved to OPEN state", pipeline);
+ }
+ metrics.incNumPipelineCreated();
+ metrics.createPerPipelineMetrics(pipeline);
}
/**
@@ -384,17 +393,18 @@ public class PipelineManagerImpl implements PipelineManager {
protected void removePipeline(Pipeline pipeline)
throws IOException, TimeoutException {
pipelineFactory.close(pipeline.getType(), pipeline);
- PipelineID pipelineID = pipeline.getId();
+ HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
acquireWriteLock();
try {
- stateManager.removePipeline(pipelineID.getProtobuf());
- metrics.incNumPipelineDestroyed();
+ stateManager.removePipeline(pipelineID);
} catch (IOException ex) {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
releaseWriteLock();
}
+
+ metrics.incNumPipelineDestroyed();
}
/**
@@ -402,7 +412,7 @@ public class PipelineManagerImpl implements PipelineManager {
* @param pipelineId - ID of the pipeline.
* @throws IOException
*/
- protected void closeContainersForPipeline(final PipelineID pipelineId)
+ private void closeContainersForPipeline(final PipelineID pipelineId)
throws IOException, TimeoutException {
Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
ContainerManager containerManager = scmContext.getScm()
@@ -432,19 +442,23 @@ public class PipelineManagerImpl implements PipelineManager {
public void closePipeline(Pipeline pipeline, boolean onTimeout)
throws IOException, TimeoutException {
PipelineID pipelineID = pipeline.getId();
+ HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID);
- acquireWriteLock();
- try {
- if (!pipeline.isClosed()) {
- stateManager.updatePipelineState(pipelineID.getProtobuf(),
+
+ if (!pipeline.isClosed()) {
+ acquireWriteLock();
+ try {
+ stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_CLOSED);
- LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+ } finally {
+ releaseWriteLock();
}
- metrics.removePipelineMetrics(pipelineID);
- } finally {
- releaseWriteLock();
+ LOG.info("Pipeline {} moved to CLOSED state", pipeline);
}
+
+ metrics.removePipelineMetrics(pipelineID);
+
if (!onTimeout) {
// close pipeline right away.
removePipeline(pipeline);
@@ -497,7 +511,7 @@ public class PipelineManagerImpl implements PipelineManager {
@Override
public void scrubPipelines() throws IOException, TimeoutException {
Instant currentTime = clock.instant();
- Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
+ long pipelineScrubTimeoutInMills = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
@@ -595,10 +609,11 @@ public class PipelineManagerImpl implements PipelineManager {
@Override
public void activatePipeline(PipelineID pipelineID)
throws IOException, TimeoutException {
+ HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
acquireWriteLock();
try {
- stateManager.updatePipelineState(pipelineID.getProtobuf(),
- HddsProtos.PipelineState.PIPELINE_OPEN);
+ stateManager.updatePipelineState(pipelineIDProtobuf,
+ HddsProtos.PipelineState.PIPELINE_OPEN);
} finally {
releaseWriteLock();
}
@@ -613,9 +628,10 @@ public class PipelineManagerImpl implements PipelineManager {
@Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException, TimeoutException {
+ HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
acquireWriteLock();
try {
- stateManager.updatePipelineState(pipelineID.getProtobuf(),
+ stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_DORMANT);
} finally {
releaseWriteLock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index af3c7b2c41..a697e2e293 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -56,8 +56,8 @@ public final class SCMPipelineMetrics implements MetricsSource {
private @Metric MutableCounterLong numPipelineReportProcessed;
private @Metric MutableCounterLong numPipelineReportProcessingFailed;
private @Metric MutableCounterLong numPipelineContainSameDatanodes;
- private Map<PipelineID, MutableCounterLong> numBlocksAllocated;
- private Map<PipelineID, MutableCounterLong> numBytesWritten;
+ private final Map<PipelineID, MutableCounterLong> numBlocksAllocated;
+ private final Map<PipelineID, MutableCounterLong> numBytesWritten;
/** Private constructor. */
private SCMPipelineMetrics() {
@@ -84,7 +84,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
/**
* Unregister the metrics instance.
*/
- public static void unRegister() {
+ public static synchronized void unRegister() {
instance = null;
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
@@ -112,9 +112,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns
.info(getBlockAllocationMetricName(pipeline),
"Number of blocks allocated in pipeline " + pipeline.getId()),
0L));
- numBytesWritten.put(pipeline.getId(), new MutableCounterLong(Interns
- .info(getBytesWrittenMetricName(pipeline),
- "Number of bytes written into pipeline " + pipeline.getId()), 0L));
+ numBytesWritten.put(pipeline.getId(), bytesWrittenCounter(pipeline, 0L));
}
public static String getBlockAllocationMetricName(Pipeline pipeline) {
@@ -159,9 +157,16 @@ public final class SCMPipelineMetrics implements MetricsSource {
* Increments the number of total bytes that write into the pipeline.
*/
void incNumPipelineBytesWritten(Pipeline pipeline, long bytes) {
- numBytesWritten.put(pipeline.getId(), new MutableCounterLong(
- Interns.info(getBytesWrittenMetricName(pipeline), "Number of" +
- " bytes written into pipeline " + pipeline.getId()), bytes));
+ numBytesWritten.computeIfPresent(pipeline.getId(),
+ (k, v) -> bytesWrittenCounter(pipeline, bytes));
+ }
+
+ private static MutableCounterLong bytesWrittenCounter(
+ Pipeline pipeline, long bytes) {
+ return new MutableCounterLong(
+ Interns.info(getBytesWrittenMetricName(pipeline),
+ "Number of bytes written into pipeline " + pipeline.getId()),
+ bytes);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]