This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ff1e4143ad HDDS-10430. Race condition around Pipeline#nodesInOrder
(#6316)
ff1e4143ad is described below
commit ff1e4143ad22ae56a9476c180bb56cdb3556c487
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Mar 5 09:50:50 2024 +0100
HDDS-10430. Race condition around Pipeline#nodesInOrder (#6316)
---
.../container/common/helpers/AllocatedBlock.java | 14 ++-
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 120 ++++++++++-----------
.../hdds/scm/server/SCMBlockProtocolServer.java | 9 +-
.../hadoop/hdds/scm/TestXceiverClientGrpc.java | 2 +-
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 4 +-
5 files changed, 79 insertions(+), 70 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
index 7ac0401af1..5a1d8f90ea 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
* contains a Pipeline and the key.
*/
public final class AllocatedBlock {
- private Pipeline pipeline;
- private ContainerBlockID containerBlockID;
+ private final Pipeline pipeline;
+ private final ContainerBlockID containerBlockID;
/**
* Builder for AllocatedBlock.
@@ -63,4 +63,14 @@ public final class AllocatedBlock {
public ContainerBlockID getBlockID() {
return containerBlockID;
}
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public Builder toBuilder() {
+ return new Builder()
+ .setContainerBlockID(containerBlockID)
+ .setPipeline(pipeline);
+ }
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 9d95cee483..05d83a8b8b 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -34,6 +34,8 @@ import java.util.Set;
import java.util.UUID;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -76,10 +78,10 @@ public final class Pipeline {
private final ReplicationConfig replicationConfig;
private final PipelineState state;
- private Map<DatanodeDetails, Long> nodeStatus;
- private Map<DatanodeDetails, Integer> replicaIndexes;
+ private final Map<DatanodeDetails, Long> nodeStatus;
+ private final Map<DatanodeDetails, Integer> replicaIndexes;
// nodes with ordered distance to client
- private List<DatanodeDetails> nodesInOrder = new ArrayList<>();
+ private final ImmutableList<DatanodeDetails> nodesInOrder;
// Current reported Leader for the pipeline
private UUID leaderId;
// Timestamp for pipeline upon creation
@@ -103,17 +105,17 @@ public final class Pipeline {
* set to <i>Instant.now</i> when you create the Pipeline object as part of
* state change.
*/
- private Pipeline(PipelineID id,
- ReplicationConfig replicationConfig, PipelineState state,
- Map<DatanodeDetails, Long> nodeStatus, UUID suggestedLeaderId) {
- this.id = id;
- this.replicationConfig = replicationConfig;
- this.state = state;
- this.nodeStatus = nodeStatus;
- this.creationTimestamp = Instant.now();
- this.suggestedLeaderId = suggestedLeaderId;
- this.replicaIndexes = new HashMap<>();
- this.stateEnterTime = Instant.now();
+ private Pipeline(Builder b) {
+ id = b.id;
+ replicationConfig = b.replicationConfig;
+ state = b.state;
+ leaderId = b.leaderId;
+ suggestedLeaderId = b.suggestedLeaderId;
+ nodeStatus = b.nodeStatus;
+ nodesInOrder = b.nodesInOrder != null ?
ImmutableList.copyOf(b.nodesInOrder) : ImmutableList.of();
+ replicaIndexes = b.replicaIndexes != null ?
ImmutableMap.copyOf(b.replicaIndexes) : ImmutableMap.of();
+ creationTimestamp = b.creationTimestamp != null ? b.creationTimestamp :
Instant.now();
+ stateEnterTime = Instant.now();
}
/**
@@ -310,19 +312,6 @@ public final class Pipeline {
return state == PipelineState.OPEN;
}
- public boolean isAllocationTimeout() {
- //TODO: define a system property to control the timeout value
- return false;
- }
-
- public void setNodesInOrder(List<DatanodeDetails> nodes) {
- nodesInOrder.clear();
- if (null == nodes) {
- return;
- }
- nodesInOrder.addAll(nodes);
- }
-
public List<DatanodeDetails> getNodesInOrder() {
if (nodesInOrder.isEmpty()) {
LOG.debug("Nodes in order is empty, delegate to getNodes");
@@ -406,33 +395,39 @@ public final class Pipeline {
// To save the message size on wire, only transfer the node order based on
// network topology
- List<DatanodeDetails> nodes = nodesInOrder;
- if (!nodes.isEmpty()) {
- for (int i = 0; i < nodes.size(); i++) {
+ if (!nodesInOrder.isEmpty()) {
+ for (int i = 0; i < nodesInOrder.size(); i++) {
Iterator<DatanodeDetails> it = nodeStatus.keySet().iterator();
for (int j = 0; j < nodeStatus.keySet().size(); j++) {
- if (it.next().equals(nodes.get(i))) {
+ if (it.next().equals(nodesInOrder.get(i))) {
builder.addMemberOrders(j);
break;
}
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Serialize pipeline {} with nodesInOrder {}", id, nodes);
+ LOG.debug("Serialize pipeline {} with nodesInOrder {}", id,
nodesInOrder);
}
}
return builder.build();
}
- static Pipeline getFromProtobufSetCreationTimestamp(
+ private static Pipeline getFromProtobufSetCreationTimestamp(
HddsProtos.Pipeline proto) throws UnknownPipelineStateException {
- final Pipeline pipeline = getFromProtobuf(proto);
- // When SCM is restarted, set Creation time with current time.
- pipeline.setCreationTimestamp(Instant.now());
- return pipeline;
+ return toBuilder(proto)
+ .setCreateTimestamp(Instant.now())
+ .build();
}
- public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
+ public Pipeline copyWithNodesInOrder(List<DatanodeDetails> nodes) {
+ return toBuilder().setNodesInOrder(nodes).build();
+ }
+
+ public Builder toBuilder() {
+ return newBuilder(this);
+ }
+
+ public static Builder toBuilder(HddsProtos.Pipeline pipeline)
throws UnknownPipelineStateException {
Preconditions.checkNotNull(pipeline, "Pipeline is null");
@@ -473,9 +468,13 @@ public final class Pipeline {
.setReplicaIndexes(nodes)
.setLeaderId(leaderId)
.setSuggestedLeaderId(suggestedLeaderId)
- .setNodesInOrder(pipeline.getMemberOrdersList())
- .setCreateTimestamp(pipeline.getCreationTimeStamp())
- .build();
+ .setNodeOrder(pipeline.getMemberOrdersList())
+ .setCreateTimestamp(pipeline.getCreationTimeStamp());
+ }
+
+ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
+ throws UnknownPipelineStateException {
+ return toBuilder(pipeline).build();
}
@Override
@@ -529,10 +528,6 @@ public final class Pipeline {
return new Builder(pipeline);
}
- private void setReplicaIndexes(Map<DatanodeDetails, Integer> replicaIndexes)
{
- this.replicaIndexes = replicaIndexes;
- }
-
/**
* Builder class for Pipeline.
*/
@@ -546,7 +541,7 @@ public final class Pipeline {
private UUID leaderId = null;
private Instant creationTimestamp = null;
private UUID suggestedLeaderId = null;
- private Map<DatanodeDetails, Integer> replicaIndexes = new HashMap<>();
+ private Map<DatanodeDetails, Integer> replicaIndexes;
public Builder() { }
@@ -559,8 +554,8 @@ public final class Pipeline {
this.leaderId = pipeline.getLeaderId();
this.creationTimestamp = pipeline.getCreationTimestamp();
this.suggestedLeaderId = pipeline.getSuggestedLeaderId();
- this.replicaIndexes = new HashMap<>();
if (nodeStatus != null) {
+ replicaIndexes = new HashMap<>();
for (DatanodeDetails dn : nodeStatus.keySet()) {
int index = pipeline.getReplicaIndex(dn);
if (index > 0) {
@@ -601,11 +596,22 @@ public final class Pipeline {
return this;
}
- public Builder setNodesInOrder(List<Integer> orders) {
+ public Builder setNodeOrder(List<Integer> orders) {
+ // for build from ProtoBuf
this.nodeOrder = orders;
return this;
}
+ public Builder setNodesInOrder(List<DatanodeDetails> nodes) {
+ this.nodesInOrder = new LinkedList<>(nodes);
+ return this;
+ }
+
+ public Builder setCreateTimestamp(Instant instant) {
+ this.creationTimestamp = instant;
+ return this;
+ }
+
public Builder setCreateTimestamp(long createTimestamp) {
this.creationTimestamp = Instant.ofEpochMilli(createTimestamp);
return this;
@@ -627,19 +633,8 @@ public final class Pipeline {
Preconditions.checkNotNull(replicationConfig);
Preconditions.checkNotNull(state);
Preconditions.checkNotNull(nodeStatus);
- Pipeline pipeline =
- new Pipeline(id, replicationConfig, state, nodeStatus,
- suggestedLeaderId);
- pipeline.setLeaderId(leaderId);
- // overwrite with original creationTimestamp
- if (creationTimestamp != null) {
- pipeline.setCreationTimestamp(creationTimestamp);
- }
-
- pipeline.setReplicaIndexes(replicaIndexes);
if (nodeOrder != null && !nodeOrder.isEmpty()) {
- // This branch is for build from ProtoBuf
List<DatanodeDetails> nodesWithOrder = new ArrayList<>();
for (int i = 0; i < nodeOrder.size(); i++) {
int nodeIndex = nodeOrder.get(i);
@@ -657,13 +652,10 @@ public final class Pipeline {
LOG.debug("Deserialize nodesInOrder {} in pipeline {}",
nodesWithOrder, id);
}
- pipeline.setNodesInOrder(nodesWithOrder);
- } else if (nodesInOrder != null) {
- // This branch is for pipeline clone
- pipeline.setNodesInOrder(nodesInOrder);
+ nodesInOrder = nodesWithOrder;
}
- return pipeline;
+ return new Pipeline(this);
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 69f190c7fb..0747f04584 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
@@ -203,15 +204,19 @@ public class SCMBlockProtocolServer implements
AllocatedBlock block = scm.getScmBlockManager()
.allocateBlock(size, replicationConfig, owner, excludeList);
if (block != null) {
- blocks.add(block);
// Sort the datanodes if client machine is specified
final Node client = getClientNode(clientMachine);
if (client != null) {
final List<DatanodeDetails> nodes = block.getPipeline().getNodes();
final List<DatanodeDetails> sorted = scm.getClusterMap()
.sortByDistanceCost(client, nodes, nodes.size());
- block.getPipeline().setNodesInOrder(sorted);
+ if (!Objects.equals(sorted,
block.getPipeline().getNodesInOrder())) {
+ block = block.toBuilder()
+
.setPipeline(block.getPipeline().copyWithNodesInOrder(sorted))
+ .build();
+ }
}
+ blocks.add(block);
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
index fb312dfb50..79c937ceb5 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
@@ -71,8 +71,8 @@ public class TestXceiverClientGrpc {
RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
.setState(Pipeline.PipelineState.CLOSED)
.setNodes(dns)
+ .setNodesInOrder(dnsInOrder)
.build();
- pipeline.setNodesInOrder(dnsInOrder);
}
@Test
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 3786601dd6..af6b41b610 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -1880,7 +1880,9 @@ public class KeyManagerImpl implements KeyManager {
LOG.debug("Found sorted datanodes for pipeline {} and client {} "
+ "in cache", pipeline.getId(), clientMachine);
}
- pipeline.setNodesInOrder(sortedNodes);
+ if (!Objects.equals(pipeline.getNodesInOrder(), sortedNodes)) {
+ k.setPipeline(pipeline.copyWithNodesInOrder(sortedNodes));
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]