This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new bf6f323109 HDDS-11413. PipelineManagerImpl lock optimization reduces AllocateBlock latency (#7160)
bf6f323109 is described below
commit bf6f323109dc48c7017aacd5d5bfcd4f52991b04
Author: hao guo <[email protected]>
AuthorDate: Fri Dec 13 08:36:33 2024 +0800
HDDS-11413. PipelineManagerImpl lock optimization reduces AllocateBlock latency (#7160)
---
.../hdds/scm/container/ContainerManagerImpl.java | 8 +--
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 8 +--
.../hdds/scm/pipeline/PipelineManagerImpl.java | 81 +++++++++++++++-------
.../pipeline/WritableRatisContainerProvider.java | 4 +-
.../hdds/scm/pipeline/MockPipelineManager.java | 8 +--
.../ozone/recon/scm/ReconPipelineManager.java | 65 +++++++++++------
6 files changed, 111 insertions(+), 63 deletions(-)
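In short: PipelineManagerImpl previously guarded all pipeline state changes with a single ReentrantReadWriteLock, so EC pipeline operations could stall the Ratis AllocateBlock path. The patch adds a second lock for EC pipelines and passes the ReplicationConfig into acquire/release so each call takes only the lock for its replication type. A minimal, self-contained sketch of that scheme (illustrative only, not the committed code; it uses a boolean in place of ReplicationConfig):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch of the per-replication-type locking this change introduces:
    // Ratis and EC pipelines use independent read/write locks, so work on
    // one type no longer blocks the other.
    public class ReplicationAwareLock {
      private final ReentrantReadWriteLock ratisLock = new ReentrantReadWriteLock();
      private final ReentrantReadWriteLock ecPipelineLock = new ReentrantReadWriteLock();

      // "isEc" stands in for replicationConfig.getReplicationType() == EC.
      private ReentrantReadWriteLock lockFor(boolean isEc) {
        return isEc ? ecPipelineLock : ratisLock;
      }

      public void acquireReadLock(boolean isEc)  { lockFor(isEc).readLock().lock(); }
      public void releaseReadLock(boolean isEc)  { lockFor(isEc).readLock().unlock(); }
      public void acquireWriteLock(boolean isEc) { lockFor(isEc).writeLock().lock(); }
      public void releaseWriteLock(boolean isEc) { lockFor(isEc).writeLock().unlock(); }
    }

The real implementation keys on ReplicationConfig.getReplicationType(), as in the PipelineManagerImpl hunks below.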
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index d61f9ee366..b48e52dafe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -182,7 +182,7 @@ public class ContainerManagerImpl implements ContainerManager {
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
- pipelineManager.acquireReadLock();
+ pipelineManager.acquireReadLock(replicationConfig);
lock.lock();
List<Pipeline> pipelines;
Pipeline pipeline;
@@ -196,7 +196,7 @@ public class ContainerManagerImpl implements ContainerManager {
}
} finally {
lock.unlock();
- pipelineManager.releaseReadLock();
+ pipelineManager.releaseReadLock(replicationConfig);
}
if (pipelines.isEmpty()) {
@@ -209,7 +209,7 @@ public class ContainerManagerImpl implements ContainerManager {
" matching pipeline for replicationConfig: " + replicationConfig
+ ", State:PipelineState.OPEN", e);
}
- pipelineManager.acquireReadLock();
+ pipelineManager.acquireReadLock(replicationConfig);
lock.lock();
try {
pipelines = pipelineManager
@@ -224,7 +224,7 @@ public class ContainerManagerImpl implements ContainerManager {
}
} finally {
lock.unlock();
- pipelineManager.releaseReadLock();
+ pipelineManager.releaseReadLock(replicationConfig);
}
}
return containerInfo;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 15b0f408c5..77353adc7b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -207,20 +207,20 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
/**
* Acquire read lock.
*/
- void acquireReadLock();
+ void acquireReadLock(ReplicationConfig replicationConfig);
/**
* Release read lock.
*/
- void releaseReadLock();
+ void releaseReadLock(ReplicationConfig replicationConfig);
/**
* Acquire write lock.
*/
- void acquireWriteLock();
+ void acquireWriteLock(ReplicationConfig replicationConfig);
/**
* Release write lock.
*/
- void releaseWriteLock();
+ void releaseWriteLock(ReplicationConfig replicationConfig);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 000d3e7363..d3dd42cdca 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -77,6 +77,7 @@ public class PipelineManagerImpl implements PipelineManager {
// Limit the number of on-going ratis operation to be 1.
private final ReentrantReadWriteLock lock;
+ private final ReentrantReadWriteLock ecPipelineLock;
private PipelineFactory pipelineFactory;
private PipelineStateManager stateManager;
private BackgroundPipelineCreator backgroundPipelineCreator;
@@ -105,6 +106,7 @@ public class PipelineManagerImpl implements PipelineManager {
SCMContext scmContext,
Clock clock) {
this.lock = new ReentrantReadWriteLock();
+ this.ecPipelineLock = new ReentrantReadWriteLock();
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
this.conf = conf;
@@ -248,7 +250,7 @@ public class PipelineManagerImpl implements PipelineManager {
throws IOException {
checkIfPipelineCreationIsAllowed(replicationConfig);
- acquireWriteLock();
+ acquireWriteLock(replicationConfig);
final Pipeline pipeline;
try {
try {
@@ -261,7 +263,7 @@ public class PipelineManagerImpl implements PipelineManager {
addPipelineToManager(pipeline);
return pipeline;
} finally {
- releaseWriteLock();
+ releaseWriteLock(replicationConfig);
}
}
@@ -286,7 +288,8 @@ public class PipelineManagerImpl implements PipelineManager {
throws IOException {
HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION);
- acquireWriteLock();
+ ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+ acquireWriteLock(replicationConfig);
try {
stateManager.addPipeline(pipelineProto);
} catch (IOException ex) {
@@ -294,7 +297,7 @@ public class PipelineManagerImpl implements PipelineManager {
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
- releaseWriteLock();
+ releaseWriteLock(replicationConfig);
}
recordMetricsForPipeline(pipeline);
}
@@ -419,19 +422,23 @@ public class PipelineManagerImpl implements PipelineManager {
public void openPipeline(PipelineID pipelineId)
throws IOException {
HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf();
- acquireWriteLock();
- final Pipeline pipeline;
+
+ final Pipeline pipeline = getPipeline(pipelineId);
+ ReplicationConfig replicationConfig = null;
try {
- pipeline = stateManager.getPipeline(pipelineId);
if (pipeline.isClosed()) {
throw new IOException("Closed pipeline can not be opened");
}
+ replicationConfig = pipeline.getReplicationConfig();
+ acquireWriteLock(replicationConfig);
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
stateManager.updatePipelineState(pipelineIdProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
}
} finally {
- releaseWriteLock();
+ if (replicationConfig != null) {
+ releaseWriteLock(replicationConfig);
+ }
}
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
@@ -447,14 +454,15 @@ public class PipelineManagerImpl implements PipelineManager {
throws IOException {
pipelineFactory.close(pipeline.getType(), pipeline);
HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
- acquireWriteLock();
+ ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+ acquireWriteLock(replicationConfig);
try {
stateManager.removePipeline(pipelineID);
} catch (IOException ex) {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
- releaseWriteLock();
+ releaseWriteLock(replicationConfig);
}
LOG.info("Pipeline {} removed.", pipeline);
metrics.incNumPipelineDestroyed();
@@ -507,19 +515,20 @@ public class PipelineManagerImpl implements PipelineManager {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID);
- if (!getPipeline(pipelineID).isClosed()) {
- acquireWriteLock();
+ Pipeline pipeline = getPipeline(pipelineID);
+ if (!pipeline.isClosed()) {
+ ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+ acquireWriteLock(replicationConfig);
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_CLOSED);
} finally {
- releaseWriteLock();
+ releaseWriteLock(replicationConfig);
}
LOG.info("Pipeline {} moved to CLOSED state", pipelineID);
}
metrics.removePipelineMetrics(pipelineID);
-
}
/**
@@ -684,12 +693,14 @@ public class PipelineManagerImpl implements PipelineManager {
public void activatePipeline(PipelineID pipelineID)
throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
- acquireWriteLock();
+ Pipeline pipeline = getPipeline(pipelineID);
+ ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+ acquireWriteLock(replicationConfig);
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
} finally {
- releaseWriteLock();
+ releaseWriteLock(replicationConfig);
}
}
@@ -703,12 +714,14 @@ public class PipelineManagerImpl implements PipelineManager {
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
- acquireWriteLock();
+ Pipeline pipeline = getPipeline(pipelineID);
+ ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+ acquireWriteLock(replicationConfig);
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_DORMANT);
} finally {
- releaseWriteLock();
+ releaseWriteLock(replicationConfig);
}
}
@@ -931,22 +944,38 @@ public class PipelineManagerImpl implements PipelineManager {
}
@Override
- public void acquireReadLock() {
- lock.readLock().lock();
+ public void acquireReadLock(ReplicationConfig replicationConfig) {
+ if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
+ ecPipelineLock.readLock().lock();
+ } else {
+ lock.readLock().lock();
+ }
}
@Override
- public void releaseReadLock() {
- lock.readLock().unlock();
+ public void releaseReadLock(ReplicationConfig replicationConfig) {
+ if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
+ ecPipelineLock.readLock().unlock();
+ } else {
+ lock.readLock().unlock();
+ }
}
@Override
- public void acquireWriteLock() {
- lock.writeLock().lock();
+ public void acquireWriteLock(ReplicationConfig replicationConfig) {
+ if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
+ ecPipelineLock.writeLock().lock();
+ } else {
+ lock.writeLock().lock();
+ }
}
@Override
- public void releaseWriteLock() {
- lock.writeLock().unlock();
+ public void releaseWriteLock(ReplicationConfig replicationConfig) {
+ if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
+ ecPipelineLock.writeLock().unlock();
+ } else {
+ lock.writeLock().unlock();
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
index 99a58f690c..a1b2a28493 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
@@ -156,13 +156,13 @@ public class WritableRatisContainerProvider
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
- pipelineManager.acquireReadLock();
+ pipelineManager.acquireReadLock(repConfig);
try {
List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
excludeList, Pipeline.PipelineState.OPEN);
return selectContainer(availablePipelines, req, owner, excludeList);
} finally {
- pipelineManager.releaseReadLock();
+ pipelineManager.releaseReadLock(repConfig);
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 6ece2ecb88..952dc1f010 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -332,22 +332,22 @@ public class MockPipelineManager implements PipelineManager {
}
@Override
- public void acquireReadLock() {
+ public void acquireReadLock(ReplicationConfig replicationConfig) {
}
@Override
- public void releaseReadLock() {
+ public void releaseReadLock(ReplicationConfig replicationConfig) {
}
@Override
- public void acquireWriteLock() {
+ public void acquireWriteLock(ReplicationConfig replicationConfig) {
}
@Override
- public void releaseWriteLock() {
+ public void releaseWriteLock(ReplicationConfig replicationConfig) {
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index 6bc54f3e6e..8e5e514f8c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -24,6 +24,7 @@ import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -98,29 +99,46 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
*/
void initializePipelines(List<Pipeline> pipelinesFromScm)
throws IOException {
-
- acquireWriteLock();
- try {
- List<Pipeline> pipelinesInHouse = getPipelines();
- LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
- for (Pipeline pipeline : pipelinesFromScm) {
- // New pipeline got from SCM. Validate If it doesn't exist at Recon, try adding it.
- if (addPipeline(pipeline)) {
- LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
- } else {
- LOG.warn("Pipeline {} already exists in Recon pipeline metadata.", pipeline.getId());
- // Recon already has this pipeline. Just update state and creation
- // time.
- getStateManager().updatePipelineState(
- pipeline.getId().getProtobuf(),
- Pipeline.PipelineState.getProtobuf(pipeline.getPipelineState()));
- getPipeline(pipeline.getId()).setCreationTimestamp(
- pipeline.getCreationTimestamp());
+ HddsProtos.ReplicationType[] replicationTypes =
+ HddsProtos.ReplicationType.values();
+ for (HddsProtos.ReplicationType replicationType : replicationTypes) {
+ List<Pipeline> pipelines = pipelinesFromScm.stream().filter(
+ p -> p.getReplicationConfig().getReplicationType()
+ .equals(replicationType)).collect(Collectors.toList());
+
+ if (!pipelines.isEmpty()) {
+ ReplicationConfig replicationConfig =
+ pipelines.iterator().next().getReplicationConfig();
+
+ acquireWriteLock(replicationConfig);
+ try {
+ List<Pipeline> pipelinesInHouse = getPipelines().stream().filter(
+ p -> p.getReplicationConfig().getReplicationType()
+ .equals(replicationType)).collect(Collectors.toList());
+
+ LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
+ for (Pipeline pipeline : pipelines) {
+ // New pipeline got from SCM. Validate If it doesn't exist at Recon, try adding it.
+ if (addPipeline(pipeline)) {
+ LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
+ } else {
+ LOG.warn("Pipeline {} already exists in Recon pipeline metadata.",
+ pipeline.getId());
+ // Recon already has this pipeline. Just update state and creation
+ // time.
+ getStateManager().updatePipelineState(
+ pipeline.getId().getProtobuf(),
+ Pipeline.PipelineState.getProtobuf(
+ pipeline.getPipelineState()));
+ getPipeline(pipeline.getId()).setCreationTimestamp(
+ pipeline.getCreationTimestamp());
+ }
+ }
+ removeInvalidPipelines(pipelines);
+ } finally {
+ releaseWriteLock(replicationConfig);
}
}
- removeInvalidPipelines(pipelinesFromScm);
- } finally {
- releaseWriteLock();
}
}
@@ -163,7 +181,8 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
*/
@VisibleForTesting
public boolean addPipeline(Pipeline pipeline) throws IOException {
- acquireWriteLock();
+ ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+ acquireWriteLock(replicationConfig);
try {
// Check if the pipeline already exists
if (containsPipeline(pipeline.getId())) {
@@ -172,7 +191,7 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
getStateManager().addPipeline(pipeline.getProtobufMessage(ClientVersion.CURRENT_VERSION));
return true;
} finally {
- releaseWriteLock();
+ releaseWriteLock(replicationConfig);
}
}
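For callers, the change amounts to passing the request's ReplicationConfig into the lock methods, as the ContainerManagerImpl and WritableRatisContainerProvider hunks above show. A hedged caller-side sketch (not part of this commit; it assumes the existing getPipelines(replicationConfig, state) overload, and pickOpenPipeline is a hypothetical helper name):

    // Only the lock matching the request's ReplicationConfig is held around
    // the lookup, so a Ratis AllocateBlock no longer contends with EC
    // pipeline operations.
    Pipeline pickOpenPipeline(PipelineManager pipelineManager,
                              ReplicationConfig repConfig) throws IOException {
      pipelineManager.acquireReadLock(repConfig);
      try {
        List<Pipeline> open =
            pipelineManager.getPipelines(repConfig, Pipeline.PipelineState.OPEN);
        if (open.isEmpty()) {
          throw new IOException("No OPEN pipeline for " + repConfig);
        }
        return open.get(0); // production code balances load across pipelines
      } finally {
        pipelineManager.releaseReadLock(repConfig);
      }
    }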
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]