This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 91eb342 HDDS-4560. Add ReadWriteLock into PipelineStateManagerV2Impl
to protect contentions between RaftServer and PipelineManager. (#1676)
91eb342 is described below
commit 91eb342c0509ae54283d6a2ad2e2638380812b34
Author: GlenGeng <[email protected]>
AuthorDate: Mon Dec 14 01:06:16 2020 +0800
HDDS-4560. Add ReadWriteLock into PipelineStateManagerV2Impl to protect
contentions between RaftServer and PipelineManager. (#1676)
---
.../scm/pipeline/PipelineStateManagerV2Impl.java | 129 ++++++++++++++++-----
1 file changed, 102 insertions(+), 27 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index 703cdec..8f5ebc4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -35,6 +35,8 @@ import java.lang.reflect.Proxy;
import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Implementation of pipeline state manager.
@@ -49,7 +51,11 @@ public class PipelineStateManagerV2Impl implements
StateManager {
private final PipelineStateMap pipelineStateMap;
private final NodeManager nodeManager;
- private Table<PipelineID, Pipeline> pipelineStore;
+ private final Table<PipelineID, Pipeline> pipelineStore;
+
+ // Protect potential contentions between RaftServer and PipelineManager.
+ // See https://issues.apache.org/jira/browse/HDDS-4560
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
public PipelineStateManagerV2Impl(
Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager)
@@ -79,47 +85,82 @@ public class PipelineStateManagerV2Impl implements
StateManager {
@Override
public void addPipeline(HddsProtos.Pipeline pipelineProto)
throws IOException {
- Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
- pipelineStore.put(pipeline.getId(), pipeline);
- pipelineStateMap.addPipeline(pipeline);
- nodeManager.addPipeline(pipeline);
- LOG.info("Created pipeline {}.", pipeline);
+ lock.writeLock().lock();
+ try {
+ Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
+ pipelineStore.put(pipeline.getId(), pipeline);
+ pipelineStateMap.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
+ LOG.info("Created pipeline {}.", pipeline);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public void addContainerToPipeline(
PipelineID pipelineId, ContainerID containerID)
throws IOException {
- pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
+ lock.writeLock().lock();
+ try {
+ pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public Pipeline getPipeline(PipelineID pipelineID)
throws PipelineNotFoundException {
- return pipelineStateMap.getPipeline(pipelineID);
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap.getPipeline(pipelineID);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public List<Pipeline> getPipelines() {
- return pipelineStateMap.getPipelines();
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap.getPipelines();
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
- return pipelineStateMap.getPipelines(type);
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap.getPipelines(type);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public List<Pipeline> getPipelines(
HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) {
- return pipelineStateMap.getPipelines(type, factor);
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap.getPipelines(type, factor);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public List<Pipeline> getPipelines(
HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
Pipeline.PipelineState state) {
- return pipelineStateMap.getPipelines(type, factor, state);
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap.getPipelines(type, factor, state);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
@@ -127,52 +168,86 @@ public class PipelineStateManagerV2Impl implements
StateManager {
HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
Collection<PipelineID> excludePipelines) {
- return pipelineStateMap
- .getPipelines(type, factor, state, excludeDns, excludePipelines);
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap
+ .getPipelines(type, factor, state, excludeDns, excludePipelines);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
Pipeline.PipelineState... states) {
- return pipelineStateMap.getPipelines(type, states);
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap.getPipelines(type, states);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
throws IOException {
- return pipelineStateMap.getContainers(pipelineID);
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap.getContainers(pipelineID);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
- return pipelineStateMap.getNumberOfContainers(pipelineID);
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap.getNumberOfContainers(pipelineID);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public void removePipeline(HddsProtos.PipelineID pipelineIDProto)
throws IOException {
- PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
- pipelineStore.delete(pipelineID);
- Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
- nodeManager.removePipeline(pipeline);
- LOG.info("Pipeline {} removed.", pipeline);
- return;
+ lock.writeLock().lock();
+ try {
+ PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
+ pipelineStore.delete(pipelineID);
+ Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
+ nodeManager.removePipeline(pipeline);
+ LOG.info("Pipeline {} removed.", pipeline);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public void removeContainerFromPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
- pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
+ lock.writeLock().lock();
+ try {
+ pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public void updatePipelineState(
HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
throws IOException {
- pipelineStateMap.updatePipelineState(
- PipelineID.getFromProtobuf(pipelineIDProto),
- Pipeline.PipelineState.fromProtobuf(newState));
+ lock.writeLock().lock();
+ try {
+ pipelineStateMap.updatePipelineState(
+ PipelineID.getFromProtobuf(pipelineIDProto),
+ Pipeline.PipelineState.fromProtobuf(newState));
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]