This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 140c5deffb HDDS-9343. Shift sortDatanodes logic to OM (#5391)
140c5deffb is described below
commit 140c5deffb848efc762aacf0cad9aa617c9c5374
Author: tanvipenumudy <[email protected]>
AuthorDate: Thu Mar 7 04:15:08 2024 +0530
HDDS-9343. Shift sortDatanodes logic to OM (#5391)
---
.../hadoop/hdds/protocol/DatanodeDetails.java | 25 +++
.../org/apache/hadoop/hdds/scm/net/InnerNode.java | 14 ++
.../apache/hadoop/hdds/scm/net/InnerNodeImpl.java | 159 +++++++++++++++++-
.../hadoop/hdds/scm/net/NetworkTopologyImpl.java | 20 ++-
.../java/org/apache/hadoop/hdds/scm/net/Node.java | 20 +++
.../org/apache/hadoop/hdds/scm/net/NodeImpl.java | 15 ++
.../hadoop/hdds/scm/net/NodeSchemaManager.java | 8 +
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 5 +
.../common/src/main/resources/ozone-default.xml | 8 +
.../hadoop/hdds/scm/client/ScmTopologyClient.java | 127 ++++++++++++++
.../hadoop/hdds/scm/client/package-info.java | 24 +++
.../scm/protocol/ScmBlockLocationProtocol.java | 8 +
...lockLocationProtocolClientSideTranslatorPB.java | 43 +++++
.../interface-client/src/main/proto/hdds.proto | 24 +++
.../src/main/proto/ScmServerProtocol.proto | 11 +-
.../apache/hadoop/hdds/scm/node/DatanodeInfo.java | 1 -
.../apache/hadoop/hdds/scm/node/NodeStatus.java | 1 -
...lockLocationProtocolServerSideTranslatorPB.java | 15 ++
.../hdds/scm/server/SCMBlockProtocolServer.java | 6 +
.../apache/hadoop/ozone/TestDelegationToken.java | 4 +
.../ozone/TestGetClusterTreeInformation.java | 87 ++++++++++
.../apache/hadoop/ozone/TestOMSortDatanodes.java | 187 +++++++++++++++++++++
.../hadoop/ozone/TestSecureOzoneCluster.java | 11 ++
.../ozone/om/TestOmContainerLocationCache.java | 11 +-
.../om/TestOzoneManagerListVolumesSecure.java | 3 +
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 91 +++++++---
.../org/apache/hadoop/ozone/om/OzoneManager.java | 42 +++++
.../org/apache/hadoop/ozone/om/OmTestManagers.java | 5 +
.../ozone/om/ScmBlockLocationTestingClient.java | 11 ++
.../apache/hadoop/ozone/om/TestKeyManagerUnit.java | 80 +--------
30 files changed, 959 insertions(+), 107 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 5b6fb6fe9b..a30f8414dc 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -364,6 +364,9 @@ public class DatanodeDetails extends NodeImpl implements
if (datanodeDetailsProto.hasNetworkLocation()) {
builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
}
+ if (datanodeDetailsProto.hasLevel()) {
+ builder.setLevel(datanodeDetailsProto.getLevel());
+ }
if (datanodeDetailsProto.hasPersistedOpState()) {
builder.setPersistedOpState(datanodeDetailsProto.getPersistedOpState());
}
@@ -456,6 +459,9 @@ public class DatanodeDetails extends NodeImpl implements
if (!Strings.isNullOrEmpty(getNetworkLocation())) {
builder.setNetworkLocation(getNetworkLocation());
}
+ if (getLevel() > 0) {
+ builder.setLevel(getLevel());
+ }
if (persistedOpState != null) {
builder.setPersistedOpState(persistedOpState);
}
@@ -585,6 +591,7 @@ public class DatanodeDetails extends NodeImpl implements
private String hostName;
private String networkName;
private String networkLocation;
+ private int level;
private List<Port> ports;
private String certSerialId;
private String version;
@@ -616,6 +623,7 @@ public class DatanodeDetails extends NodeImpl implements
this.hostName = details.getHostName();
this.networkName = details.getNetworkName();
this.networkLocation = details.getNetworkLocation();
+ this.level = details.getLevel();
this.ports = details.getPorts();
this.certSerialId = details.getCertSerialId();
this.version = details.getVersion();
@@ -683,6 +691,11 @@ public class DatanodeDetails extends NodeImpl implements
return this;
}
+ public Builder setLevel(int level) {
+ this.level = level;
+ return this;
+ }
+
/**
* Adds a DataNode Port.
*
@@ -807,6 +820,9 @@ public class DatanodeDetails extends NodeImpl implements
if (networkName != null) {
dn.setNetworkName(networkName);
}
+ if (level > 0) {
+ dn.setLevel(level);
+ }
return dn;
}
}
@@ -1011,4 +1027,13 @@ public class DatanodeDetails extends NodeImpl implements
public void setBuildDate(String date) {
this.buildDate = date;
}
+
+ @Override
+ public HddsProtos.NetworkNode toProtobuf(
+ int clientVersion) {
+ HddsProtos.NetworkNode networkNode =
+ HddsProtos.NetworkNode.newBuilder()
+ .setDatanodeDetails(toProtoBuilder(clientVersion).build()).build();
+ return networkNode;
+ }
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
index c87d826d25..6074e7da0a 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdds.scm.net;
import java.util.Collection;
import java.util.List;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
/**
* The interface defines an inner node in a network topology.
* An inner node represents network topology entities, such as data center,
@@ -89,4 +91,16 @@ public interface InnerNode extends Node {
*/
Node getLeaf(int leafIndex, List<String> excludedScopes,
Collection<Node> excludedNodes, int ancestorGen);
+
+ @Override
+ HddsProtos.NetworkNode toProtobuf(int clientVersion);
+
+ boolean equals(Object o);
+
+ int hashCode();
+
+ static InnerNode fromProtobuf(
+ HddsProtos.InnerNode innerNode) {
+ return InnerNodeImpl.fromProtobuf(innerNode);
+ }
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
index f2648f3d29..332dddac25 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,10 +48,10 @@ public class InnerNodeImpl extends NodeImpl implements
InnerNode {
}
}
- static final Factory FACTORY = new Factory();
+ public static final Factory FACTORY = new Factory();
// a map of node's network name to Node for quick search and keep
// the insert order
- private final HashMap<String, Node> childrenMap =
+ private HashMap<String, Node> childrenMap =
new LinkedHashMap<String, Node>();
// number of descendant leaves under this node
private int numOfLeaves;
@@ -66,6 +67,76 @@ public class InnerNodeImpl extends NodeImpl implements
InnerNode {
super(name, location, parent, level, cost);
}
+ /**
+ * Construct an InnerNode from its name, network location, level, cost,
+ * childrenMap and number of leaves. This constructor is used as part of
+ * protobuf deserialization.
+ */
+ protected InnerNodeImpl(String name, String location, int level, int cost,
+ HashMap<String, Node> childrenMap, int numOfLeaves) {
+ super(name, location, null, level, cost);
+ this.childrenMap = childrenMap;
+ this.numOfLeaves = numOfLeaves;
+ }
+
+ /**
+ * InnerNodeImpl Builder to help construct an InnerNodeImpl object from
+ * protobuf objects.
+ */
+ public static class Builder {
+ private String name;
+ private String location;
+ private int cost;
+ private int level;
+ private HashMap<String, Node> childrenMap = new LinkedHashMap<>();
+ private int numOfLeaves;
+
+ public Builder setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder setLocation(String location) {
+ this.location = location;
+ return this;
+ }
+
+ public Builder setCost(int cost) {
+ this.cost = cost;
+ return this;
+ }
+
+ public Builder setLevel(int level) {
+ this.level = level;
+ return this;
+ }
+
+ public Builder setChildrenMap(
+ List<HddsProtos.ChildrenMap> childrenMapList) {
+ HashMap<String, Node> newChildrenMap = new LinkedHashMap<>();
+ for (HddsProtos.ChildrenMap childrenMapProto :
+ childrenMapList) {
+ String networkName = childrenMapProto.hasNetworkName() ?
+ childrenMapProto.getNetworkName() : null;
+ Node node = childrenMapProto.hasNetworkNode() ?
+ Node.fromProtobuf(childrenMapProto.getNetworkNode()) : null;
+ newChildrenMap.put(networkName, node);
+ }
+ this.childrenMap = newChildrenMap;
+ return this;
+ }
+
+ public Builder setNumOfLeaves(int numOfLeaves) {
+ this.numOfLeaves = numOfLeaves;
+ return this;
+ }
+
+ public InnerNodeImpl build() {
+ return new InnerNodeImpl(name, location, level, cost, childrenMap,
+ numOfLeaves);
+ }
+ }
+
/** @return the number of children this node has */
private int getNumOfChildren() {
return childrenMap.size();
@@ -77,6 +148,11 @@ public class InnerNodeImpl extends NodeImpl implements
InnerNode {
return numOfLeaves;
}
+ /** @return a map of node's network name to Node. */
+ public HashMap<String, Node> getChildrenMap() {
+ return childrenMap;
+ }
+
/**
* @return number of its all nodes at level <i>level</i>. Here level is a
* relative level. If level is 1, means node itself. If level is 2, means its
@@ -390,14 +466,83 @@ public class InnerNodeImpl extends NodeImpl implements
InnerNode {
}
@Override
- public boolean equals(Object to) {
- if (to == null) {
- return false;
+ public HddsProtos.NetworkNode toProtobuf(
+ int clientVersion) {
+
+ HddsProtos.InnerNode.Builder innerNode =
+ HddsProtos.InnerNode.newBuilder()
+ .setNumOfLeaves(numOfLeaves)
+ .setNodeTopology(
+ NodeImpl.toProtobuf(getNetworkName(), getNetworkLocation(),
+ getLevel(), getCost()));
+
+ if (childrenMap != null && !childrenMap.isEmpty()) {
+ for (Map.Entry<String, Node> entry : childrenMap.entrySet()) {
+ if (entry.getValue() != null) {
+ HddsProtos.ChildrenMap childrenMapProto =
+ HddsProtos.ChildrenMap.newBuilder()
+ .setNetworkName(entry.getKey())
+ .setNetworkNode(entry.getValue().toProtobuf(clientVersion))
+ .build();
+ innerNode.addChildrenMap(childrenMapProto);
+ }
+ }
+ }
+ innerNode.build();
+
+ HddsProtos.NetworkNode networkNode =
+ HddsProtos.NetworkNode.newBuilder()
+ .setInnerNode(innerNode).build();
+
+ return networkNode;
+ }
+
+ public static InnerNode fromProtobuf(HddsProtos.InnerNode innerNode) {
+ InnerNodeImpl.Builder builder = new InnerNodeImpl.Builder();
+
+ if (innerNode.hasNodeTopology()) {
+ HddsProtos.NodeTopology nodeTopology = innerNode.getNodeTopology();
+
+ if (nodeTopology.hasName()) {
+ builder.setName(nodeTopology.getName());
+ }
+ if (nodeTopology.hasLocation()) {
+ builder.setLocation(nodeTopology.getLocation());
+ }
+ if (nodeTopology.hasLevel()) {
+ builder.setLevel(nodeTopology.getLevel());
+ }
+ if (nodeTopology.hasCost()) {
+ builder.setCost(nodeTopology.getCost());
+ }
+ }
+
+ if (!innerNode.getChildrenMapList().isEmpty()) {
+ builder.setChildrenMap(innerNode.getChildrenMapList());
+ }
+ if (innerNode.hasNumOfLeaves()) {
+ builder.setNumOfLeaves(innerNode.getNumOfLeaves());
}
- if (this == to) {
+
+ return builder.build();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
return true;
}
- return this.toString().equals(to.toString());
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InnerNodeImpl innerNode = (InnerNodeImpl) o;
+ return this.getNetworkName().equals(innerNode.getNetworkName()) &&
+ this.getNetworkLocation().equals(innerNode.getNetworkLocation()) &&
+ this.getLevel() == innerNode.getLevel() &&
+ this.getCost() == innerNode.getCost() &&
+ this.numOfLeaves == innerNode.numOfLeaves &&
+ this.childrenMap.size() == innerNode.childrenMap.size() &&
+ this.childrenMap.equals(innerNode.childrenMap);
}
@Override
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
index 2dc86c1b68..f6f013259c 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -75,6 +75,15 @@ public class NetworkTopologyImpl implements NetworkTopology {
schemaManager.getCost(NetConstants.ROOT_LEVEL));
}
+ public NetworkTopologyImpl(String schemaFile, InnerNode clusterTree) {
+ schemaManager = NodeSchemaManager.getInstance();
+ schemaManager.init(schemaFile);
+ maxLevel = schemaManager.getMaxLevel();
+ shuffleOperation = Collections::shuffle;
+ factory = InnerNodeImpl.FACTORY;
+ this.clusterTree = clusterTree;
+ }
+
@VisibleForTesting
public NetworkTopologyImpl(NodeSchemaManager manager,
Consumer<List<? extends Node>> shuffleOperation) {
@@ -726,8 +735,13 @@ public class NetworkTopologyImpl implements
NetworkTopology {
int cost = 0;
netlock.readLock().lock();
try {
- if ((node1.getAncestor(level1 - 1) != clusterTree) ||
- (node2.getAncestor(level2 - 1) != clusterTree)) {
+ Node ancestor1 = node1.getAncestor(level1 - 1);
+ boolean node1Outside = (ancestor1 == null || clusterTree == null)
+ ? ancestor1 != clusterTree : !ancestor1.equals(clusterTree);
+ Node ancestor2 = node2.getAncestor(level2 - 1);
+ boolean node2Outside = (ancestor2 == null || clusterTree == null)
+ ? ancestor2 != clusterTree : !ancestor2.equals(clusterTree);
+ if (node1Outside || node2Outside) {
LOG.debug("One of the nodes is outside of network topology");
return Integer.MAX_VALUE;
}
@@ -741,7 +755,7 @@ public class NetworkTopologyImpl implements NetworkTopology
{
level2--;
cost += node2 == null ? 0 : node2.getCost();
}
- while (node1 != null && node2 != null && node1 != node2) {
+ while (node1 != null && node2 != null && !node1.equals(node2)) {
node1 = node1.getParent();
node2 = node2.getParent();
cost += node1 == null ? 0 : node1.getCost();
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
index 9884888a1d..50f702cce0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdds.scm.net;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
/**
* The interface defines a node in a network topology.
* A node may be a leave representing a data node or an inner
@@ -126,4 +129,21 @@ public interface Node {
* @return true if this node is under a specific scope
*/
boolean isDescendant(String nodePath);
+
+ default HddsProtos.NetworkNode toProtobuf(
+ int clientVersion) {
+ return null;
+ }
+
+ static Node fromProtobuf(
+ HddsProtos.NetworkNode networkNode) {
+ if (networkNode.hasDatanodeDetails()) {
+ return DatanodeDetails.getFromProtoBuf(
+ networkNode.getDatanodeDetails());
+ } else if (networkNode.hasInnerNode()) {
+ return InnerNode.fromProtobuf(networkNode.getInnerNode());
+ } else {
+ return null;
+ }
+ }
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java
index e7a45f649b..e4d76cd3db 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.net;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
import static org.apache.hadoop.hdds.scm.net.NetConstants.PATH_SEPARATOR_STR;
@@ -229,6 +230,20 @@ public class NodeImpl implements Node {
NetUtils.addSuffix(nodePath));
}
+ public static HddsProtos.NodeTopology toProtobuf(String name, String
location,
+ int level, int cost) {
+
+ HddsProtos.NodeTopology.Builder nodeTopologyBuilder =
+ HddsProtos.NodeTopology.newBuilder()
+ .setName(name)
+ .setLocation(location)
+ .setLevel(level)
+ .setCost(cost);
+
+ HddsProtos.NodeTopology nodeTopology = nodeTopologyBuilder.build();
+ return nodeTopology;
+ }
+
@Override
public boolean equals(Object to) {
if (to == null) {
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
index eecd798767..fb37b214ca 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
@@ -62,6 +62,14 @@ public final class NodeSchemaManager {
String schemaFile = conf.get(
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT);
+ loadSchemaFile(schemaFile);
+ }
+
+ public void init(String schemaFile) {
+ loadSchemaFile(schemaFile);
+ }
+
+ private void loadSchemaFile(String schemaFile) {
NodeSchemaLoadResult result;
try {
result = NodeSchemaLoader.getInstance().loadSchemaFromFile(schemaFile);
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0080686575..c7867ffdcb 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -672,6 +672,11 @@ public final class OzoneConfigKeys {
public static final String HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY =
"hdds.scmclient.failover.max.retry";
+ public static final String
+ OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION =
+ "ozone.om.network.topology.refresh.duration";
+ public static final String
+ OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT = "1h";
/**
* There is no need to instantiate this class.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index ee0aa4514a..fc873f20af 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3791,6 +3791,14 @@
<description>Wait duration before which close container
is send to DN.</description>
</property>
+ <property>
+ <name>ozone.om.network.topology.refresh.duration</name>
+ <value>1h</value>
+ <tag>SCM, OZONE, OM</tag>
+ <description>The duration at which we periodically fetch the updated
network
+ topology cluster tree from SCM.
+ </description>
+ </property>
<property>
<name>ozone.scm.ha.ratis.server.snapshot.creation.gap</name>
<value>1024</value>
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java
new file mode 100644
index 0000000000..2e42df9573
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.client;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT;
+
+/**
+ * This client implements a background thread which periodically checks and
+ * gets the latest network topology cluster tree from SCM.
+ */
+public class ScmTopologyClient {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ScmTopologyClient.class);
+
+ private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+ private final AtomicReference<InnerNode> cache = new AtomicReference<>();
+ private ScheduledExecutorService executorService;
+
+ public ScmTopologyClient(
+ ScmBlockLocationProtocol scmBlockLocationProtocol) {
+ this.scmBlockLocationProtocol = scmBlockLocationProtocol;
+ }
+
+ public InnerNode getClusterTree() {
+ return requireNonNull(cache.get(),
+ "ScmBlockLocationClient must have been initialized already.");
+ }
+
+ public void start(ConfigurationSource conf) throws IOException {
+ final InnerNode initialTopology =
+ scmBlockLocationProtocol.getNetworkTopology();
+ LOG.info("Initial network topology fetched from SCM: {}.",
+ initialTopology);
+ cache.set(initialTopology);
+ scheduleNetworkTopologyPoller(conf, Instant.now());
+ }
+
+ public void stop() {
+ if (executorService != null) {
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while shutting down executor service.", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void scheduleNetworkTopologyPoller(ConfigurationSource conf,
+ Instant initialInvocation) {
+ Duration refreshDuration = parseRefreshDuration(conf);
+ Instant nextRefresh = initialInvocation.plus(refreshDuration);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("NetworkTopologyPoller")
+ .setDaemon(true)
+ .build();
+ executorService = Executors.newScheduledThreadPool(1, threadFactory);
+ Duration initialDelay = Duration.between(Instant.now(), nextRefresh);
+
+ LOG.debug("Scheduling NetworkTopologyPoller with an initial delay of {}.",
+ initialDelay);
+ executorService.scheduleAtFixedRate(() -> checkAndRefresh(),
+ initialDelay.toMillis(), refreshDuration.toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public static Duration parseRefreshDuration(ConfigurationSource conf) {
+ long refreshDurationInMs = conf.getTimeDuration(
+ OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION,
+ OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ return Duration.ofMillis(refreshDurationInMs);
+ }
+
+ private synchronized void checkAndRefresh() {
+ InnerNode current = cache.get();
+ try {
+ InnerNode newTopology = scmBlockLocationProtocol.getNetworkTopology();
+ if (!newTopology.equals(current)) {
+ cache.set(newTopology);
+ LOG.info("Updated network topology cluster tree fetched from " +
+ "SCM: {}.", newTopology);
+ }
+ } catch (IOException e) {
+ // Must not propagate: an exception escaping a scheduleAtFixedRate
+ // task cancels all subsequent executions of the poller.
+ LOG.error("Error fetching updated network topology cluster tree " +
+ "from SCM.", e);
+ }
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
new file mode 100644
index 0000000000..8dc9cb3cca
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains SCM client related classes.
+ */
+package org.apache.hadoop.hdds.scm.client;
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
index ef2585488f..8c84af859b 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
@@ -138,4 +139,11 @@ public interface ScmBlockLocationProtocol extends
Closeable {
*/
List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) throws IOException;
+
+ /**
+ * Retrieves the hierarchical cluster tree representing the network topology.
+ * @return the root node of the network topology cluster tree.
+ * @throws IOException
+ */
+ InnerNode getNetworkTopology() throws IOException;
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index 2e72496999..1f114304cc 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,8 @@ import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Allo
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetClusterTreeRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetClusterTreeResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SortDatanodesRequestProto;
@@ -49,6 +52,9 @@ import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.InnerNodeImpl;
+import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
@@ -328,6 +334,43 @@ public final class
ScmBlockLocationProtocolClientSideTranslatorPB
return results;
}
+ @Override
+ public InnerNode getNetworkTopology() throws IOException {
+ GetClusterTreeRequestProto request =
+ GetClusterTreeRequestProto.newBuilder().build();
+ SCMBlockLocationRequest wrapper =
createSCMBlockRequest(Type.GetClusterTree)
+ .setGetClusterTreeRequest(request)
+ .build();
+
+ final SCMBlockLocationResponse wrappedResponse =
+ handleError(submitRequest(wrapper));
+ GetClusterTreeResponseProto resp =
+ wrappedResponse.getGetClusterTreeResponse();
+
+ return (InnerNode) setParent(
+ InnerNodeImpl.fromProtobuf(resp.getClusterTree()));
+ }
+
+ /**
+ * Sets the parent field for the clusterTree nodes recursively.
+ *
+ * @param node cluster tree without parents set.
+ * @return updated cluster tree with parents set.
+ */
+ private Node setParent(Node node) {
+ if (node instanceof InnerNodeImpl) {
+ InnerNodeImpl innerNode = (InnerNodeImpl) node;
+ if (innerNode.getChildrenMap() != null) {
+ for (Map.Entry<String, Node> child : innerNode.getChildrenMap()
+ .entrySet()) {
+ child.getValue().setParent(innerNode);
+ setParent(child.getValue());
+ }
+ }
+ }
+ return node;
+ }
+
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 3f346300b3..4058453123 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -47,6 +47,7 @@ message DatanodeDetailsProto {
optional int64 persistedOpStateExpiry = 9; // The seconds after the epoch
when the OpState should expire
// TODO(runzhiwang): when uuid is gone, specify 1 as the index of uuid128
and mark as required
optional UUID uuid128 = 100; // UUID with 128 bits assigned to the
Datanode.
+ optional uint32 level = 101;
}
/**
@@ -497,3 +498,26 @@ message CompactionLogEntryProto {
repeated CompactionFileInfoProto outputFileIntoList = 4;
optional string compactionReason = 5;
}
+
+message NodeTopology {
+ optional string name = 1;
+ optional string location = 2;
+ optional uint32 cost = 3;
+ optional uint32 level = 4;
+}
+
+message NetworkNode {
+ optional DatanodeDetailsProto datanodeDetails = 1;
+ optional InnerNode innerNode = 3;
+}
+
+message ChildrenMap {
+ optional string networkName = 1;
+ optional NetworkNode networkNode = 2;
+}
+
+message InnerNode {
+ optional NodeTopology nodeTopology = 1;
+ optional uint32 numOfLeaves = 2;
+ repeated ChildrenMap childrenMap = 3;
+}
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 307c23a562..3d281975f2 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -30,7 +30,6 @@ package hadoop.hdds.block;
import "hdds.proto";
-
// SCM Block protocol
enum Type {
@@ -39,6 +38,7 @@ enum Type {
GetScmInfo = 13;
SortDatanodes = 14;
AddScm = 15;
+ GetClusterTree = 16;
}
message SCMBlockLocationRequest {
@@ -56,6 +56,7 @@ message SCMBlockLocationRequest {
optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest = 13;
optional SortDatanodesRequestProto sortDatanodesRequest = 14;
optional hadoop.hdds.AddScmRequestProto addScmRequestProto = 15;
+ optional GetClusterTreeRequestProto getClusterTreeRequest = 16;
}
message SCMBlockLocationResponse {
@@ -80,6 +81,7 @@ message SCMBlockLocationResponse {
optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13;
optional SortDatanodesResponseProto sortDatanodesResponse = 14;
optional hadoop.hdds.AddScmResponseProto addScmResponse = 15;
+ optional GetClusterTreeResponseProto getClusterTreeResponse = 16;
}
/**
@@ -230,6 +232,13 @@ message SortDatanodesResponseProto{
repeated DatanodeDetailsProto node = 1;
}
+message GetClusterTreeRequestProto {
+}
+
+message GetClusterTreeResponseProto {
+ required InnerNode clusterTree = 1;
+}
+
/**
* Protocol used from OzoneManager to StorageContainerManager.
* See request and response messages for details of the RPC calls.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index 7893e90812..ab296fc52b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -354,5 +354,4 @@ public class DatanodeInfo extends DatanodeDetails {
public boolean equals(Object obj) {
return super.equals(obj);
}
-
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index 7b1d6dd27d..3aff2f456e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -261,5 +261,4 @@ public class NodeStatus implements Comparable<NodeStatus> {
}
return order;
}
-
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index 0914cdd90b..e77e2aebb3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -32,6 +32,7 @@ import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Allo
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetClusterTreeResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
import
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesRequestProto;
@@ -43,6 +44,7 @@ import
org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.RatisUtil;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -159,6 +161,10 @@ public final class
ScmBlockLocationProtocolServerSideTranslatorPB
request.getSortDatanodesRequest(), request.getVersion()
));
break;
+ case GetClusterTree:
+ response.setGetClusterTreeResponse(
+ getClusterTree(request.getVersion()));
+ break;
default:
// Should never happen
throw new IOException("Unknown Operation " + request.getCmdType() +
@@ -276,4 +282,13 @@ public final class
ScmBlockLocationProtocolServerSideTranslatorPB
throw new ServiceException(ex);
}
}
+
+ public GetClusterTreeResponseProto getClusterTree(int clientVersion)
+ throws IOException {
+ GetClusterTreeResponseProto.Builder resp =
+ GetClusterTreeResponseProto.newBuilder();
+ InnerNode clusterTree = impl.getNetworkTopology();
+ resp.setClusterTree(clusterTree.toProtobuf(clientVersion).getInnerNode());
+ return resp.build();
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 0747f04584..79002e27a2 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -74,6 +74,7 @@ import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_K
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_HANDLER_COUNT_KEY;
import static
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.IO_EXCEPTION;
import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
import static
org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
@@ -417,6 +418,11 @@ public class SCMBlockProtocolServer implements
return null;
}
+ @Override
+ public InnerNode getNetworkTopology() {
+ return (InnerNode) scm.getClusterMap().getNode(ROOT);
+ }
+
@Override
public AuditMessage buildAuditMessageForSuccess(
AuditAction op, Map<String, String> auditMap) {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
index a82a1a8be7..77970ad447 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
@@ -31,6 +31,7 @@ import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
@@ -46,6 +47,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
import
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
@@ -312,6 +314,8 @@ public final class TestDelegationToken {
try {
// Start OM
om.setCertClient(new CertificateClientTestImpl(conf));
+ om.setScmTopologyClient(new ScmTopologyClient(
+ new ScmBlockLocationTestingClient(null, null, 0)));
om.start();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String username = ugi.getUserName();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java
new file mode 100644
index 0000000000..463c8b5ae5
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import
org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests the serialization/deserialization of the cluster tree
+ * information fetched from SCM.
+ */
+@Timeout(300)
+public class TestGetClusterTreeInformation {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestGetClusterTreeInformation.class);
+ private static int numOfDatanodes = 3;
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+ private static StorageContainerManager scm;
+
+ @BeforeAll
+ public static void init() throws IOException, TimeoutException,
+ InterruptedException {
+ conf = new OzoneConfiguration();
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(numOfDatanodes)
+ .setNumOfOzoneManagers(3)
+ .setNumOfStorageContainerManagers(3)
+ .build();
+ cluster.waitForClusterToBeReady();
+ scm = cluster.getStorageContainerManager();
+ }
+
+ @AfterAll
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testGetClusterTreeInformation() throws IOException {
+ SCMBlockLocationFailoverProxyProvider failoverProxyProvider =
+ new SCMBlockLocationFailoverProxyProvider(conf);
+ failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
+ ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
+ new ScmBlockLocationProtocolClientSideTranslatorPB(
+ failoverProxyProvider);
+
+ InnerNode expectedInnerNode = (InnerNode)
scm.getClusterMap().getNode(ROOT);
+ InnerNode actualInnerNode = scmBlockLocationClient.getNetworkTopology();
+ assertEquals(expectedInnerNode, actualInnerNode);
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOMSortDatanodes.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOMSortDatanodes.java
new file mode 100644
index 0000000000..cef872597e
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOMSortDatanodes.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.net.StaticMapping;
+
+import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_LEVEL;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for
+ * {@link org.apache.hadoop.ozone.om.KeyManagerImpl#sortDatanodes(List,
String)},
+ * adapted from the sortDatanodes tests in
+ * {@link org.apache.hadoop.hdds.scm.server.TestSCMBlockProtocolServer}.
+ */
+@Timeout(300)
+public class TestOMSortDatanodes {
+
+ private static OzoneConfiguration config;
+ private static StorageContainerManager scm;
+ private static NodeManager nodeManager;
+ private static KeyManagerImpl keyManager;
+ private static StorageContainerLocationProtocol mockScmContainerClient;
+ private static OzoneManager om;
+ private static File dir;
+ private static final int NODE_COUNT = 10;
+ private static final Map<String, String> EDGE_NODES = ImmutableMap.of(
+ "edge0", "/rack0",
+ "edge1", "/rack1"
+ );
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ config = new OzoneConfiguration();
+ dir = GenericTestUtils.getRandomizedTestDir();
+ config.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
+ config.set(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ StaticMapping.class.getName());
+ config.set(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, "true");
+ List<DatanodeDetails> datanodes = new ArrayList<>(NODE_COUNT);
+ List<String> nodeMapping = new ArrayList<>(NODE_COUNT);
+ for (int i = 0; i < NODE_COUNT; i++) {
+ DatanodeDetails dn = randomDatanodeDetails();
+ final String rack = "/rack" + (i % 2);
+ nodeMapping.add(dn.getHostName() + "=" + rack);
+ nodeMapping.add(dn.getIpAddress() + "=" + rack);
+ datanodes.add(dn);
+ }
+ EDGE_NODES.forEach((n, r) -> nodeMapping.add(n + "=" + r));
+ config.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING,
+ String.join(",", nodeMapping));
+
+ SCMConfigurator configurator = new SCMConfigurator();
+ configurator.setSCMHAManager(SCMHAManagerStub.getInstance(true));
+ configurator.setScmContext(SCMContext.emptyContext());
+ scm = HddsTestUtils.getScm(config, configurator);
+ scm.start();
+ scm.exitSafeMode();
+ nodeManager = scm.getScmNodeManager();
+ datanodes.forEach(dn -> nodeManager.register(dn, null, null));
+ mockScmContainerClient =
+ mock(StorageContainerLocationProtocol.class);
+ OmTestManagers omTestManagers
+ = new OmTestManagers(config, scm.getBlockProtocolServer(),
+ mockScmContainerClient);
+ om = omTestManagers.getOzoneManager();
+ keyManager = (KeyManagerImpl)omTestManagers.getKeyManager();
+ }
+
+ @AfterAll
+ public static void cleanup() throws Exception {
+ if (scm != null) {
+ scm.stop();
+ scm.join();
+ }
+ if (om != null) {
+ om.stop();
+ }
+ FileUtils.deleteDirectory(dir);
+ }
+
+ @Test
+ public void sortDatanodesRelativeToDatanode() {
+ for (DatanodeDetails dn : nodeManager.getAllNodes()) {
+ assertEquals(ROOT_LEVEL + 2, dn.getLevel());
+ List<DatanodeDetails> sorted =
+ keyManager.sortDatanodes(nodeManager.getAllNodes(), nodeAddress(dn));
+ assertEquals(dn, sorted.get(0),
+ "Source node should be sorted very first");
+ assertRackOrder(dn.getNetworkLocation(), sorted);
+ }
+ }
+
+ @Test
+ public void sortDatanodesRelativeToNonDatanode() {
+ for (Map.Entry<String, String> entry : EDGE_NODES.entrySet()) {
+ assertRackOrder(entry.getValue(),
+ keyManager.sortDatanodes(nodeManager.getAllNodes(), entry.getKey()));
+ }
+ }
+
+ @Test
+ public void testSortDatanodes() {
+ List<DatanodeDetails> nodes = nodeManager.getAllNodes();
+
+ // sort normal datanodes
+ String client;
+ client = nodeManager.getAllNodes().get(0).getIpAddress();
+ List<DatanodeDetails> datanodeDetails =
+ keyManager.sortDatanodes(nodes, client);
+ assertEquals(NODE_COUNT, datanodeDetails.size());
+
+ // illegal client 1
+ client += "X";
+ datanodeDetails = keyManager.sortDatanodes(nodes, client);
+ assertEquals(NODE_COUNT, datanodeDetails.size());
+
+ // illegal client 2
+ client = "/default-rack";
+ datanodeDetails = keyManager.sortDatanodes(nodes, client);
+ assertEquals(NODE_COUNT, datanodeDetails.size());
+ }
+
+ private static void assertRackOrder(String rack, List<DatanodeDetails> list)
{
+ int size = list.size();
+ for (int i = 0; i < size / 2; i++) {
+ assertEquals(rack, list.get(i).getNetworkLocation(),
+ "Nodes in the same rack should be sorted first");
+ }
+ for (int i = size / 2; i < size; i++) {
+ assertNotEquals(rack, list.get(i).getNetworkLocation(),
+ "Nodes in the other rack should be sorted last");
+ }
+ }
+
+ private String nodeAddress(DatanodeDetails dn) {
+ boolean useHostname = config.getBoolean(
+ DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME,
+ DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+ return useHostname ? dn.getHostName() : dn.getIpAddress();
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index bc49d176da..d183ed8229 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -50,10 +50,12 @@ import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
@@ -87,6 +89,7 @@ import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -208,6 +211,7 @@ final class TestSecureOzoneCluster {
private File testUserKeytab;
private String testUserPrincipal;
private StorageContainerManager scm;
+ private ScmBlockLocationProtocol scmBlockClient;
private OzoneManager om;
private HddsProtos.OzoneManagerDetailsProto omInfo;
private String host;
@@ -264,6 +268,7 @@ final class TestSecureOzoneCluster {
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
omId = UUID.randomUUID().toString();
+ scmBlockClient = new ScmBlockLocationTestingClient(null, null, 0);
startMiniKdc();
setSecureConfig();
@@ -609,6 +614,7 @@ final class TestSecureOzoneCluster {
setupOm(conf);
om.setCertClient(new CertificateClientTestImpl(conf));
+ om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
om.start();
} catch (Exception ex) {
// Expects timeout failure from scmClient in om but om user login via
@@ -676,6 +682,7 @@ final class TestSecureOzoneCluster {
setupOm(conf);
OzoneManager.setTestSecureOmFlag(true);
om.setCertClient(new CertificateClientTestImpl(conf));
+ om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
om.start();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -763,6 +770,7 @@ final class TestSecureOzoneCluster {
setupOm(conf);
// Start OM
om.setCertClient(new CertificateClientTestImpl(conf));
+ om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
om.start();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String username = ugi.getUserName();
@@ -999,6 +1007,7 @@ final class TestSecureOzoneCluster {
// create Ozone Manager instance, it will start the monitor task
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
om = OzoneManager.createOm(conf);
+ om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
om.setCertClient(client);
// check after renew, client will have the new cert ID
@@ -1164,6 +1173,7 @@ final class TestSecureOzoneCluster {
// create Ozone Manager instance, it will start the monitor task
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
om = OzoneManager.createOm(conf);
+ om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
om.setCertClient(mockClient);
// check error message during renew
@@ -1202,6 +1212,7 @@ final class TestSecureOzoneCluster {
String omCertId1 = omCert.getSerialNumber().toString();
// Start OM
om.setCertClient(certClient);
+ om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
om.start();
GenericTestUtils.waitFor(() -> om.isLeaderReady(), 100, 10000);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index 50ff9c36a0..2ae69dc3c9 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -50,6 +50,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.InnerNodeImpl;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -162,6 +165,9 @@ public class TestOmContainerLocationCache {
mockScmBlockLocationProtocol = mock(ScmBlockLocationProtocol.class);
mockScmContainerClient =
mock(StorageContainerLocationProtocol.class);
+ InnerNode.Factory factory = InnerNodeImpl.FACTORY;
+ when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn(
+ factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1));
OmTestManagers omTestManagers = new OmTestManagers(conf,
mockScmBlockLocationProtocol, mockScmContainerClient);
@@ -247,10 +253,13 @@ public class TestOmContainerLocationCache {
}
@BeforeEach
- public void beforeEach() {
+ public void beforeEach() throws IOException {
CONTAINER_ID.getAndIncrement();
reset(mockScmBlockLocationProtocol, mockScmContainerClient,
mockDn1Protocol, mockDn2Protocol);
+ InnerNode.Factory factory = InnerNodeImpl.FACTORY;
+ when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn(
+ factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1));
when(mockDn1Protocol.getPipeline()).thenReturn(createPipeline(DN1));
when(mockDn2Protocol.getPipeline()).thenReturn(createPipeline(DN2));
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
index 41f1c14f37..72f1c3374b 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
@@ -46,6 +46,7 @@ import java.util.concurrent.Callable;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.ozone.OzoneAcl;
@@ -197,6 +198,8 @@ public class TestOzoneManagerListVolumesSecure {
OzoneManager.setTestSecureOmFlag(true);
om = OzoneManager.createOm(conf);
+ om.setScmTopologyClient(new ScmTopologyClient(
+ new ScmBlockLocationTestingClient(null, null, 0)));
om.setCertClient(new CertificateClientTestImpl(conf));
om.start();
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index ffe1908c68..d2ca26e3fc 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -45,11 +45,15 @@ import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.utils.BackgroundService;
@@ -58,6 +62,9 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -96,6 +103,7 @@ import
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.ozone.security.acl.RequestContext;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -108,6 +116,7 @@ import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
@@ -190,6 +199,7 @@ public class KeyManagerImpl implements KeyManager {
private BackgroundService openKeyCleanupService;
private BackgroundService multipartUploadCleanupService;
private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService;
+ private DNSToSwitchMapping dnsToSwitchMapping;
public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
OzoneConfiguration conf, OMPerformanceMetrics metrics) {
@@ -339,6 +349,16 @@ public class KeyManagerImpl implements KeyManager {
ozoneManager, configuration);
multipartUploadCleanupService.start();
}
+
+ Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
+ configuration.getClass(
+ DFSConfigKeysLegacy.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ TableMapping.class, DNSToSwitchMapping.class);
+ DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
+ dnsToSwitchMappingClass, configuration);
+ dnsToSwitchMapping =
+ ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
+ : new CachedDNSToSwitchMapping(newInstance));
}
KeyProviderCryptoExtension getKMSProvider() {
@@ -1844,8 +1864,7 @@ public class KeyManagerImpl implements KeyManager {
return encInfo;
}
- @VisibleForTesting
- void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
+ private void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
if (keyInfos != null && clientMachine != null) {
Map<Set<String>, List<DatanodeDetails>> sortedPipelines = new
HashMap<>();
for (OmKeyInfo keyInfo : keyInfos) {
@@ -1865,8 +1884,7 @@ public class KeyManagerImpl implements KeyManager {
LOG.warn("No datanodes in pipeline {}", pipeline.getId());
continue;
}
- sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo,
- uuidList);
+ sortedNodes = sortDatanodes(nodes, clientMachine);
if (sortedNodes != null) {
sortedPipelines.put(uuidSet, sortedNodes);
}
@@ -1882,24 +1900,59 @@ public class KeyManagerImpl implements KeyManager {
}
}
- private List<DatanodeDetails> sortDatanodes(String clientMachine,
- List<DatanodeDetails> nodes, OmKeyInfo keyInfo, List<String> nodeList) {
- List<DatanodeDetails> sortedNodes = null;
+ @VisibleForTesting
+ public List<DatanodeDetails> sortDatanodes(List<DatanodeDetails> nodes,
+ String clientMachine) {
+ final Node client = getClientNode(clientMachine, nodes);
+ return ozoneManager.getClusterMap()
+ .sortByDistanceCost(client, nodes, nodes.size());
+ }
+
+ private Node getClientNode(String clientMachine,
+ List<DatanodeDetails> nodes) {
+ List<DatanodeDetails> matchingNodes = new ArrayList<>();
+ boolean useHostname = ozoneManager.getConfiguration().getBoolean(
+ DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME,
+ DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+ for (DatanodeDetails node : nodes) {
+ if ((useHostname ? node.getHostName() : node.getIpAddress()).equals(
+ clientMachine)) {
+ matchingNodes.add(node);
+ }
+ }
+ return !matchingNodes.isEmpty() ? matchingNodes.get(0) :
+ getOtherNode(clientMachine);
+ }
+
+ private Node getOtherNode(String clientMachine) {
try {
- sortedNodes = scmClient.getBlockClient()
- .sortDatanodes(nodeList, clientMachine);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sorted datanodes {} for client {}, result: {}", nodes,
- clientMachine, sortedNodes);
+ String clientLocation = resolveNodeLocation(clientMachine);
+ if (clientLocation != null) {
+ Node rack = ozoneManager.getClusterMap().getNode(clientLocation);
+ if (rack instanceof InnerNode) {
+ return new NodeImpl(clientMachine, clientLocation,
+ (InnerNode) rack, rack.getLevel() + 1,
+ NODE_COST_DEFAULT);
+ }
}
- } catch (IOException e) {
- LOG.warn("Unable to sort datanodes based on distance to client, "
- + " volume={}, bucket={}, key={}, client={}, datanodes={}, "
- + " exception={}",
- keyInfo.getVolumeName(), keyInfo.getBucketName(),
- keyInfo.getKeyName(), clientMachine, nodeList, e.getMessage());
+ } catch (Exception e) {
+ LOG.info("Could not resolve client {}: {}",
+ clientMachine, e.getMessage());
+ }
+ return null;
+ }
+
+ private String resolveNodeLocation(String hostname) {
+ List<String> hosts = Collections.singletonList(hostname);
+ List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
+ if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
+ String location = resolvedHosts.get(0);
+ LOG.debug("Node {} resolved to location {}", hostname, location);
+ return location;
+ } else {
+ LOG.debug("Node resolution did not yield any result for {}", hostname);
+ return null;
}
- return sortedNodes;
}
private static List<String> toNodeUuid(Collection<DatanodeDetails> nodes) {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b8133e5844..c27ba7836b 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -82,7 +82,11 @@ import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.server.OzoneAdmins;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
@@ -354,6 +358,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
private OzoneBlockTokenSecretManager blockTokenMgr;
private CertificateClient certClient;
private SecretKeySignerClient secretKeyClient;
+ private ScmTopologyClient scmTopologyClient;
private final Text omRpcAddressTxt;
private OzoneConfiguration configuration;
private RPC.Server omRpcServer;
@@ -386,6 +391,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
private OzoneManagerHttpServer httpServer;
private final OMStorage omStorage;
private ObjectName omInfoBeanName;
+ private NetworkTopology clusterMap;
private Timer metricsTimer;
private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
private static final ObjectWriter WRITER =
@@ -603,6 +609,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
final StorageContainerLocationProtocol scmContainerClient =
getScmContainerClient(configuration);
// verifies that the SCM info in the OM Version file is correct.
final ScmBlockLocationProtocol scmBlockClient =
getScmBlockClient(configuration);
+ scmTopologyClient = new ScmTopologyClient(scmBlockClient);
this.scmClient = new ScmClient(scmBlockClient, scmContainerClient,
configuration);
this.ozoneLockProvider = new OzoneLockProvider(getKeyPathLockEnabled(),
@@ -1135,6 +1142,24 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
serviceInfo = new ServiceInfoProvider(secConfig, this, certClient);
}
+ /**
+ * For testing purpose only. This allows setting up ScmBlockLocationClient
+ * without having to fully setup a working cluster.
+ */
+ @VisibleForTesting
+ public void setScmTopologyClient(
+ ScmTopologyClient scmTopologyClient) {
+ this.scmTopologyClient = scmTopologyClient;
+ }
+
+ public NetworkTopology getClusterMap() {
+ InnerNode currentTree = scmTopologyClient.getClusterTree();
+ return new NetworkTopologyImpl(configuration.get(
+ ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
+ ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT),
+ currentTree);
+ }
+
/**
* For testing purpose only. This allows testing token in integration test
* without fully setting up a working secure cluster.
@@ -1673,6 +1698,18 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
keyManager.start(configuration);
+ try {
+ scmTopologyClient.start(configuration);
+ } catch (IOException ex) {
+ LOG.error("Unable to initialize network topology schema file. ", ex);
+ throw new UncheckedIOException(ex);
+ }
+
+ clusterMap = new NetworkTopologyImpl(configuration.get(
+ ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
+ ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT),
+ scmTopologyClient.getClusterTree());
+
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
@@ -2232,6 +2269,11 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
keyManager.stop();
stopSecretManager();
+
+ if (scmTopologyClient != null) {
+ scmTopologyClient.stop();
+ }
+
if (httpServer != null) {
httpServer.stop();
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
index 43d29c1608..edffd5ed74 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
@@ -24,6 +24,7 @@ import
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -105,12 +106,16 @@ public final class OmTestManagers {
keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils
.getInternalState(om, "keyManager");
ScmClient scmClient = new ScmClient(scmBlockClient, containerClient, conf);
+ ScmTopologyClient scmTopologyClient =
+ new ScmTopologyClient(scmBlockClient);
HddsWhiteboxTestUtils.setInternalState(om,
"scmClient", scmClient);
HddsWhiteboxTestUtils.setInternalState(keyManager,
"scmClient", scmClient);
HddsWhiteboxTestUtils.setInternalState(keyManager,
"secretManager", mock(OzoneBlockTokenSecretManager.class));
+ HddsWhiteboxTestUtils.setInternalState(om,
+ "scmTopologyClient", scmTopologyClient);
om.start();
waitFor(() -> om.getOmRatisServer().checkLeaderStatus() ==
RaftServerStatus.LEADER_AND_READY,
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
index 8847a2d51e..8ba5ca779c 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
@@ -31,6 +31,9 @@ import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.InnerNodeImpl;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -200,6 +203,14 @@ public class ScmBlockLocationTestingClient implements
ScmBlockLocationProtocol {
return null;
}
+ @Override
+ public InnerNode getNetworkTopology() {
+ InnerNode.Factory factory = InnerNodeImpl.FACTORY;
+ InnerNode clusterTree = factory.newInnerNode("", "", null,
+ NetConstants.ROOT_LEVEL, 1);
+ return clusterTree;
+ }
+
/**
   * Return the number of blocks pseudo-deleted by this testing client.
*/
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index 278d96023c..5e2e27e0c1 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -23,12 +23,10 @@ import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@@ -44,6 +42,9 @@ import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.InnerNodeImpl;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -79,14 +80,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.TestInstance;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import static com.google.common.collect.Sets.newHashSet;
-import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static java.util.Comparator.comparing;
-import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -124,6 +120,9 @@ class TestKeyManagerUnit extends OzoneTestBase {
configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.toString());
containerClient = mock(StorageContainerLocationProtocol.class);
blockClient = mock(ScmBlockLocationProtocol.class);
+ InnerNode.Factory factory = InnerNodeImpl.FACTORY;
+ when(blockClient.getNetworkTopology()).thenReturn(
+ factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1));
OmTestManagers omTestManagers
= new OmTestManagers(configuration, blockClient, containerClient);
@@ -644,9 +643,6 @@ class TestKeyManagerUnit extends OzoneTestBase {
OMRequestTestUtils.addBucketToDB(volume, bucket, metadataManager);
final Pipeline pipeline = MockPipeline.createPipeline(3);
- final List<String> nodes = pipeline.getNodes().stream()
- .map(DatanodeDetails::getUuidString)
- .collect(toList());
Set<Long> containerIDs = new HashSet<>();
List<ContainerWithPipeline> containersWithPipeline = new ArrayList<>();
@@ -696,7 +692,6 @@ class TestKeyManagerUnit extends OzoneTestBase {
assertEquals(10, fileStatusList.size());
verify(containerClient).getContainerWithPipelineBatch(containerIDs);
- verify(blockClient).sortDatanodes(nodes, client);
// call list status the second time, and verify no more calls to
// SCM.
@@ -704,67 +699,4 @@ class TestKeyManagerUnit extends OzoneTestBase {
null, Long.MAX_VALUE, client);
verify(containerClient, times(1)).getContainerWithPipelineBatch(anySet());
}
-
- @ParameterizedTest
- @ValueSource(strings = {"anyhost", ""})
- public void sortDatanodes(String client) throws Exception {
- // GIVEN
- int pipelineCount = 3;
- int keysPerPipeline = 5;
- OmKeyInfo[] keyInfos = new OmKeyInfo[pipelineCount * keysPerPipeline];
- List<List<String>> expectedSortDatanodesInvocations = new ArrayList<>();
- Map<Pipeline, List<DatanodeDetails>> expectedSortedNodes = new HashMap<>();
- int ki = 0;
- for (int p = 0; p < pipelineCount; p++) {
- final Pipeline pipeline = MockPipeline.createPipeline(3);
- final List<String> nodes = pipeline.getNodes().stream()
- .map(DatanodeDetails::getUuidString)
- .collect(toList());
- expectedSortDatanodesInvocations.add(nodes);
- final List<DatanodeDetails> sortedNodes = pipeline.getNodes().stream()
- .sorted(comparing(DatanodeDetails::getUuidString))
- .collect(toList());
- expectedSortedNodes.put(pipeline, sortedNodes);
-
- when(blockClient.sortDatanodes(nodes, client))
- .thenReturn(sortedNodes);
-
- for (int i = 1; i <= keysPerPipeline; i++) {
- OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder()
- .setBlockID(new BlockID(i, 1L))
- .setPipeline(pipeline)
- .setOffset(0)
- .setLength(256000)
- .build();
-
- OmKeyInfo keyInfo = new OmKeyInfo.Builder()
- .setOmKeyLocationInfos(Arrays.asList(
- new OmKeyLocationInfoGroup(0, emptyList()),
- new OmKeyLocationInfoGroup(1, singletonList(keyLocationInfo))))
- .build();
- keyInfos[ki++] = keyInfo;
- }
- }
-
- // WHEN
- keyManager.sortDatanodes(client, keyInfos);
-
- // THEN
- // verify all key info locations got updated
- for (OmKeyInfo keyInfo : keyInfos) {
- OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations();
- assertNotNull(locations);
- for (OmKeyLocationInfo locationInfo : locations.getLocationList()) {
- Pipeline pipeline = locationInfo.getPipeline();
-        List<DatanodeDetails> expectedOrder = expectedSortedNodes.get(pipeline);
- assertEquals(expectedOrder, pipeline.getNodesInOrder());
- }
- }
-
- // expect one invocation per pipeline
- for (List<String> nodes : expectedSortDatanodesInvocations) {
- verify(blockClient).sortDatanodes(nodes, client);
- }
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]