This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 97f2f9156f HDDS-9651. Randomize read pipeline order when datanodes
cannot be sorted by distance (#5574)
97f2f9156f is described below
commit 97f2f9156f5d5f44ed55de6dc87118fc7b10f3c5
Author: Ivan Brusentsev <[email protected]>
AuthorDate: Wed Nov 15 22:01:26 2023 +0300
HDDS-9651. Randomize read pipeline order when datanodes cannot be sorted by
distance (#5574)
---
.../hadoop/hdds/scm/net/NetworkTopology.java | 4 ++
.../hadoop/hdds/scm/net/NetworkTopologyImpl.java | 27 +++++++--
.../hdds/scm/net/TestNetworkTopologyImpl.java | 65 ++++++++++++++++++++--
.../hdds/scm/server/SCMBlockProtocolServer.java | 8 +--
.../scm/server/TestSCMBlockProtocolServer.java | 3 +-
.../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 18 +++---
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 2 +-
.../apache/hadoop/ozone/om/TestKeyManagerUnit.java | 8 ++-
8 files changed, 106 insertions(+), 29 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
index c863dc3da5..d0bbd17b51 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
@@ -239,6 +239,10 @@ public interface NetworkTopology {
* @param reader Node where need the data
* @param nodes Available replicas with the requested data
* @param activeLen Number of active nodes at the front of the array
+ *
+ * @return list of sorted nodes if reader is not null,
+ * or shuffled input nodes otherwise. The size of returned list is limited
+ * by activeLen parameter.
*/
List<? extends Node> sortByDistanceCost(Node reader,
List<? extends Node> nodes, int activeLen);
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
index 1cc10bb87c..c05b30c6c7 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
@@ -57,12 +58,15 @@ public class NetworkTopologyImpl implements NetworkTopology
{
private final int maxLevel;
/** Schema manager. */
private final NodeSchemaManager schemaManager;
+ /** The algorithm to randomize nodes with equal distances. */
+ private final Consumer<List<? extends Node>> shuffleOperation;
/** Lock to coordinate cluster tree access. */
private ReadWriteLock netlock = new ReentrantReadWriteLock(true);
public NetworkTopologyImpl(ConfigurationSource conf) {
schemaManager = NodeSchemaManager.getInstance();
schemaManager.init(conf);
+ shuffleOperation = Collections::shuffle;
maxLevel = schemaManager.getMaxLevel();
factory = InnerNodeImpl.FACTORY;
clusterTree = factory.newInnerNode(ROOT, null, null,
@@ -71,8 +75,10 @@ public class NetworkTopologyImpl implements NetworkTopology {
}
@VisibleForTesting
- public NetworkTopologyImpl(NodeSchemaManager manager) {
+ public NetworkTopologyImpl(NodeSchemaManager manager,
+ Consumer<List<? extends Node>> shuffleOperation) {
schemaManager = manager;
+ this.shuffleOperation = shuffleOperation;
maxLevel = schemaManager.getMaxLevel();
factory = InnerNodeImpl.FACTORY;
clusterTree = factory.newInnerNode(ROOT, null, null,
@@ -80,6 +86,11 @@ public class NetworkTopologyImpl implements NetworkTopology {
schemaManager.getCost(NetConstants.ROOT_LEVEL));
}
+ @VisibleForTesting
+ public NetworkTopologyImpl(NodeSchemaManager manager) {
+ this(manager, Collections::shuffle);
+ }
+
/**
* Add a leaf node. This will be called when a new datanode is added.
* @param node node to be added; can be null
@@ -754,14 +765,22 @@ public class NetworkTopologyImpl implements
NetworkTopology {
* @param reader Node where need the data
* @param nodes Available replicas with the requested data
* @param activeLen Number of active nodes at the front of the array
+ *
+ * @return list of sorted nodes if reader is not null,
+ * or shuffled input nodes otherwise. The size of returned list is limited
+ * by activeLen parameter.
*/
@Override
public List<? extends Node> sortByDistanceCost(Node reader,
List<? extends Node> nodes, int activeLen) {
- /** Sort weights for the nodes array */
+ // shuffle input list of nodes if reader is not defined
if (reader == null) {
- return nodes;
+ List<? extends Node> shuffledNodes =
+ new ArrayList<>(nodes.subList(0, activeLen));
+ shuffleOperation.accept(shuffledNodes);
+ return shuffledNodes;
}
+ // Sort weights for the nodes array
int[] costs = new int[activeLen];
for (int i = 0; i < activeLen; i++) {
costs[i] = getDistanceCost(reader, nodes.get(i));
@@ -782,7 +801,7 @@ public class NetworkTopologyImpl implements NetworkTopology
{
List<Node> ret = new ArrayList<>();
for (List<Node> list: tree.values()) {
if (list != null) {
- Collections.shuffle(list);
+ shuffleOperation.accept(list);
for (Node n: list) {
ret.add(n);
}
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
index 12b75d2b79..c7d86ea4fa 100644
---
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
+++
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hdds.scm.net;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -47,12 +49,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,10 +74,24 @@ public class TestNetworkTopologyImpl {
private NetworkTopology cluster;
private Node[] dataNodes;
private Random random = new Random();
+ private Consumer<List<? extends Node>> mockedShuffleOperation;
+
+ @BeforeEach
+ void beforeAll() {
+ mockedShuffleOperation =
+ Mockito.mock(Consumer.class);
+ doAnswer(args -> {
+ List<? extends Node> collection = args.getArgument(0);
+ Collections.shuffle(collection);
+ return null;
+ }
+ ).when(mockedShuffleOperation).accept(any());
+ }
public void initNetworkTopology(NodeSchema[] schemas, Node[] nodeArray) {
NodeSchemaManager.getInstance().init(schemas, true);
- cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+ cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance(),
+ mockedShuffleOperation);
dataNodes = nodeArray.clone();
for (int i = 0; i < dataNodes.length; i++) {
cluster.add(dataNodes[i]);
@@ -208,7 +232,7 @@ public class TestNetworkTopologyImpl {
new NodeSchema[]{ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager.getInstance().init(schemas, true);
NetworkTopology newCluster = new NetworkTopologyImpl(
- NodeSchemaManager.getInstance());
+ NodeSchemaManager.getInstance(), mockedShuffleOperation);
Node[] invalidDataNodes = new Node[] {
createDatanode("1.1.1.1", "/r1"),
createDatanode("2.2.2.2", "/r2"),
@@ -765,7 +789,7 @@ public class TestNetworkTopologyImpl {
NodeSchemaManager manager = NodeSchemaManager.getInstance();
manager.init(schemas.toArray(new NodeSchema[0]), true);
NetworkTopology newCluster =
- new NetworkTopologyImpl(manager);
+ new NetworkTopologyImpl(manager, mockedShuffleOperation);
Node[] nodeList = new Node[] {
createDatanode("1.1.1.1", "/r1/ng1"),
createDatanode("2.2.2.2", "/r1/ng1"),
@@ -806,7 +830,7 @@ public class TestNetworkTopologyImpl {
.setType(NodeSchema.LayerType.LEAF_NODE).build());
manager = NodeSchemaManager.getInstance();
manager.init(schemas.toArray(new NodeSchema[0]), true);
- newCluster = new NetworkTopologyImpl(manager);
+ newCluster = new NetworkTopologyImpl(manager, mockedShuffleOperation);
for (Node node: nodeList) {
newCluster.add(node);
}
@@ -866,6 +890,7 @@ public class TestNetworkTopologyImpl {
while (length > 0) {
List<? extends Node> ret = cluster.sortByDistanceCost(reader,
Arrays.asList(nodeList), length);
+ assertEquals(length, ret.size());
for (int i = 0; i < ret.size(); i++) {
if ((i + 1) < ret.size()) {
int cost1 = cluster.getDistanceCost(reader, ret.get(i));
@@ -890,6 +915,7 @@ public class TestNetworkTopologyImpl {
while (length >= 0) {
List<? extends Node> sortedNodeList =
cluster.sortByDistanceCost(reader, nodeList, length);
+ assertEquals(length, sortedNodeList.size());
for (int i = 0; i < sortedNodeList.size(); i++) {
if ((i + 1) < sortedNodeList.size()) {
int cost1 = cluster.getDistanceCost(reader, sortedNodeList.get(i));
@@ -909,6 +935,33 @@ public class TestNetworkTopologyImpl {
}
}
+ @ParameterizedTest
+ @MethodSource("topologies")
+ public void testSortByDistanceCostNullReader(NodeSchema[] schemas,
+ Node[] nodeArray) {
+ // GIVEN
+ // various cluster topologies with null reader
+ initNetworkTopology(schemas, nodeArray);
+ List<Node> nodeList = Arrays.asList(dataNodes.clone());
+ final Node reader = null;
+ NetworkTopology spyCluster = spy(cluster);
+ int length = nodeList.size();
+ while (length > 0) {
+ // WHEN
+ List<? extends Node> ret = spyCluster.sortByDistanceCost(reader,
+ nodeList, length);
+ // THEN
+ // no actual distance cost calculated
+ // only shuffle input node list with given length limit
+ verify(mockedShuffleOperation).accept(any());
+ verify(spyCluster, never()).getDistanceCost(any(), any());
+ assertEquals(length, ret.size());
+ assertTrue(nodeList.containsAll(ret));
+ reset(mockedShuffleOperation);
+ length--;
+ }
+ }
+
@Test
public void testSingleNodeRackWithAffinityNode() {
// network topology with default cost
@@ -920,7 +973,7 @@ public class TestNetworkTopologyImpl {
NodeSchemaManager manager = NodeSchemaManager.getInstance();
manager.init(schemas.toArray(new NodeSchema[0]), true);
NetworkTopology newCluster =
- new NetworkTopologyImpl(manager);
+ new NetworkTopologyImpl(manager, mockedShuffleOperation);
Node node = createDatanode("1.1.1.1", "/r1");
newCluster.add(node);
Node chosenNode =
@@ -945,7 +998,7 @@ public class TestNetworkTopologyImpl {
NodeSchemaManager manager = NodeSchemaManager.getInstance();
manager.init(schemas.toArray(new NodeSchema[0]), true);
NetworkTopology newCluster =
- new NetworkTopologyImpl(manager);
+ new NetworkTopologyImpl(manager, mockedShuffleOperation);
Node node = createDatanode("1.1.1.1", "/d1/r1");
newCluster.add(node);
assertTrue(newCluster.contains(node));
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 61512663a0..8d29d5eeb8 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdds.client.BlockID;
@@ -358,10 +359,9 @@ public class SCMBlockProtocolServer implements
}
});
List<? extends Node> sortedNodeList = scm.getClusterMap()
- .sortByDistanceCost(client, nodeList, nodes.size());
- List<DatanodeDetails> ret = new ArrayList<>();
- sortedNodeList.stream().forEach(node -> ret.add((DatanodeDetails)node));
- return ret;
+ .sortByDistanceCost(client, nodeList, nodeList.size());
+ return sortedNodeList.stream().map(r -> (DatanodeDetails) r).collect(
+ Collectors.toList());
} catch (Exception ex) {
auditSuccess = false;
AUDIT.logReadFailure(
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
index b25a517de7..4ee186323e 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
@@ -134,7 +134,7 @@ public class TestSCMBlockProtocolServer {
// sort normal datanodes
String client;
- client = nodes.get(0);
+ client = nodeManager.getAllNodes().get(0).getIpAddress();
List<DatanodeDetails> datanodeDetails =
server.sortDatanodes(nodes, client);
System.out.println("client = " + client);
@@ -159,6 +159,7 @@ public class TestSCMBlockProtocolServer {
// unknown node to sort
nodes.add(UUID.randomUUID().toString());
+ client = nodeManager.getAllNodes().get(0).getIpAddress();
ScmBlockLocationProtocolProtos.SortDatanodesRequestProto request =
ScmBlockLocationProtocolProtos.SortDatanodesRequestProto
.newBuilder()
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 576bf594a3..a851bc9fcb 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -817,14 +817,11 @@ public class TestKeyManagerImpl {
OmKeyInfo key = keyManager.lookupKey(keyArgs, resolvedBucket(), null);
assertEquals(key.getKeyName(), keyName);
- List<OmKeyLocationInfo> keyLocations =
- key.getLatestVersionLocations().getLocationList();
- DatanodeDetails leader =
- keyLocations.get(0).getPipeline().getFirstNode();
- DatanodeDetails follower1 =
- keyLocations.get(0).getPipeline().getNodes().get(1);
- DatanodeDetails follower2 =
- keyLocations.get(0).getPipeline().getNodes().get(2);
+ Pipeline keyPipeline =
+ key.getLatestVersionLocations().getLocationList().get(0).getPipeline();
+ DatanodeDetails leader = keyPipeline.getFirstNode();
+ DatanodeDetails follower1 = keyPipeline.getNodes().get(1);
+ DatanodeDetails follower2 = keyPipeline.getNodes().get(2);
assertNotEquals(leader, follower1);
assertNotEquals(follower1, follower2);
@@ -849,8 +846,9 @@ public class TestKeyManagerImpl {
// lookup key, random node as client
OmKeyInfo key4 = keyManager.lookupKey(keyArgs, resolvedBucket(),
"/d=default-drack/127.0.0.1");
- assertEquals(leader, key4.getLatestVersionLocations()
- .getLocationList().get(0).getPipeline().getClosestNode());
+ assertTrue(
+ keyPipeline.getNodes().containsAll(key4.getLatestVersionLocations()
+ .getLocationList().get(0).getPipeline().getNodesInOrder()));
}
@NotNull
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index cb92928b67..63602aa287 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -1813,7 +1813,7 @@ public class KeyManagerImpl implements KeyManager {
@VisibleForTesting
void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
- if (keyInfos != null && clientMachine != null && !clientMachine.isEmpty())
{
+ if (keyInfos != null && clientMachine != null) {
Map<Set<String>, List<DatanodeDetails>> sortedPipelines = new
HashMap<>();
for (OmKeyInfo keyInfo : keyInfos) {
OmKeyLocationInfoGroup key = keyInfo.getLatestVersionLocations();
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index 5bbac41f43..795f933b97 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -77,6 +77,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import static com.google.common.collect.Sets.newHashSet;
@@ -636,10 +638,10 @@ public class TestKeyManagerUnit {
verify(containerClient, times(1)).getContainerWithPipelineBatch(anySet());
}
- @Test
- public void sortDatanodes() throws Exception {
+ @ParameterizedTest
+ @ValueSource(strings = {"anyhost", ""})
+ public void sortDatanodes(String client) throws Exception {
// GIVEN
- String client = "anyhost";
int pipelineCount = 3;
int keysPerPipeline = 5;
OmKeyInfo[] keyInfos = new OmKeyInfo[pipelineCount * keysPerPipeline];
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]