This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 97f2f9156f HDDS-9651. Randomize read pipeline order when datanodes cannot be sorted by distance (#5574)
97f2f9156f is described below

commit 97f2f9156f5d5f44ed55de6dc87118fc7b10f3c5
Author: Ivan Brusentsev <[email protected]>
AuthorDate: Wed Nov 15 22:01:26 2023 +0300

    HDDS-9651. Randomize read pipeline order when datanodes cannot be sorted by distance (#5574)
---
 .../hadoop/hdds/scm/net/NetworkTopology.java       |  4 ++
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java   | 27 +++++++--
 .../hdds/scm/net/TestNetworkTopologyImpl.java      | 65 ++++++++++++++++++++--
 .../hdds/scm/server/SCMBlockProtocolServer.java    |  8 +--
 .../scm/server/TestSCMBlockProtocolServer.java     |  3 +-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 18 +++---
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  2 +-
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  8 ++-
 8 files changed, 106 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
index c863dc3da5..d0bbd17b51 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
@@ -239,6 +239,10 @@ public interface NetworkTopology {
    * @param reader    Node where need the data
    * @param nodes     Available replicas with the requested data
    * @param activeLen Number of active nodes at the front of the array
+   *
+   * @return list of sorted nodes if reader is not null,
+   * or shuffled input nodes otherwise. The size of returned list is limited
+   * by activeLen parameter.
    */
   List<? extends Node> sortByDistanceCost(Node reader,
       List<? extends Node> nodes, int activeLen);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
index 1cc10bb87c..c05b30c6c7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
@@ -57,12 +58,15 @@ public class NetworkTopologyImpl implements NetworkTopology {
   private final int maxLevel;
   /** Schema manager. */
   private final NodeSchemaManager schemaManager;
+  /** The algorithm to randomize nodes with equal distances. */
+  private final Consumer<List<? extends Node>> shuffleOperation;
   /** Lock to coordinate cluster tree access. */
   private ReadWriteLock netlock = new ReentrantReadWriteLock(true);
 
   public NetworkTopologyImpl(ConfigurationSource conf) {
     schemaManager = NodeSchemaManager.getInstance();
     schemaManager.init(conf);
+    shuffleOperation = Collections::shuffle;
     maxLevel = schemaManager.getMaxLevel();
     factory = InnerNodeImpl.FACTORY;
     clusterTree = factory.newInnerNode(ROOT, null, null,
@@ -71,8 +75,10 @@ public class NetworkTopologyImpl implements NetworkTopology {
   }
 
   @VisibleForTesting
-  public NetworkTopologyImpl(NodeSchemaManager manager) {
+  public NetworkTopologyImpl(NodeSchemaManager manager,
+                             Consumer<List<? extends Node>> shuffleOperation) {
     schemaManager = manager;
+    this.shuffleOperation = shuffleOperation;
     maxLevel = schemaManager.getMaxLevel();
     factory = InnerNodeImpl.FACTORY;
     clusterTree = factory.newInnerNode(ROOT, null, null,
@@ -80,6 +86,11 @@ public class NetworkTopologyImpl implements NetworkTopology {
         schemaManager.getCost(NetConstants.ROOT_LEVEL));
   }
 
+  @VisibleForTesting
+  public NetworkTopologyImpl(NodeSchemaManager manager) {
+    this(manager, Collections::shuffle);
+  }
+
   /**
    * Add a leaf node. This will be called when a new datanode is added.
    * @param node node to be added; can be null
@@ -754,14 +765,22 @@ public class NetworkTopologyImpl implements NetworkTopology {
    * @param reader    Node where need the data
    * @param nodes     Available replicas with the requested data
    * @param activeLen Number of active nodes at the front of the array
+   *
+   * @return list of sorted nodes if reader is not null,
+   * or shuffled input nodes otherwise. The size of returned list is limited
+   * by activeLen parameter.
    */
   @Override
   public List<? extends Node> sortByDistanceCost(Node reader,
       List<? extends Node> nodes, int activeLen) {
-    /** Sort weights for the nodes array */
+    // shuffle input list of nodes if reader is not defined
     if (reader == null) {
-      return nodes;
+      List<? extends Node> shuffledNodes =
+          new ArrayList<>(nodes.subList(0, activeLen));
+      shuffleOperation.accept(shuffledNodes);
+      return shuffledNodes;
     }
+    // Sort weights for the nodes array
     int[] costs = new int[activeLen];
     for (int i = 0; i < activeLen; i++) {
       costs[i] = getDistanceCost(reader, nodes.get(i));
@@ -782,7 +801,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
     List<Node> ret = new ArrayList<>();
     for (List<Node> list: tree.values()) {
       if (list != null) {
-        Collections.shuffle(list);
+        shuffleOperation.accept(list);
         for (Node n: list) {
           ret.add(n);
         }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
index 12b75d2b79..c7d86ea4fa 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hdds.scm.net;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -47,12 +49,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,10 +74,24 @@ public class TestNetworkTopologyImpl {
   private NetworkTopology cluster;
   private Node[] dataNodes;
   private Random random = new Random();
+  private Consumer<List<? extends Node>> mockedShuffleOperation;
+
+  @BeforeEach
+  void beforeAll() {
+    mockedShuffleOperation =
+        Mockito.mock(Consumer.class);
+    doAnswer(args -> {
+          List<? extends Node> collection = args.getArgument(0);
+          Collections.shuffle(collection);
+          return null;
+        }
+    ).when(mockedShuffleOperation).accept(any());
+  }
 
   public void initNetworkTopology(NodeSchema[] schemas, Node[] nodeArray) {
     NodeSchemaManager.getInstance().init(schemas, true);
-    cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+    cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance(),
+        mockedShuffleOperation);
     dataNodes = nodeArray.clone();
     for (int i = 0; i < dataNodes.length; i++) {
       cluster.add(dataNodes[i]);
@@ -208,7 +232,7 @@ public class TestNetworkTopologyImpl {
         new NodeSchema[]{ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
     NodeSchemaManager.getInstance().init(schemas, true);
     NetworkTopology newCluster = new NetworkTopologyImpl(
-        NodeSchemaManager.getInstance());
+        NodeSchemaManager.getInstance(), mockedShuffleOperation);
     Node[] invalidDataNodes = new Node[] {
         createDatanode("1.1.1.1", "/r1"),
         createDatanode("2.2.2.2", "/r2"),
@@ -765,7 +789,7 @@ public class TestNetworkTopologyImpl {
     NodeSchemaManager manager = NodeSchemaManager.getInstance();
     manager.init(schemas.toArray(new NodeSchema[0]), true);
     NetworkTopology newCluster =
-        new NetworkTopologyImpl(manager);
+        new NetworkTopologyImpl(manager, mockedShuffleOperation);
     Node[] nodeList = new Node[] {
         createDatanode("1.1.1.1", "/r1/ng1"),
         createDatanode("2.2.2.2", "/r1/ng1"),
@@ -806,7 +830,7 @@ public class TestNetworkTopologyImpl {
         .setType(NodeSchema.LayerType.LEAF_NODE).build());
     manager = NodeSchemaManager.getInstance();
     manager.init(schemas.toArray(new NodeSchema[0]), true);
-    newCluster = new NetworkTopologyImpl(manager);
+    newCluster = new NetworkTopologyImpl(manager, mockedShuffleOperation);
     for (Node node: nodeList) {
       newCluster.add(node);
     }
@@ -866,6 +890,7 @@ public class TestNetworkTopologyImpl {
         while (length > 0) {
           List<? extends Node> ret = cluster.sortByDistanceCost(reader,
               Arrays.asList(nodeList), length);
+          assertEquals(length, ret.size());
           for (int i = 0; i < ret.size(); i++) {
             if ((i + 1) < ret.size()) {
               int cost1 = cluster.getDistanceCost(reader, ret.get(i));
@@ -890,6 +915,7 @@ public class TestNetworkTopologyImpl {
       while (length >= 0) {
         List<? extends Node> sortedNodeList =
             cluster.sortByDistanceCost(reader, nodeList, length);
+        assertEquals(length, sortedNodeList.size());
         for (int i = 0; i < sortedNodeList.size(); i++) {
           if ((i + 1) < sortedNodeList.size()) {
             int cost1 = cluster.getDistanceCost(reader, sortedNodeList.get(i));
@@ -909,6 +935,33 @@ public class TestNetworkTopologyImpl {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("topologies")
+  public void testSortByDistanceCostNullReader(NodeSchema[] schemas,
+                                               Node[] nodeArray) {
+    // GIVEN
+    // various cluster topologies with null reader
+    initNetworkTopology(schemas, nodeArray);
+    List<Node> nodeList = Arrays.asList(dataNodes.clone());
+    final Node reader = null;
+    NetworkTopology spyCluster = spy(cluster);
+    int length = nodeList.size();
+    while (length > 0) {
+      // WHEN
+      List<? extends Node> ret = spyCluster.sortByDistanceCost(reader,
+          nodeList, length);
+      // THEN
+      // no actual distance cost calculated
+      // only shuffle input node list with given length limit
+      verify(mockedShuffleOperation).accept(any());
+      verify(spyCluster, never()).getDistanceCost(any(), any());
+      assertEquals(length, ret.size());
+      assertTrue(nodeList.containsAll(ret));
+      reset(mockedShuffleOperation);
+      length--;
+    }
+  }
+
   @Test
   public void testSingleNodeRackWithAffinityNode() {
     // network topology with default cost
@@ -920,7 +973,7 @@ public class TestNetworkTopologyImpl {
     NodeSchemaManager manager = NodeSchemaManager.getInstance();
     manager.init(schemas.toArray(new NodeSchema[0]), true);
     NetworkTopology newCluster =
-        new NetworkTopologyImpl(manager);
+        new NetworkTopologyImpl(manager, mockedShuffleOperation);
     Node node = createDatanode("1.1.1.1", "/r1");
     newCluster.add(node);
     Node chosenNode =
@@ -945,7 +998,7 @@ public class TestNetworkTopologyImpl {
     NodeSchemaManager manager = NodeSchemaManager.getInstance();
     manager.init(schemas.toArray(new NodeSchema[0]), true);
     NetworkTopology newCluster =
-            new NetworkTopologyImpl(manager);
+            new NetworkTopologyImpl(manager, mockedShuffleOperation);
     Node node = createDatanode("1.1.1.1", "/d1/r1");
     newCluster.add(node);
     assertTrue(newCluster.contains(node));
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 61512663a0..8d29d5eeb8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -358,10 +359,9 @@ public class SCMBlockProtocolServer implements
         }
       });
       List<? extends Node> sortedNodeList = scm.getClusterMap()
-          .sortByDistanceCost(client, nodeList, nodes.size());
-      List<DatanodeDetails> ret = new ArrayList<>();
-      sortedNodeList.stream().forEach(node -> ret.add((DatanodeDetails)node));
-      return ret;
+          .sortByDistanceCost(client, nodeList, nodeList.size());
+      return sortedNodeList.stream().map(r -> (DatanodeDetails) r).collect(
+          Collectors.toList());
     } catch (Exception ex) {
       auditSuccess = false;
       AUDIT.logReadFailure(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
index b25a517de7..4ee186323e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
@@ -134,7 +134,7 @@ public class TestSCMBlockProtocolServer {
 
     // sort normal datanodes
     String client;
-    client = nodes.get(0);
+    client = nodeManager.getAllNodes().get(0).getIpAddress();
     List<DatanodeDetails> datanodeDetails =
         server.sortDatanodes(nodes, client);
     System.out.println("client = " + client);
@@ -159,6 +159,7 @@ public class TestSCMBlockProtocolServer {
 
     // unknown node to sort
     nodes.add(UUID.randomUUID().toString());
+    client = nodeManager.getAllNodes().get(0).getIpAddress();
     ScmBlockLocationProtocolProtos.SortDatanodesRequestProto request =
         ScmBlockLocationProtocolProtos.SortDatanodesRequestProto
             .newBuilder()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 576bf594a3..a851bc9fcb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -817,14 +817,11 @@ public class TestKeyManagerImpl {
 
     OmKeyInfo key = keyManager.lookupKey(keyArgs, resolvedBucket(), null);
     assertEquals(key.getKeyName(), keyName);
-    List<OmKeyLocationInfo> keyLocations =
-        key.getLatestVersionLocations().getLocationList();
-    DatanodeDetails leader =
-        keyLocations.get(0).getPipeline().getFirstNode();
-    DatanodeDetails follower1 =
-        keyLocations.get(0).getPipeline().getNodes().get(1);
-    DatanodeDetails follower2 =
-        keyLocations.get(0).getPipeline().getNodes().get(2);
+    Pipeline keyPipeline =
+        key.getLatestVersionLocations().getLocationList().get(0).getPipeline();
+    DatanodeDetails leader = keyPipeline.getFirstNode();
+    DatanodeDetails follower1 = keyPipeline.getNodes().get(1);
+    DatanodeDetails follower2 = keyPipeline.getNodes().get(2);
     assertNotEquals(leader, follower1);
     assertNotEquals(follower1, follower2);
 
@@ -849,8 +846,9 @@ public class TestKeyManagerImpl {
     // lookup key, random node as client
     OmKeyInfo key4 = keyManager.lookupKey(keyArgs, resolvedBucket(),
         "/d=default-drack/127.0.0.1");
-    assertEquals(leader, key4.getLatestVersionLocations()
-        .getLocationList().get(0).getPipeline().getClosestNode());
+    assertTrue(
+        keyPipeline.getNodes().containsAll(key4.getLatestVersionLocations()
+            .getLocationList().get(0).getPipeline().getNodesInOrder()));
   }
 
   @NotNull
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index cb92928b67..63602aa287 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -1813,7 +1813,7 @@ public class KeyManagerImpl implements KeyManager {
 
   @VisibleForTesting
   void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
-    if (keyInfos != null && clientMachine != null && !clientMachine.isEmpty()) {
+    if (keyInfos != null && clientMachine != null) {
       Map<Set<String>, List<DatanodeDetails>> sortedPipelines = new HashMap<>();
       for (OmKeyInfo keyInfo : keyInfos) {
         OmKeyLocationInfoGroup key = keyInfo.getLatestVersionLocations();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index 5bbac41f43..795f933b97 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -77,6 +77,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 
 import static com.google.common.collect.Sets.newHashSet;
@@ -636,10 +638,10 @@ public class TestKeyManagerUnit {
     verify(containerClient, times(1)).getContainerWithPipelineBatch(anySet());
   }
 
-  @Test
-  public void sortDatanodes() throws Exception {
+  @ParameterizedTest
+  @ValueSource(strings = {"anyhost", ""})
+  public void sortDatanodes(String client) throws Exception {
     // GIVEN
-    String client = "anyhost";
     int pipelineCount = 3;
     int keysPerPipeline = 5;
     OmKeyInfo[] keyInfos = new OmKeyInfo[pipelineCount * keysPerPipeline];


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to