This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new a52e8ef HDDS-4630: Solve deadlock triggered by PipelineActionHandler. (#1743)
a52e8ef is described below
commit a52e8efd7dcd2b3039cc6bf303cd03fdb0de6b95
Author: GlenGeng <[email protected]>
AuthorDate: Thu Jan 7 15:33:21 2021 +0800
HDDS-4630: Solve deadlock triggered by PipelineActionHandler. (#1743)
---
.../hdds/scm/container/ContainerManagerImpl.java | 117 +++-----
.../scm/container/ContainerStateManagerImpl.java | 205 ++++++++-----
.../scm/container/states/ContainerStateMap.java | 330 ++++++++-------------
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 117 ++------
4 files changed, 331 insertions(+), 438 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index 5ce85f6..91684ce 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -28,8 +28,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
@@ -70,8 +70,8 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
/**
*
*/
- //Can we move this lock to ContainerStateManager?
- private final ReadWriteLock lock;
+ // Limit the number of on-going ratis operations.
+ private final Lock lock;
/**
*
@@ -102,7 +102,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
final Table<ContainerID, ContainerInfo> containerStore)
throws IOException {
// Introduce builder for this class?
- this.lock = new ReentrantReadWriteLock();
+ this.lock = new ReentrantLock();
this.pipelineManager = pipelineManager;
this.haManager = scmHaManager;
this.containerStateManager = ContainerStateManagerImpl.newBuilder()
@@ -124,56 +124,41 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
@Override
public ContainerInfo getContainer(final ContainerID id)
throws ContainerNotFoundException {
- lock.readLock().lock();
- try {
- return Optional.ofNullable(containerStateManager
- .getContainer(id.getProtobuf()))
- .orElseThrow(() -> new ContainerNotFoundException("ID " + id));
- } finally {
- lock.readLock().unlock();
- }
+ return Optional.ofNullable(containerStateManager
+ .getContainer(id.getProtobuf()))
+ .orElseThrow(() -> new ContainerNotFoundException("ID " + id));
}
@Override
public List<ContainerInfo> getContainers(final ContainerID startID,
final int count) {
- lock.readLock().lock();
scmContainerManagerMetrics.incNumListContainersOps();
- try {
- // TODO: Remove the null check, startID should not be null. Fix the unit
- // test before removing the check.
- final long start = startID == null ? 0 : startID.getId();
- final List<ContainerID> containersIds =
- new ArrayList<>(containerStateManager.getContainerIDs());
- Collections.sort(containersIds);
- return containersIds.stream()
- .filter(id -> id.getId() > start).limit(count)
- .map(ContainerID::getProtobuf)
- .map(containerStateManager::getContainer)
- .collect(Collectors.toList());
- } finally {
- lock.readLock().unlock();
- }
+ // TODO: Remove the null check, startID should not be null. Fix the unit
+ // test before removing the check.
+ final long start = startID == null ? 0 : startID.getId();
+ final List<ContainerID> containersIds =
+ new ArrayList<>(containerStateManager.getContainerIDs());
+ Collections.sort(containersIds);
+ return containersIds.stream()
+ .filter(id -> id.getId() > start).limit(count)
+ .map(ContainerID::getProtobuf)
+ .map(containerStateManager::getContainer)
+ .collect(Collectors.toList());
}
@Override
public List<ContainerInfo> getContainers(final LifeCycleState state) {
- lock.readLock().lock();
- try {
- return containerStateManager.getContainerIDs(state).stream()
- .map(ContainerID::getProtobuf)
- .map(containerStateManager::getContainer)
- .filter(Objects::nonNull).collect(Collectors.toList());
- } finally {
- lock.readLock().unlock();
- }
+ return containerStateManager.getContainerIDs(state).stream()
+ .map(ContainerID::getProtobuf)
+ .map(containerStateManager::getContainer)
+ .filter(Objects::nonNull).collect(Collectors.toList());
}
@Override
public ContainerInfo allocateContainer(final ReplicationType type,
final ReplicationFactor replicationFactor, final String owner)
throws IOException {
- lock.writeLock().lock();
+ lock.lock();
try {
final List<Pipeline> pipelines = pipelineManager
.getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
@@ -198,7 +183,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
}
return containerInfo;
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
@@ -233,7 +218,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
final LifeCycleEvent event)
throws IOException, InvalidStateTransitionException {
final HddsProtos.ContainerID cid = id.getProtobuf();
- lock.writeLock().lock();
+ lock.lock();
try {
if (containerExist(cid)) {
containerStateManager.updateContainerState(cid, event);
@@ -241,21 +226,16 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
throwContainerNotFoundException(cid);
}
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
@Override
public Set<ContainerReplica> getContainerReplicas(final ContainerID id)
throws ContainerNotFoundException {
- lock.readLock().lock();
- try {
- return Optional.ofNullable(containerStateManager
- .getContainerReplicas(id.getProtobuf()))
- .orElseThrow(() -> new ContainerNotFoundException("ID " + id));
- } finally {
- lock.readLock().unlock();
- }
+ return Optional.ofNullable(containerStateManager
+ .getContainerReplicas(id.getProtobuf()))
+ .orElseThrow(() -> new ContainerNotFoundException("ID " + id));
}
@Override
@@ -263,15 +243,10 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
final ContainerReplica replica)
throws ContainerNotFoundException {
final HddsProtos.ContainerID cid = id.getProtobuf();
- lock.writeLock().lock();
- try {
- if (containerExist(cid)) {
- containerStateManager.updateContainerReplica(cid, replica);
- } else {
- throwContainerNotFoundException(cid);
- }
- } finally {
- lock.writeLock().unlock();
+ if (containerExist(cid)) {
+ containerStateManager.updateContainerReplica(cid, replica);
+ } else {
+ throwContainerNotFoundException(cid);
}
}
@@ -280,27 +255,17 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
final ContainerReplica replica)
throws ContainerNotFoundException, ContainerReplicaNotFoundException {
final HddsProtos.ContainerID cid = id.getProtobuf();
- lock.writeLock().lock();
- try {
- if (containerExist(cid)) {
- containerStateManager.removeContainerReplica(cid, replica);
- } else {
- throwContainerNotFoundException(cid);
- }
- } finally {
- lock.writeLock().unlock();
+ if (containerExist(cid)) {
+ containerStateManager.removeContainerReplica(cid, replica);
+ } else {
+ throwContainerNotFoundException(cid);
}
}
@Override
public void updateDeleteTransactionId(
final Map<ContainerID, Long> deleteTransactionMap) throws IOException {
- lock.writeLock().lock();
- try {
- containerStateManager.updateDeleteTransactionId(deleteTransactionMap);
- } finally {
- lock.writeLock().unlock();
- }
+ containerStateManager.updateDeleteTransactionId(deleteTransactionMap);
}
@Override
@@ -384,7 +349,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
public void deleteContainer(final ContainerID id)
throws IOException {
final HddsProtos.ContainerID cid = id.getProtobuf();
- lock.writeLock().lock();
+ lock.lock();
try {
if (containerExist(cid)) {
containerStateManager.removeContainer(cid);
@@ -394,7 +359,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
throwContainerNotFoundException(cid);
}
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index 7807ae5..68e4d35 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -25,6 +25,8 @@ import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Preconditions;
@@ -97,7 +99,7 @@ public final class ContainerStateManagerImpl
/**
* Persistent store for Container States.
*/
- private Table<ContainerID, ContainerInfo> containerStore;
+ private final Table<ContainerID, ContainerInfo> containerStore;
/**
* PipelineManager instance.
@@ -117,6 +119,11 @@ public final class ContainerStateManagerImpl
private final Map<LifeCycleEvent, CheckedConsumer<ContainerInfo, IOException>>
containerStateChangeActions;
+
+ // Protect containers and containerStore against the potential
+ // contentions between RaftServer and ContainerManager.
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
/**
* constructs ContainerStateManagerImpl instance and loads the containers
* form the persistent storage.
@@ -246,18 +253,32 @@ public final class ContainerStateManagerImpl
@Override
public Set<ContainerID> getContainerIDs() {
- return containers.getAllContainerIDs();
+ lock.readLock().lock();
+ try {
+ return containers.getAllContainerIDs();
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public Set<ContainerID> getContainerIDs(final LifeCycleState state) {
- return containers.getContainerIDsByState(state);
+ lock.readLock().lock();
+ try {
+ return containers.getContainerIDsByState(state);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public ContainerInfo getContainer(final HddsProtos.ContainerID id) {
- return containers.getContainerInfo(
- ContainerID.getFromProtobuf(id));
+ lock.readLock().lock();
+ try {
+ return containers.getContainerInfo(ContainerID.getFromProtobuf(id));
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
@@ -273,45 +294,62 @@ public final class ContainerStateManagerImpl
final ContainerID containerID = container.containerID();
final PipelineID pipelineID = container.getPipelineID();
- if (!containers.contains(containerID)) {
- ExecutionUtil.create(() -> {
- containerStore.put(containerID, container);
- containers.addContainer(container);
- pipelineManager.addContainerToPipeline(pipelineID, containerID);
- }).onException(() -> {
- containers.removeContainer(containerID);
- containerStore.delete(containerID);
- }).execute();
+ lock.writeLock().lock();
+ try {
+ if (!containers.contains(containerID)) {
+ ExecutionUtil.create(() -> {
+ containerStore.put(containerID, container);
+ containers.addContainer(container);
+ pipelineManager.addContainerToPipeline(pipelineID, containerID);
+ }).onException(() -> {
+ containers.removeContainer(containerID);
+ containerStore.delete(containerID);
+ }).execute();
+ }
+ } finally {
+ lock.writeLock().unlock();
}
}
@Override
public boolean contains(final HddsProtos.ContainerID id) {
- // TODO: Remove the protobuf conversion after fixing ContainerStateMap.
- return containers.contains(ContainerID.getFromProtobuf(id));
+ lock.readLock().lock();
+ try {
+ // TODO: Remove the protobuf conversion after fixing ContainerStateMap.
+ return containers.contains(ContainerID.getFromProtobuf(id));
+ } finally {
+ lock.readLock().unlock();
+ }
}
+ @Override
public void updateContainerState(final HddsProtos.ContainerID containerID,
final LifeCycleEvent event)
throws IOException, InvalidStateTransitionException {
// TODO: Remove the protobuf conversion after fixing ContainerStateMap.
final ContainerID id = ContainerID.getFromProtobuf(containerID);
- if (containers.contains(id)) {
- final ContainerInfo oldInfo = containers.getContainerInfo(id);
- final LifeCycleState oldState = oldInfo.getState();
- final LifeCycleState newState = stateMachine.getNextState(
- oldInfo.getState(), event);
- if (newState.getNumber() > oldState.getNumber()) {
- ExecutionUtil.create(() -> {
- containers.updateState(id, oldState, newState);
- containerStore.put(id, containers.getContainerInfo(id));
- }).onException(() -> {
- containerStore.put(id, oldInfo);
- containers.updateState(id, newState, oldState);
- }).execute();
- containerStateChangeActions.getOrDefault(event, info -> {})
- .execute(oldInfo);
+
+ lock.writeLock().lock();
+ try {
+ if (containers.contains(id)) {
+ final ContainerInfo oldInfo = containers.getContainerInfo(id);
+ final LifeCycleState oldState = oldInfo.getState();
+ final LifeCycleState newState = stateMachine.getNextState(
+ oldInfo.getState(), event);
+ if (newState.getNumber() > oldState.getNumber()) {
+ ExecutionUtil.create(() -> {
+ containers.updateState(id, oldState, newState);
+ containerStore.put(id, containers.getContainerInfo(id));
+ }).onException(() -> {
+ containerStore.put(id, oldInfo);
+ containers.updateState(id, newState, oldState);
+ }).execute();
+ containerStateChangeActions.getOrDefault(event, info -> {})
+ .execute(oldInfo);
+ }
}
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -319,35 +357,54 @@ public final class ContainerStateManagerImpl
@Override
public Set<ContainerReplica> getContainerReplicas(
final HddsProtos.ContainerID id) {
- return containers.getContainerReplicas(
- ContainerID.getFromProtobuf(id));
+ lock.readLock().lock();
+ try {
+ return containers.getContainerReplicas(
+ ContainerID.getFromProtobuf(id));
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public void updateContainerReplica(final HddsProtos.ContainerID id,
final ContainerReplica replica) {
- containers.updateContainerReplica(ContainerID.getFromProtobuf(id),
- replica);
+ lock.writeLock().lock();
+ try {
+ containers.updateContainerReplica(ContainerID.getFromProtobuf(id),
+ replica);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public void removeContainerReplica(final HddsProtos.ContainerID id,
final ContainerReplica replica) {
- containers.removeContainerReplica(ContainerID.getFromProtobuf(id),
- replica);
-
+ lock.writeLock().lock();
+ try {
+ containers.removeContainerReplica(ContainerID.getFromProtobuf(id),
+ replica);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public void updateDeleteTransactionId(
final Map<ContainerID, Long> deleteTransactionMap) throws IOException {
- // TODO: Refactor this. Error handling is not done.
- for (Map.Entry<ContainerID, Long> transaction :
- deleteTransactionMap.entrySet()) {
- final ContainerInfo info = containers.getContainerInfo(
- transaction.getKey());
- info.updateDeleteTransactionId(transaction.getValue());
- containerStore.put(info.containerID(), info);
+ lock.writeLock().lock();
+ try {
+ // TODO: Refactor this. Error handling is not done.
+ for (Map.Entry<ContainerID, Long> transaction :
+ deleteTransactionMap.entrySet()) {
+ final ContainerInfo info = containers.getContainerInfo(
+ transaction.getKey());
+ info.updateDeleteTransactionId(transaction.getValue());
+ containerStore.put(info.containerID(), info);
+ }
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -370,26 +427,31 @@ public final class ContainerStateManagerImpl
resultSet = containerIDs;
}
- ContainerInfo selectedContainer = findContainerWithSpace(size, resultSet);
- if (selectedContainer == null) {
-
- // If we did not find any space in the tailSet, we need to look for
- // space in the headset, we need to pass true to deal with the
- // situation that we have a lone container that has space. That is we
- // ignored the last used container under the assumption we can find
- // other containers with space, but if have a single container that is
- // not true. Hence we need to include the last used container as the
- // last element in the sorted set.
-
- resultSet = containerIDs.headSet(lastID, true);
- selectedContainer = findContainerWithSpace(size, resultSet);
- }
+ lock.readLock().lock();
+ try {
+ ContainerInfo selectedContainer = findContainerWithSpace(size, resultSet);
+ if (selectedContainer == null) {
+
+ // If we did not find any space in the tailSet, we need to look for
+ // space in the headset, we need to pass true to deal with the
+ // situation that we have a lone container that has space. That is we
+ // ignored the last used container under the assumption we can find
+ // other containers with space, but if have a single container that is
+ // not true. Hence we need to include the last used container as the
+ // last element in the sorted set.
+
+ resultSet = containerIDs.headSet(lastID, true);
+ selectedContainer = findContainerWithSpace(size, resultSet);
+ }
- // TODO: cleanup entries in lastUsedMap
- if (selectedContainer != null) {
- lastUsedMap.put(key, selectedContainer.containerID());
+ // TODO: cleanup entries in lastUsedMap
+ if (selectedContainer != null) {
+ lastUsedMap.put(key, selectedContainer.containerID());
+ }
+ return selectedContainer;
+ } finally {
+ lock.readLock().unlock();
}
- return selectedContainer;
}
private ContainerInfo findContainerWithSpace(final long size,
@@ -408,12 +470,17 @@ public final class ContainerStateManagerImpl
public void removeContainer(final HddsProtos.ContainerID id)
throws IOException {
- final ContainerID cid = ContainerID.getFromProtobuf(id);
- final ContainerInfo containerInfo = containers.getContainerInfo(cid);
- ExecutionUtil.create(() -> {
- containerStore.delete(cid);
- containers.removeContainer(cid);
- }).onException(() -> containerStore.put(cid, containerInfo)).execute();
+ lock.writeLock().lock();
+ try {
+ final ContainerID cid = ContainerID.getFromProtobuf(id);
+ final ContainerInfo containerInfo = containers.getContainerInfo(cid);
+ ExecutionUtil.create(() -> {
+ containerStore.delete(cid);
+ containers.removeContainer(cid);
+ }).onException(() -> containerStore.put(cid, containerInfo)).execute();
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 4d143e0..4d7860d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -23,8 +23,6 @@ import java.util.Collections;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Preconditions;
@@ -92,12 +90,6 @@ public class ContainerStateMap {
private final Map<ContainerID, Set<ContainerReplica>> replicaMap;
private final Map<ContainerQueryKey, NavigableSet<ContainerID>> resultCache;
- // Container State Map lock should be held before calling into
- // Update ContainerAttributes. The consistency of ContainerAttributes is
- // protected by this lock.
- // Can we remove this lock?
- private final ReadWriteLock lock;
-
/**
* Create a ContainerStateMap.
*/
@@ -107,7 +99,6 @@ public class ContainerStateMap {
this.factorMap = new ContainerAttribute<>();
this.typeMap = new ContainerAttribute<>();
this.containerMap = new ConcurrentHashMap<>();
- this.lock = new ReentrantReadWriteLock();
this.replicaMap = new ConcurrentHashMap<>();
this.resultCache = new ConcurrentHashMap<>();
}
@@ -121,34 +112,24 @@ public class ContainerStateMap {
public void addContainer(final ContainerInfo info)
throws SCMException {
Preconditions.checkNotNull(info, "Container Info cannot be null");
- lock.writeLock().lock();
- try {
- final ContainerID id = info.containerID();
- if (!contains(id)) {
- containerMap.put(id, info);
- lifeCycleStateMap.insert(info.getState(), id);
- ownerMap.insert(info.getOwner(), id);
- factorMap.insert(info.getReplicationFactor(), id);
- typeMap.insert(info.getReplicationType(), id);
- replicaMap.put(id, ConcurrentHashMap.newKeySet());
-
- // Flush the cache of this container type, will be added later when
- // get container queries are executed.
- flushCache(info);
- LOG.trace("Container {} added to ContainerStateMap.", id);
- }
- } finally {
- lock.writeLock().unlock();
+ final ContainerID id = info.containerID();
+ if (!contains(id)) {
+ containerMap.put(id, info);
+ lifeCycleStateMap.insert(info.getState(), id);
+ ownerMap.insert(info.getOwner(), id);
+ factorMap.insert(info.getReplicationFactor(), id);
+ typeMap.insert(info.getReplicationType(), id);
+ replicaMap.put(id, ConcurrentHashMap.newKeySet());
+
+ // Flush the cache of this container type, will be added later when
+ // get container queries are executed.
+ flushCache(info);
+ LOG.trace("Container {} added to ContainerStateMap.", id);
}
}
public boolean contains(final ContainerID id) {
- lock.readLock().lock();
- try {
- return containerMap.containsKey(id);
- } finally {
- lock.readLock().unlock();
- }
+ return containerMap.containsKey(id);
}
/**
@@ -158,22 +139,17 @@ public class ContainerStateMap {
*/
public void removeContainer(final ContainerID id) {
Preconditions.checkNotNull(id, "ContainerID cannot be null");
- lock.writeLock().lock();
- try {
- if (contains(id)) {
- // Should we revert back to the original state if any of the below
- // remove operation fails?
- final ContainerInfo info = containerMap.remove(id);
- lifeCycleStateMap.remove(info.getState(), id);
- ownerMap.remove(info.getOwner(), id);
- factorMap.remove(info.getReplicationFactor(), id);
- typeMap.remove(info.getReplicationType(), id);
- // Flush the cache of this container type.
- flushCache(info);
- LOG.trace("Container {} removed from ContainerStateMap.", id);
- }
- } finally {
- lock.writeLock().unlock();
+ if (contains(id)) {
+ // Should we revert back to the original state if any of the below
+ // remove operation fails?
+ final ContainerInfo info = containerMap.remove(id);
+ lifeCycleStateMap.remove(info.getState(), id);
+ ownerMap.remove(info.getOwner(), id);
+ factorMap.remove(info.getReplicationFactor(), id);
+ typeMap.remove(info.getReplicationType(), id);
+ // Flush the cache of this container type.
+ flushCache(info);
+ LOG.trace("Container {} removed from ContainerStateMap.", id);
}
}
@@ -184,12 +160,7 @@ public class ContainerStateMap {
* @return container info, if found else null.
*/
public ContainerInfo getContainerInfo(final ContainerID containerID) {
- lock.readLock().lock();
- try {
- return containerMap.get(containerID);
- } finally {
- lock.readLock().unlock();
- }
+ return containerMap.get(containerID);
}
/**
@@ -202,13 +173,8 @@ public class ContainerStateMap {
public Set<ContainerReplica> getContainerReplicas(
final ContainerID containerID) {
Preconditions.checkNotNull(containerID);
- lock.readLock().lock();
- try {
- final Set<ContainerReplica> replicas = replicaMap.get(containerID);
- return replicas == null ? null : Collections.unmodifiableSet(replicas);
- } finally {
- lock.readLock().unlock();
- }
+ final Set<ContainerReplica> replicas = replicaMap.get(containerID);
+ return replicas == null ? null : Collections.unmodifiableSet(replicas);
}
/**
@@ -222,15 +188,10 @@ public class ContainerStateMap {
public void updateContainerReplica(final ContainerID containerID,
final ContainerReplica replica) {
Preconditions.checkNotNull(containerID);
- lock.writeLock().lock();
- try {
- if (contains(containerID)) {
- final Set<ContainerReplica> replicas = replicaMap.get(containerID);
- replicas.remove(replica);
- replicas.add(replica);
- }
- } finally {
- lock.writeLock().unlock();
+ if (contains(containerID)) {
+ final Set<ContainerReplica> replicas = replicaMap.get(containerID);
+ replicas.remove(replica);
+ replicas.add(replica);
}
}
@@ -245,13 +206,8 @@ public class ContainerStateMap {
final ContainerReplica replica) {
Preconditions.checkNotNull(containerID);
Preconditions.checkNotNull(replica);
- lock.writeLock().lock();
- try {
- if (contains(containerID)) {
- replicaMap.get(containerID).remove(replica);
- }
- } finally {
- lock.writeLock().unlock();
+ if (contains(containerID)) {
+ replicaMap.get(containerID).remove(replica);
}
}
@@ -262,15 +218,10 @@ public class ContainerStateMap {
public void updateContainerInfo(final ContainerInfo info) {
Preconditions.checkNotNull(info);
final ContainerID id = info.containerID();
- lock.writeLock().lock();
- try {
- if (contains(id)) {
- final ContainerInfo currentInfo = containerMap.get(id);
- flushCache(info, currentInfo);
- containerMap.put(id, info);
- }
- } finally {
- lock.writeLock().unlock();
+ if (contains(id)) {
+ final ContainerInfo currentInfo = containerMap.get(id);
+ flushCache(info, currentInfo);
+ containerMap.put(id, info);
}
}
@@ -286,66 +237,56 @@ public class ContainerStateMap {
LifeCycleState newState) throws SCMException {
Preconditions.checkNotNull(currentState);
Preconditions.checkNotNull(newState);
- lock.writeLock().lock();
+ if (!contains(containerID)) {
+ return;
+ }
+
+ // TODO: Simplify this logic.
+ final ContainerInfo currentInfo = containerMap.get(containerID);
try {
- if (!contains(containerID)) {
- return;
+ currentInfo.setState(newState);
+
+ // We are updating two places before this update is done, these can
+ // fail independently, since the code needs to handle it.
+
+ // We update the attribute map, if that fails it will throw an
+ // exception, so no issues, if we are successful, we keep track of the
+ // fact that we have updated the lifecycle state in the map, and update
+ // the container state. If this second update fails, we will attempt to
+ // roll back the earlier change we did. If the rollback fails, we can
+ // be in an inconsistent state,
+
+ lifeCycleStateMap.update(currentState, newState, containerID);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Updated the container {} to new state. Old = {}, new = " +
+ "{}", containerID, currentState, newState);
}
- // TODO: Simplify this logic.
- final ContainerInfo currentInfo = containerMap.get(containerID);
- try {
- currentInfo.setState(newState);
-
- // We are updating two places before this update is done, these can
- // fail independently, since the code needs to handle it.
-
- // We update the attribute map, if that fails it will throw an
- // exception, so no issues, if we are successful, we keep track of the
- // fact that we have updated the lifecycle state in the map, and update
- // the container state. If this second update fails, we will attempt to
- // roll back the earlier change we did. If the rollback fails, we can
- // be in an inconsistent state,
-
- lifeCycleStateMap.update(currentState, newState, containerID);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Updated the container {} to new state. Old = {}, new = " +
- "{}", containerID, currentState, newState);
- }
-
- // Just flush both old and new data sets from the result cache.
- flushCache(currentInfo);
- } catch (SCMException ex) {
- LOG.error("Unable to update the container state.", ex);
- // we need to revert the change in this attribute since we are not
- // able to update the hash table.
- LOG.info("Reverting the update to lifecycle state. Moving back to " +
- "old state. Old = {}, Attempted state = {}", currentState,
- newState);
-
- currentInfo.setState(currentState);
-
- // if this line throws, the state map can be in an inconsistent
- // state, since we will have modified the attribute by the
- // container state will not in sync since we were not able to put
- // that into the hash table.
- lifeCycleStateMap.update(newState, currentState, containerID);
-
- throw new SCMException("Updating the container map failed.", ex,
- FAILED_TO_CHANGE_CONTAINER_STATE);
- }
- } finally {
- lock.writeLock().unlock();
+ // Just flush both old and new data sets from the result cache.
+ flushCache(currentInfo);
+ } catch (SCMException ex) {
+ LOG.error("Unable to update the container state.", ex);
+ // we need to revert the change in this attribute since we are not
+ // able to update the hash table.
+ LOG.info("Reverting the update to lifecycle state. Moving back to " +
+ "old state. Old = {}, Attempted state = {}", currentState,
+ newState);
+
+ currentInfo.setState(currentState);
+
+ // if this line throws, the state map can be in an inconsistent
+ // state, since we will have modified the attribute by the
+ // container state will not in sync since we were not able to put
+ // that into the hash table.
+ lifeCycleStateMap.update(newState, currentState, containerID);
+
+ throw new SCMException("Updating the container map failed.", ex,
+ FAILED_TO_CHANGE_CONTAINER_STATE);
}
}
public Set<ContainerID> getAllContainerIDs() {
- lock.readLock().lock();
- try {
- return Collections.unmodifiableSet(containerMap.keySet());
- } finally {
- lock.readLock().unlock();
- }
+ return Collections.unmodifiableSet(containerMap.keySet());
}
/**
@@ -356,12 +297,7 @@ public class ContainerStateMap {
*/
NavigableSet<ContainerID> getContainerIDsByOwner(final String ownerName) {
Preconditions.checkNotNull(ownerName);
- lock.readLock().lock();
- try {
- return ownerMap.getCollection(ownerName);
- } finally {
- lock.readLock().unlock();
- }
+ return ownerMap.getCollection(ownerName);
}
/**
@@ -372,12 +308,7 @@ public class ContainerStateMap {
*/
NavigableSet<ContainerID> getContainerIDsByType(final ReplicationType type) {
Preconditions.checkNotNull(type);
- lock.readLock().lock();
- try {
- return typeMap.getCollection(type);
- } finally {
- lock.readLock().unlock();
- }
+ return typeMap.getCollection(type);
}
/**
@@ -389,12 +320,7 @@ public class ContainerStateMap {
NavigableSet<ContainerID> getContainerIDsByFactor(
final ReplicationFactor factor) {
Preconditions.checkNotNull(factor);
- lock.readLock().lock();
- try {
- return factorMap.getCollection(factor);
- } finally {
- lock.readLock().unlock();
- }
+ return factorMap.getCollection(factor);
}
/**
@@ -406,12 +332,7 @@ public class ContainerStateMap {
public NavigableSet<ContainerID> getContainerIDsByState(
final LifeCycleState state) {
Preconditions.checkNotNull(state);
- lock.readLock().lock();
- try {
- return lifeCycleStateMap.getCollection(state);
- } finally {
- lock.readLock().unlock();
- }
+ return lifeCycleStateMap.getCollection(state);
}
/**
@@ -432,57 +353,52 @@ public class ContainerStateMap {
Preconditions.checkNotNull(factor, "Factor cannot be null");
Preconditions.checkNotNull(type, "Type cannot be null");
- lock.readLock().lock();
- try {
- final ContainerQueryKey queryKey =
- new ContainerQueryKey(state, owner, factor, type);
- if(resultCache.containsKey(queryKey)){
- return resultCache.get(queryKey);
- }
+ final ContainerQueryKey queryKey =
+ new ContainerQueryKey(state, owner, factor, type);
+ if(resultCache.containsKey(queryKey)){
+ return resultCache.get(queryKey);
+ }
- // If we cannot meet any one condition we return EMPTY_SET immediately.
- // Since when we intersect these sets, the result will be empty if any
- // one is empty.
- final NavigableSet<ContainerID> stateSet =
- lifeCycleStateMap.getCollection(state);
- if (stateSet.size() == 0) {
- return EMPTY_SET;
- }
+ // If we cannot meet any one condition we return EMPTY_SET immediately.
+ // Since when we intersect these sets, the result will be empty if any
+ // one is empty.
+ final NavigableSet<ContainerID> stateSet =
+ lifeCycleStateMap.getCollection(state);
+ if (stateSet.size() == 0) {
+ return EMPTY_SET;
+ }
- final NavigableSet<ContainerID> ownerSet =
- ownerMap.getCollection(owner);
- if (ownerSet.size() == 0) {
- return EMPTY_SET;
- }
+ final NavigableSet<ContainerID> ownerSet =
+ ownerMap.getCollection(owner);
+ if (ownerSet.size() == 0) {
+ return EMPTY_SET;
+ }
- final NavigableSet<ContainerID> factorSet =
- factorMap.getCollection(factor);
- if (factorSet.size() == 0) {
- return EMPTY_SET;
- }
+ final NavigableSet<ContainerID> factorSet =
+ factorMap.getCollection(factor);
+ if (factorSet.size() == 0) {
+ return EMPTY_SET;
+ }
- final NavigableSet<ContainerID> typeSet =
- typeMap.getCollection(type);
- if (typeSet.size() == 0) {
- return EMPTY_SET;
- }
+ final NavigableSet<ContainerID> typeSet =
+ typeMap.getCollection(type);
+ if (typeSet.size() == 0) {
+ return EMPTY_SET;
+ }
- // if we add more constraints we will just add those sets here..
- final NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
- ownerSet, factorSet, typeSet);
+ // if we add more constraints we will just add those sets here..
+ final NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
+ ownerSet, factorSet, typeSet);
- NavigableSet<ContainerID> currentSet = sets[0];
- // We take the smallest set and intersect against the larger sets. This
- // allows us to reduce the lookups to the least possible number.
- for (int x = 1; x < sets.length; x++) {
- currentSet = intersectSets(currentSet, sets[x]);
- }
- resultCache.put(queryKey, currentSet);
- return currentSet;
- } finally {
- lock.readLock().unlock();
+ NavigableSet<ContainerID> currentSet = sets[0];
+ // We take the smallest set and intersect against the larger sets. This
+ // allows us to reduce the lookups to the least possible number.
+ for (int x = 1; x < sets.length; x++) {
+ currentSet = intersectSets(currentSet, sets[x]);
}
+ resultCache.put(queryKey, currentSet);
+ return currentSet;
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 8b7d849..7d6c88a 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -52,8 +52,8 @@ import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* SCM Pipeline Manager implementation.
@@ -64,7 +64,8 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMPipelineManager.class);
- private final ReadWriteLock lock;
+ // Limit the number of on-going ratis operations to be 1.
+ private final Lock lock;
private PipelineFactory pipelineFactory;
private StateManager stateManager;
private Scheduler scheduler;
@@ -88,7 +89,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
StateManager pipelineStateManager,
PipelineFactory pipelineFactory,
EventPublisher eventPublisher) {
- this.lock = new ReentrantReadWriteLock();
+ this.lock = new ReentrantLock();
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
this.conf = conf;
@@ -149,7 +150,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
throw new IOException("Pipeline creation is not allowed as safe mode " +
"prechecks have not yet passed");
}
- lock.writeLock().lock();
+ lock.lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
stateManager.addPipeline(pipeline.getProtobufMessage());
@@ -161,7 +162,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
@@ -170,90 +171,52 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
List<DatanodeDetails> nodes) {
// This will mostly be used to create dummy pipeline for SimplePipelines.
// We don't update the metrics for SimplePipelines.
- lock.writeLock().lock();
- try {
- return pipelineFactory.create(type, factor, nodes);
- } finally {
- lock.writeLock().unlock();
- }
+ return pipelineFactory.create(type, factor, nodes);
}
@Override
public Pipeline getPipeline(PipelineID pipelineID)
throws PipelineNotFoundException {
- lock.readLock().lock();
- try {
- return stateManager.getPipeline(pipelineID);
- } finally {
- lock.readLock().unlock();
- }
+ return stateManager.getPipeline(pipelineID);
}
@Override
public boolean containsPipeline(PipelineID pipelineID) {
- lock.readLock().lock();
try {
getPipeline(pipelineID);
return true;
} catch (PipelineNotFoundException e) {
return false;
- } finally {
- lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines() {
- lock.readLock().lock();
- try {
- return stateManager.getPipelines();
- } finally {
- lock.readLock().unlock();
- }
+ return stateManager.getPipelines();
}
@Override
public List<Pipeline> getPipelines(ReplicationType type) {
- lock.readLock().lock();
- try {
- return stateManager.getPipelines(type);
- } finally {
- lock.readLock().unlock();
- }
+ return stateManager.getPipelines(type);
}
@Override
public List<Pipeline> getPipelines(ReplicationType type,
- ReplicationFactor factor) {
- lock.readLock().lock();
- try {
- return stateManager.getPipelines(type, factor);
- } finally {
- lock.readLock().unlock();
- }
+ ReplicationFactor factor) {
+ return stateManager.getPipelines(type, factor);
}
@Override
public List<Pipeline> getPipelines(ReplicationType type,
- Pipeline.PipelineState state) {
- lock.readLock().lock();
- try {
- return stateManager.getPipelines(type, state);
- } finally {
- lock.readLock().unlock();
- }
+ Pipeline.PipelineState state) {
+ return stateManager.getPipelines(type, state);
}
@Override
public List<Pipeline> getPipelines(ReplicationType type,
- ReplicationFactor factor,
- Pipeline.PipelineState state) {
- lock.readLock().lock();
- try {
- return stateManager.getPipelines(type, factor, state);
- } finally {
- lock.readLock().unlock();
- }
+ ReplicationFactor factor,
+ Pipeline.PipelineState state) {
+ return stateManager.getPipelines(type, factor, state);
}
@Override
@@ -261,46 +224,28 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
ReplicationType type, ReplicationFactor factor,
Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
Collection<PipelineID> excludePipelines) {
- lock.readLock().lock();
- try {
- return stateManager
- .getPipelines(type, factor, state, excludeDns, excludePipelines);
- } finally {
- lock.readLock().unlock();
- }
+ return stateManager
+ .getPipelines(type, factor, state, excludeDns, excludePipelines);
}
@Override
public void addContainerToPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
- lock.writeLock().lock();
- try {
- stateManager.addContainerToPipeline(pipelineID, containerID);
- } finally {
- lock.writeLock().unlock();
- }
+ // should not lock here, since no ratis operation happens.
+ stateManager.addContainerToPipeline(pipelineID, containerID);
}
@Override
public void removeContainerFromPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
- lock.writeLock().lock();
- try {
- stateManager.removeContainerFromPipeline(pipelineID, containerID);
- } finally {
- lock.writeLock().unlock();
- }
+ // should not lock here, since no ratis operation happens.
+ stateManager.removeContainerFromPipeline(pipelineID, containerID);
}
@Override
public NavigableSet<ContainerID> getContainersInPipeline(
PipelineID pipelineID) throws IOException {
- lock.readLock().lock();
- try {
- return stateManager.getContainers(pipelineID);
- } finally {
- lock.readLock().unlock();
- }
+ return stateManager.getContainers(pipelineID);
}
@Override
@@ -310,7 +255,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
@Override
public void openPipeline(PipelineID pipelineId) throws IOException {
- lock.writeLock().lock();
+ lock.lock();
try {
Pipeline pipeline = stateManager.getPipeline(pipelineId);
if (pipeline.isClosed()) {
@@ -324,7 +269,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
@@ -337,7 +282,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
protected void removePipeline(Pipeline pipeline) throws IOException {
pipelineFactory.close(pipeline.getType(), pipeline);
PipelineID pipelineID = pipeline.getId();
- lock.writeLock().lock();
+ lock.lock();
try {
stateManager.removePipeline(pipelineID.getProtobuf());
metrics.incNumPipelineDestroyed();
@@ -345,7 +290,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
@@ -372,7 +317,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
public void closePipeline(Pipeline pipeline, boolean onTimeout)
throws IOException {
PipelineID pipelineID = pipeline.getId();
- lock.writeLock().lock();
+ lock.lock();
try {
if (!pipeline.isClosed()) {
stateManager.updatePipelineState(pipelineID.getProtobuf(),
@@ -381,7 +326,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
}
metrics.removePipelineMetrics(pipelineID);
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
// close containers.
closeContainersForPipeline(pipelineID);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]