This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ff1e4143ad HDDS-10430. Race condition around Pipeline#nodesInOrder (#6316)
ff1e4143ad is described below

commit ff1e4143ad22ae56a9476c180bb56cdb3556c487
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Mar 5 09:50:50 2024 +0100

    HDDS-10430. Race condition around Pipeline#nodesInOrder (#6316)
---
 .../container/common/helpers/AllocatedBlock.java   |  14 ++-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  | 120 ++++++++++-----------
 .../hdds/scm/server/SCMBlockProtocolServer.java    |   9 +-
 .../hadoop/hdds/scm/TestXceiverClientGrpc.java     |   2 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |   4 +-
 5 files changed, 79 insertions(+), 70 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
index 7ac0401af1..5a1d8f90ea 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
  * contains a Pipeline and the key.
  */
 public final class AllocatedBlock {
-  private Pipeline pipeline;
-  private ContainerBlockID containerBlockID;
+  private final Pipeline pipeline;
+  private final ContainerBlockID containerBlockID;
 
   /**
    * Builder for AllocatedBlock.
@@ -63,4 +63,14 @@ public final class AllocatedBlock {
   public ContainerBlockID getBlockID() {
     return containerBlockID;
   }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public Builder toBuilder() {
+    return new Builder()
+        .setContainerBlockID(containerBlockID)
+        .setPipeline(pipeline);
+  }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 9d95cee483..05d83a8b8b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -34,6 +34,8 @@ import java.util.Set;
 import java.util.UUID;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -76,10 +78,10 @@ public final class Pipeline {
   private final ReplicationConfig replicationConfig;
 
   private final PipelineState state;
-  private Map<DatanodeDetails, Long> nodeStatus;
-  private Map<DatanodeDetails, Integer> replicaIndexes;
+  private final Map<DatanodeDetails, Long> nodeStatus;
+  private final Map<DatanodeDetails, Integer> replicaIndexes;
   // nodes with ordered distance to client
-  private List<DatanodeDetails> nodesInOrder = new ArrayList<>();
+  private final ImmutableList<DatanodeDetails> nodesInOrder;
   // Current reported Leader for the pipeline
   private UUID leaderId;
   // Timestamp for pipeline upon creation
@@ -103,17 +105,17 @@ public final class Pipeline {
    * set to <i>Instant.now</i> when you crate the Pipeline object as part of
    * state change.
    */
-  private Pipeline(PipelineID id,
-      ReplicationConfig replicationConfig, PipelineState state,
-      Map<DatanodeDetails, Long> nodeStatus, UUID suggestedLeaderId) {
-    this.id = id;
-    this.replicationConfig = replicationConfig;
-    this.state = state;
-    this.nodeStatus = nodeStatus;
-    this.creationTimestamp = Instant.now();
-    this.suggestedLeaderId = suggestedLeaderId;
-    this.replicaIndexes = new HashMap<>();
-    this.stateEnterTime = Instant.now();
+  private Pipeline(Builder b) {
+    id = b.id;
+    replicationConfig = b.replicationConfig;
+    state = b.state;
+    leaderId = b.leaderId;
+    suggestedLeaderId = b.suggestedLeaderId;
+    nodeStatus = b.nodeStatus;
+    nodesInOrder = b.nodesInOrder != null ? ImmutableList.copyOf(b.nodesInOrder) : ImmutableList.of();
+    replicaIndexes = b.replicaIndexes != null ? ImmutableMap.copyOf(b.replicaIndexes) : ImmutableMap.of();
+    creationTimestamp = b.creationTimestamp != null ? b.creationTimestamp : Instant.now();
+    stateEnterTime = Instant.now();
   }
 
   /**
@@ -310,19 +312,6 @@ public final class Pipeline {
     return state == PipelineState.OPEN;
   }
 
-  public boolean isAllocationTimeout() {
-    //TODO: define a system property to control the timeout value
-    return false;
-  }
-
-  public void setNodesInOrder(List<DatanodeDetails> nodes) {
-    nodesInOrder.clear();
-    if (null == nodes) {
-      return;
-    }
-    nodesInOrder.addAll(nodes);
-  }
-
   public List<DatanodeDetails> getNodesInOrder() {
     if (nodesInOrder.isEmpty()) {
       LOG.debug("Nodes in order is empty, delegate to getNodes");
@@ -406,33 +395,39 @@ public final class Pipeline {
 
     // To save the message size on wire, only transfer the node order based on
     // network topology
-    List<DatanodeDetails> nodes = nodesInOrder;
-    if (!nodes.isEmpty()) {
-      for (int i = 0; i < nodes.size(); i++) {
+    if (!nodesInOrder.isEmpty()) {
+      for (int i = 0; i < nodesInOrder.size(); i++) {
         Iterator<DatanodeDetails> it = nodeStatus.keySet().iterator();
         for (int j = 0; j < nodeStatus.keySet().size(); j++) {
-          if (it.next().equals(nodes.get(i))) {
+          if (it.next().equals(nodesInOrder.get(i))) {
             builder.addMemberOrders(j);
             break;
           }
         }
       }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Serialize pipeline {} with nodesInOrder {}", id, nodes);
+        LOG.debug("Serialize pipeline {} with nodesInOrder {}", id, nodesInOrder);
       }
     }
     return builder.build();
   }
 
-  static Pipeline getFromProtobufSetCreationTimestamp(
+  private static Pipeline getFromProtobufSetCreationTimestamp(
       HddsProtos.Pipeline proto) throws UnknownPipelineStateException {
-    final Pipeline pipeline = getFromProtobuf(proto);
-    // When SCM is restarted, set Creation time with current time.
-    pipeline.setCreationTimestamp(Instant.now());
-    return pipeline;
+    return toBuilder(proto)
+        .setCreateTimestamp(Instant.now())
+        .build();
   }
 
-  public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
+  public Pipeline copyWithNodesInOrder(List<DatanodeDetails> nodes) {
+    return toBuilder().setNodesInOrder(nodes).build();
+  }
+
+  public Builder toBuilder() {
+    return newBuilder(this);
+  }
+
+  public static Builder toBuilder(HddsProtos.Pipeline pipeline)
       throws UnknownPipelineStateException {
     Preconditions.checkNotNull(pipeline, "Pipeline is null");
 
@@ -473,9 +468,13 @@ public final class Pipeline {
         .setReplicaIndexes(nodes)
         .setLeaderId(leaderId)
         .setSuggestedLeaderId(suggestedLeaderId)
-        .setNodesInOrder(pipeline.getMemberOrdersList())
-        .setCreateTimestamp(pipeline.getCreationTimeStamp())
-        .build();
+        .setNodeOrder(pipeline.getMemberOrdersList())
+        .setCreateTimestamp(pipeline.getCreationTimeStamp());
+  }
+
+  public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
+      throws UnknownPipelineStateException {
+    return toBuilder(pipeline).build();
   }
 
   @Override
@@ -529,10 +528,6 @@ public final class Pipeline {
     return new Builder(pipeline);
   }
 
-  private void setReplicaIndexes(Map<DatanodeDetails, Integer> replicaIndexes) {
-    this.replicaIndexes = replicaIndexes;
-  }
-
   /**
    * Builder class for Pipeline.
    */
@@ -546,7 +541,7 @@ public final class Pipeline {
     private UUID leaderId = null;
     private Instant creationTimestamp = null;
     private UUID suggestedLeaderId = null;
-    private Map<DatanodeDetails, Integer> replicaIndexes = new HashMap<>();
+    private Map<DatanodeDetails, Integer> replicaIndexes;
 
     public Builder() { }
 
@@ -559,8 +554,8 @@ public final class Pipeline {
       this.leaderId = pipeline.getLeaderId();
       this.creationTimestamp = pipeline.getCreationTimestamp();
       this.suggestedLeaderId = pipeline.getSuggestedLeaderId();
-      this.replicaIndexes = new HashMap<>();
       if (nodeStatus != null) {
+        replicaIndexes = new HashMap<>();
         for (DatanodeDetails dn : nodeStatus.keySet()) {
           int index = pipeline.getReplicaIndex(dn);
           if (index > 0) {
@@ -601,11 +596,22 @@ public final class Pipeline {
       return this;
     }
 
-    public Builder setNodesInOrder(List<Integer> orders) {
+    public Builder setNodeOrder(List<Integer> orders) {
+      // for build from ProtoBuf
       this.nodeOrder = orders;
       return this;
     }
 
+    public Builder setNodesInOrder(List<DatanodeDetails> nodes) {
+      this.nodesInOrder = new LinkedList<>(nodes);
+      return this;
+    }
+
+    public Builder setCreateTimestamp(Instant instant) {
+      this.creationTimestamp = instant;
+      return this;
+    }
+
     public Builder setCreateTimestamp(long createTimestamp) {
       this.creationTimestamp = Instant.ofEpochMilli(createTimestamp);
       return this;
@@ -627,19 +633,8 @@ public final class Pipeline {
       Preconditions.checkNotNull(replicationConfig);
       Preconditions.checkNotNull(state);
       Preconditions.checkNotNull(nodeStatus);
-      Pipeline pipeline =
-          new Pipeline(id, replicationConfig, state, nodeStatus,
-              suggestedLeaderId);
-      pipeline.setLeaderId(leaderId);
-      // overwrite with original creationTimestamp
-      if (creationTimestamp != null) {
-        pipeline.setCreationTimestamp(creationTimestamp);
-      }
-
-      pipeline.setReplicaIndexes(replicaIndexes);
 
       if (nodeOrder != null && !nodeOrder.isEmpty()) {
-        // This branch is for build from ProtoBuf
         List<DatanodeDetails> nodesWithOrder = new ArrayList<>();
         for (int i = 0; i < nodeOrder.size(); i++) {
           int nodeIndex = nodeOrder.get(i);
@@ -657,13 +652,10 @@ public final class Pipeline {
           LOG.debug("Deserialize nodesInOrder {} in pipeline {}",
               nodesWithOrder, id);
         }
-        pipeline.setNodesInOrder(nodesWithOrder);
-      } else if (nodesInOrder != null) {
-        // This branch is for pipeline clone
-        pipeline.setNodesInOrder(nodesInOrder);
+        nodesInOrder = nodesWithOrder;
       }
 
-      return pipeline;
+      return new Pipeline(this);
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 69f190c7fb..0747f04584 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -203,15 +204,19 @@ public class SCMBlockProtocolServer implements
         AllocatedBlock block = scm.getScmBlockManager()
             .allocateBlock(size, replicationConfig, owner, excludeList);
         if (block != null) {
-          blocks.add(block);
           // Sort the datanodes if client machine is specified
           final Node client = getClientNode(clientMachine);
           if (client != null) {
             final List<DatanodeDetails> nodes = block.getPipeline().getNodes();
             final List<DatanodeDetails> sorted = scm.getClusterMap()
                 .sortByDistanceCost(client, nodes, nodes.size());
-            block.getPipeline().setNodesInOrder(sorted);
+            if (!Objects.equals(sorted, block.getPipeline().getNodesInOrder())) {
+              block = block.toBuilder()
+                  .setPipeline(block.getPipeline().copyWithNodesInOrder(sorted))
+                  .build();
+            }
           }
+          blocks.add(block);
         }
       }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
index fb312dfb50..79c937ceb5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
@@ -71,8 +71,8 @@ public class TestXceiverClientGrpc {
             RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
         .setState(Pipeline.PipelineState.CLOSED)
         .setNodes(dns)
+        .setNodesInOrder(dnsInOrder)
         .build();
-    pipeline.setNodesInOrder(dnsInOrder);
   }
 
   @Test
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 3786601dd6..af6b41b610 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -1880,7 +1880,9 @@ public class KeyManagerImpl implements KeyManager {
             LOG.debug("Found sorted datanodes for pipeline {} and client {} "
                 + "in cache", pipeline.getId(), clientMachine);
           }
-          pipeline.setNodesInOrder(sortedNodes);
+          if (!Objects.equals(pipeline.getNodesInOrder(), sortedNodes)) {
+            k.setPipeline(pipeline.copyWithNodesInOrder(sortedNodes));
+          }
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to