This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 140c5deffb HDDS-9343. Shift sortDatanodes logic to OM (#5391)
140c5deffb is described below

commit 140c5deffb848efc762aacf0cad9aa617c9c5374
Author: tanvipenumudy <[email protected]>
AuthorDate: Thu Mar 7 04:15:08 2024 +0530

    HDDS-9343. Shift sortDatanodes logic to OM (#5391)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  25 +++
 .../org/apache/hadoop/hdds/scm/net/InnerNode.java  |  14 ++
 .../apache/hadoop/hdds/scm/net/InnerNodeImpl.java  | 159 +++++++++++++++++-
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java   |  20 ++-
 .../java/org/apache/hadoop/hdds/scm/net/Node.java  |  20 +++
 .../org/apache/hadoop/hdds/scm/net/NodeImpl.java   |  15 ++
 .../hadoop/hdds/scm/net/NodeSchemaManager.java     |   8 +
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   5 +
 .../common/src/main/resources/ozone-default.xml    |   8 +
 .../hadoop/hdds/scm/client/ScmTopologyClient.java  | 127 ++++++++++++++
 .../hadoop/hdds/scm/client/package-info.java       |  24 +++
 .../scm/protocol/ScmBlockLocationProtocol.java     |   8 +
 ...lockLocationProtocolClientSideTranslatorPB.java |  43 +++++
 .../interface-client/src/main/proto/hdds.proto     |  24 +++
 .../src/main/proto/ScmServerProtocol.proto         |  11 +-
 .../apache/hadoop/hdds/scm/node/DatanodeInfo.java  |   1 -
 .../apache/hadoop/hdds/scm/node/NodeStatus.java    |   1 -
 ...lockLocationProtocolServerSideTranslatorPB.java |  15 ++
 .../hdds/scm/server/SCMBlockProtocolServer.java    |   6 +
 .../apache/hadoop/ozone/TestDelegationToken.java   |   4 +
 .../ozone/TestGetClusterTreeInformation.java       |  87 ++++++++++
 .../apache/hadoop/ozone/TestOMSortDatanodes.java   | 187 +++++++++++++++++++++
 .../hadoop/ozone/TestSecureOzoneCluster.java       |  11 ++
 .../ozone/om/TestOmContainerLocationCache.java     |  11 +-
 .../om/TestOzoneManagerListVolumesSecure.java      |   3 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  91 +++++++---
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  42 +++++
 .../org/apache/hadoop/ozone/om/OmTestManagers.java |   5 +
 .../ozone/om/ScmBlockLocationTestingClient.java    |  11 ++
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  80 +--------
 30 files changed, 959 insertions(+), 107 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 5b6fb6fe9b..a30f8414dc 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -364,6 +364,9 @@ public class DatanodeDetails extends NodeImpl implements
     if (datanodeDetailsProto.hasNetworkLocation()) {
       builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
     }
+    if (datanodeDetailsProto.hasLevel()) {
+      builder.setLevel(datanodeDetailsProto.getLevel());
+    }
     if (datanodeDetailsProto.hasPersistedOpState()) {
       builder.setPersistedOpState(datanodeDetailsProto.getPersistedOpState());
     }
@@ -456,6 +459,9 @@ public class DatanodeDetails extends NodeImpl implements
     if (!Strings.isNullOrEmpty(getNetworkLocation())) {
       builder.setNetworkLocation(getNetworkLocation());
     }
+    if (getLevel() > 0) {
+      builder.setLevel(getLevel());
+    }
     if (persistedOpState != null) {
       builder.setPersistedOpState(persistedOpState);
     }
@@ -585,6 +591,7 @@ public class DatanodeDetails extends NodeImpl implements
     private String hostName;
     private String networkName;
     private String networkLocation;
+    private int level;
     private List<Port> ports;
     private String certSerialId;
     private String version;
@@ -616,6 +623,7 @@ public class DatanodeDetails extends NodeImpl implements
       this.hostName = details.getHostName();
       this.networkName = details.getNetworkName();
       this.networkLocation = details.getNetworkLocation();
+      this.level = details.getLevel();
       this.ports = details.getPorts();
       this.certSerialId = details.getCertSerialId();
       this.version = details.getVersion();
@@ -683,6 +691,11 @@ public class DatanodeDetails extends NodeImpl implements
       return this;
     }
 
+    public Builder setLevel(int level) {
+      this.level = level;
+      return this;
+    }
+
     /**
      * Adds a DataNode Port.
      *
@@ -807,6 +820,9 @@ public class DatanodeDetails extends NodeImpl implements
       if (networkName != null) {
         dn.setNetworkName(networkName);
       }
+      if (level > 0) {
+        dn.setLevel(level);
+      }
       return dn;
     }
   }
@@ -1011,4 +1027,13 @@ public class DatanodeDetails extends NodeImpl implements
   public void setBuildDate(String date) {
     this.buildDate = date;
   }
+
+  @Override
+  public HddsProtos.NetworkNode toProtobuf(
+      int clientVersion) {
+    HddsProtos.NetworkNode networkNode =
+        HddsProtos.NetworkNode.newBuilder()
+            .setDatanodeDetails(toProtoBuilder(clientVersion).build()).build();
+    return networkNode;
+  }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
index c87d826d25..6074e7da0a 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdds.scm.net;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
 /**
  * The interface defines an inner node in a network topology.
  * An inner node represents network topology entities, such as data center,
@@ -89,4 +91,16 @@ public interface InnerNode extends Node {
    */
   Node getLeaf(int leafIndex, List<String> excludedScopes,
       Collection<Node> excludedNodes, int ancestorGen);
+
+  @Override
+  HddsProtos.NetworkNode toProtobuf(int clientVersion);
+
+  boolean equals(Object o);
+
+  int hashCode();
+
+  static InnerNode fromProtobuf(
+      HddsProtos.InnerNode innerNode) {
+    return InnerNodeImpl.fromProtobuf(innerNode);
+  }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
index f2648f3d29..332dddac25 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,10 +48,10 @@ public class InnerNodeImpl extends NodeImpl implements 
InnerNode {
     }
   }
 
-  static final Factory FACTORY = new Factory();
+  public static final Factory FACTORY = new Factory();
   // a map of node's network name to Node for quick search and keep
   // the insert order
-  private final HashMap<String, Node> childrenMap =
+  private HashMap<String, Node> childrenMap =
       new LinkedHashMap<String, Node>();
   // number of descendant leaves under this node
   private int numOfLeaves;
@@ -66,6 +67,76 @@ public class InnerNodeImpl extends NodeImpl implements 
InnerNode {
     super(name, location, parent, level, cost);
   }
 
+  /**
+   * Construct an InnerNode from its name, network location, level, cost,
+   * childrenMap and number of leaves. This constructor is used as part of
+   * protobuf deserialization.
+   */
+  protected InnerNodeImpl(String name, String location, int level, int cost,
+                          HashMap<String, Node> childrenMap, int numOfLeaves) {
+    super(name, location, null, level, cost);
+    this.childrenMap = childrenMap;
+    this.numOfLeaves = numOfLeaves;
+  }
+
+  /**
+   * InnerNodeImpl Builder to help construct an InnerNodeImpl object from
+   * protobuf objects.
+   */
+  public static class Builder {
+    private String name;
+    private String location;
+    private int cost;
+    private int level;
+    private HashMap<String, Node> childrenMap = new LinkedHashMap<>();
+    private int numOfLeaves;
+
+    public Builder setName(String name) {
+      this.name = name;
+      return this;
+    }
+
+    public Builder setLocation(String location) {
+      this.location = location;
+      return this;
+    }
+
+    public Builder setCost(int cost) {
+      this.cost = cost;
+      return this;
+    }
+
+    public Builder setLevel(int level) {
+      this.level = level;
+      return this;
+    }
+
+    public Builder setChildrenMap(
+        List<HddsProtos.ChildrenMap> childrenMapList) {
+      HashMap<String, Node> newChildrenMap = new LinkedHashMap<>();
+      for (HddsProtos.ChildrenMap childrenMapProto :
+          childrenMapList) {
+        String networkName = childrenMapProto.hasNetworkName() ?
+            childrenMapProto.getNetworkName() : null;
+        Node node = childrenMapProto.hasNetworkNode() ?
+            Node.fromProtobuf(childrenMapProto.getNetworkNode()) : null;
+        newChildrenMap.put(networkName, node);
+      }
+      this.childrenMap = newChildrenMap;
+      return this;
+    }
+
+    public Builder setNumOfLeaves(int numOfLeaves) {
+      this.numOfLeaves = numOfLeaves;
+      return this;
+    }
+
+    public InnerNodeImpl build() {
+      return new InnerNodeImpl(name, location, level, cost, childrenMap,
+          numOfLeaves);
+    }
+  }
+
   /** @return the number of children this node has */
   private int getNumOfChildren() {
     return childrenMap.size();
@@ -77,6 +148,11 @@ public class InnerNodeImpl extends NodeImpl implements 
InnerNode {
     return numOfLeaves;
   }
 
+  /** @return a map of node's network name to Node. */
+  public HashMap<String, Node> getChildrenMap() {
+    return childrenMap;
+  }
+
   /**
    * @return number of its all nodes at level <i>level</i>. Here level is a
    * relative level. If level is 1, means node itself. If level is 2, means its
@@ -390,14 +466,83 @@ public class InnerNodeImpl extends NodeImpl implements 
InnerNode {
   }
 
   @Override
-  public boolean equals(Object to) {
-    if (to == null) {
-      return false;
+  public HddsProtos.NetworkNode toProtobuf(
+      int clientVersion) {
+
+    HddsProtos.InnerNode.Builder innerNode =
+        HddsProtos.InnerNode.newBuilder()
+            .setNumOfLeaves(numOfLeaves)
+            .setNodeTopology(
+                NodeImpl.toProtobuf(getNetworkName(), getNetworkLocation(),
+                    getLevel(), getCost()));
+
+    if (childrenMap != null && !childrenMap.isEmpty()) {
+      for (Map.Entry<String, Node> entry : childrenMap.entrySet()) {
+        if (entry.getValue() != null) {
+          HddsProtos.ChildrenMap childrenMapProto =
+              HddsProtos.ChildrenMap.newBuilder()
+                  .setNetworkName(entry.getKey())
+                  .setNetworkNode(entry.getValue().toProtobuf(clientVersion))
+                  .build();
+          innerNode.addChildrenMap(childrenMapProto);
+        }
+      }
+    }
+    innerNode.build();
+
+    HddsProtos.NetworkNode networkNode =
+        HddsProtos.NetworkNode.newBuilder()
+            .setInnerNode(innerNode).build();
+
+    return networkNode;
+  }
+
+  public static InnerNode fromProtobuf(HddsProtos.InnerNode innerNode) {
+    InnerNodeImpl.Builder builder = new InnerNodeImpl.Builder();
+
+    if (innerNode.hasNodeTopology()) {
+      HddsProtos.NodeTopology nodeTopology = innerNode.getNodeTopology();
+
+      if (nodeTopology.hasName()) {
+        builder.setName(nodeTopology.getName());
+      }
+      if (nodeTopology.hasLocation()) {
+        builder.setLocation(nodeTopology.getLocation());
+      }
+      if (nodeTopology.hasLevel()) {
+        builder.setLevel(nodeTopology.getLevel());
+      }
+      if (nodeTopology.hasCost()) {
+        builder.setCost(nodeTopology.getCost());
+      }
+    }
+
+    if (!innerNode.getChildrenMapList().isEmpty()) {
+      builder.setChildrenMap(innerNode.getChildrenMapList());
+    }
+    if (innerNode.hasNumOfLeaves()) {
+      builder.setNumOfLeaves(innerNode.getNumOfLeaves());
     }
-    if (this == to) {
+
+    return builder.build();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
       return true;
     }
-    return this.toString().equals(to.toString());
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    InnerNodeImpl innerNode = (InnerNodeImpl) o;
+    return this.getNetworkName().equals(innerNode.getNetworkName()) &&
+        this.getNetworkLocation().equals(innerNode.getNetworkLocation()) &&
+        this.getLevel() == innerNode.getLevel() &&
+        this.getCost() == innerNode.getCost() &&
+        this.numOfLeaves == innerNode.numOfLeaves &&
+        this.childrenMap.size() == innerNode.childrenMap.size() &&
+        this.childrenMap.equals(innerNode.childrenMap);
   }
 
   @Override
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
index 2dc86c1b68..f6f013259c 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -75,6 +75,15 @@ public class NetworkTopologyImpl implements NetworkTopology {
         schemaManager.getCost(NetConstants.ROOT_LEVEL));
   }
 
+  public NetworkTopologyImpl(String schemaFile, InnerNode clusterTree) {
+    schemaManager = NodeSchemaManager.getInstance();
+    schemaManager.init(schemaFile);
+    maxLevel = schemaManager.getMaxLevel();
+    shuffleOperation = Collections::shuffle;
+    factory = InnerNodeImpl.FACTORY;
+    this.clusterTree = clusterTree;
+  }
+
   @VisibleForTesting
   public NetworkTopologyImpl(NodeSchemaManager manager,
                              Consumer<List<? extends Node>> shuffleOperation) {
@@ -726,8 +735,13 @@ public class NetworkTopologyImpl implements 
NetworkTopology {
     int cost = 0;
     netlock.readLock().lock();
     try {
-      if ((node1.getAncestor(level1 - 1) != clusterTree) ||
-          (node2.getAncestor(level2 - 1) != clusterTree)) {
+      Node ancestor1 = node1.getAncestor(level1 - 1);
+      boolean node1Topology = (ancestor1 != null && clusterTree != null &&
+          !ancestor1.equals(clusterTree)) || (ancestor1 != clusterTree);
+      Node ancestor2 = node2.getAncestor(level2 - 1);
+      boolean node2Topology = (ancestor2 != null && clusterTree != null &&
+          !ancestor2.equals(clusterTree)) || (ancestor2 != clusterTree);
+      if (node1Topology || node2Topology) {
         LOG.debug("One of the nodes is outside of network topology");
         return Integer.MAX_VALUE;
       }
@@ -741,7 +755,7 @@ public class NetworkTopologyImpl implements NetworkTopology 
{
         level2--;
         cost += node2 == null ? 0 : node2.getCost();
       }
-      while (node1 != null && node2 != null && node1 != node2) {
+      while (node1 != null && node2 != null && !node1.equals(node2)) {
         node1 = node1.getParent();
         node2 = node2.getParent();
         cost += node1 == null ? 0 : node1.getCost();
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
index 9884888a1d..50f702cce0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdds.scm.net;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
 /**
  * The interface defines a node in a network topology.
  * A node may be a leave representing a data node or an inner
@@ -126,4 +129,21 @@ public interface Node {
    * @return true if this node is under a specific scope
    */
   boolean isDescendant(String nodePath);
+
+  default HddsProtos.NetworkNode toProtobuf(
+      int clientVersion) {
+    return null;
+  }
+
+  static Node fromProtobuf(
+      HddsProtos.NetworkNode networkNode) {
+    if (networkNode.hasDatanodeDetails()) {
+      return DatanodeDetails.getFromProtoBuf(
+          networkNode.getDatanodeDetails());
+    } else if (networkNode.hasInnerNode()) {
+      return InnerNode.fromProtobuf(networkNode.getInnerNode());
+    } else {
+      return null;
+    }
+  }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java
index e7a45f649b..e4d76cd3db 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.net;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.PATH_SEPARATOR_STR;
@@ -229,6 +230,20 @@ public class NodeImpl implements Node {
         NetUtils.addSuffix(nodePath));
   }
 
+  public static HddsProtos.NodeTopology toProtobuf(String name, String 
location,
+      int level, int cost) {
+
+    HddsProtos.NodeTopology.Builder nodeTopologyBuilder =
+        HddsProtos.NodeTopology.newBuilder()
+            .setName(name)
+            .setLocation(location)
+            .setLevel(level)
+            .setCost(cost);
+
+    HddsProtos.NodeTopology nodeTopology = nodeTopologyBuilder.build();
+    return nodeTopology;
+  }
+
   @Override
   public boolean equals(Object to) {
     if (to == null) {
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
index eecd798767..fb37b214ca 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
@@ -62,6 +62,14 @@ public final class NodeSchemaManager {
     String schemaFile = conf.get(
         ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
         ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT);
+    loadSchemaFile(schemaFile);
+  }
+
+  public void init(String schemaFile) {
+    loadSchemaFile(schemaFile);
+  }
+
+  private void loadSchemaFile(String schemaFile) {
     NodeSchemaLoadResult result;
     try {
       result = NodeSchemaLoader.getInstance().loadSchemaFromFile(schemaFile);
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0080686575..c7867ffdcb 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -672,6 +672,11 @@ public final class OzoneConfigKeys {
   public static final String HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY =
       "hdds.scmclient.failover.max.retry";
 
+  public static final String
+      OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION =
+      "ozone.om.network.topology.refresh.duration";
+  public static final String
+      OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT = "1h";
 
   /**
    * There is no need to instantiate this class.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index ee0aa4514a..fc873f20af 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3791,6 +3791,14 @@
     <description>Wait duration before which close container
       is send to DN.</description>
   </property>
+  <property>
+    <name>ozone.om.network.topology.refresh.duration</name>
+    <value>1h</value>
+    <tag>SCM, OZONE, OM</tag>
+    <description>The duration at which we periodically fetch the updated 
network
+      topology cluster tree from SCM.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.ha.ratis.server.snapshot.creation.gap</name>
     <value>1024</value>
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java
new file mode 100644
index 0000000000..2e42df9573
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.client;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT;
+
+/**
+ * This client implements a background thread which periodically checks and
+ * gets the latest network topology cluster tree from SCM.
+ */
+public class ScmTopologyClient {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ScmTopologyClient.class);
+
+  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private final AtomicReference<InnerNode> cache = new AtomicReference<>();
+  private ScheduledExecutorService executorService;
+
+  public ScmTopologyClient(
+      ScmBlockLocationProtocol scmBlockLocationProtocol) {
+    this.scmBlockLocationProtocol = scmBlockLocationProtocol;
+  }
+
+  public InnerNode getClusterTree() {
+    return requireNonNull(cache.get(),
+        "ScmBlockLocationClient must have been initialized already.");
+  }
+
+  public void start(ConfigurationSource conf) throws IOException {
+    final InnerNode initialTopology =
+        scmBlockLocationProtocol.getNetworkTopology();
+    LOG.info("Initial network topology fetched from SCM: {}.",
+        initialTopology);
+    cache.set(initialTopology);
+    scheduleNetworkTopologyPoller(conf, Instant.now());
+  }
+
+  public void stop() {
+    if (executorService != null) {
+      executorService.shutdown();
+      try {
+        if (executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+          executorService.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while shutting down executor service.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private void scheduleNetworkTopologyPoller(ConfigurationSource conf,
+                                             Instant initialInvocation) {
+    Duration refreshDuration = parseRefreshDuration(conf);
+    Instant nextRefresh = initialInvocation.plus(refreshDuration);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("NetworkTopologyPoller")
+        .setDaemon(true)
+        .build();
+    executorService = Executors.newScheduledThreadPool(1, threadFactory);
+    Duration initialDelay = Duration.between(Instant.now(), nextRefresh);
+
+    LOG.debug("Scheduling NetworkTopologyPoller with an initial delay of {}.",
+        initialDelay);
+    executorService.scheduleAtFixedRate(() -> checkAndRefresh(),
+        initialDelay.toMillis(), refreshDuration.toMillis(),
+        TimeUnit.MILLISECONDS);
+  }
+
+  public static Duration parseRefreshDuration(ConfigurationSource conf) {
+    long refreshDurationInMs = conf.getTimeDuration(
+        OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION,
+        OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    return Duration.ofMillis(refreshDurationInMs);
+  }
+
+  private synchronized void checkAndRefresh() {
+    InnerNode current = cache.get();
+    try {
+      InnerNode newTopology = scmBlockLocationProtocol.getNetworkTopology();
+      if (!newTopology.equals(current)) {
+        cache.set(newTopology);
+        LOG.info("Updated network topology cluster tree fetched from " +
+            "SCM: {}.", newTopology);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          "Error fetching updated network topology cluster tree from SCM", e);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
new file mode 100644
index 0000000000..8dc9cb3cca
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * <p>
+ * Freon related helper classes used for load testing.
+ */
+
+/**
+ * Contains SCM client related classes.
+ */
+package org.apache.hadoop.hdds.scm.client;
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
index ef2585488f..8c84af859b 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
@@ -138,4 +139,11 @@ public interface ScmBlockLocationProtocol extends 
Closeable {
    */
   List<DatanodeDetails> sortDatanodes(List<String> nodes,
       String clientMachine) throws IOException;
+
+  /**
+   * Retrieves the hierarchical cluster tree representing the network topology.
+   * @return the root node of the network topology cluster tree.
+   * @throws IOException
+   */
+  InnerNode getNetworkTopology() throws IOException;
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index 2e72496999..1f114304cc 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,8 @@ import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Allo
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetClusterTreeRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetClusterTreeResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .SortDatanodesRequestProto;
@@ -49,6 +52,9 @@ import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.InnerNodeImpl;
+import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
@@ -328,6 +334,43 @@ public final class 
ScmBlockLocationProtocolClientSideTranslatorPB
     return results;
   }
 
+  @Override
+  public InnerNode getNetworkTopology() throws IOException {
+    GetClusterTreeRequestProto request =
+        GetClusterTreeRequestProto.newBuilder().build();
+    SCMBlockLocationRequest wrapper = 
createSCMBlockRequest(Type.GetClusterTree)
+        .setGetClusterTreeRequest(request)
+        .build();
+
+    final SCMBlockLocationResponse wrappedResponse =
+        handleError(submitRequest(wrapper));
+    GetClusterTreeResponseProto resp =
+        wrappedResponse.getGetClusterTreeResponse();
+
+    return (InnerNode) setParent(
+        InnerNodeImpl.fromProtobuf(resp.getClusterTree()));
+  }
+
+  /**
+   * Sets the parent field for the clusterTree nodes recursively.
+   *
+   * @param node cluster tree without parents set.
+   * @return updated cluster tree with parents set.
+   */
+  private Node setParent(Node node) {
+    if (node instanceof InnerNodeImpl) {
+      InnerNodeImpl innerNode = (InnerNodeImpl) node;
+      if (innerNode.getChildrenMap() != null) {
+        for (Map.Entry<String, Node> child : innerNode.getChildrenMap()
+            .entrySet()) {
+          child.getValue().setParent(innerNode);
+          setParent(child.getValue());
+        }
+      }
+    }
+    return node;
+  }
+
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 3f346300b3..4058453123 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -47,6 +47,7 @@ message DatanodeDetailsProto {
     optional int64 persistedOpStateExpiry = 9; // The seconds after the epoch 
when the OpState should expire
     // TODO(runzhiwang): when uuid is gone, specify 1 as the index of uuid128 
and mark as required
     optional UUID uuid128 = 100; // UUID with 128 bits assigned to the 
Datanode.
+    optional uint32 level = 101;
 }
 
 /**
@@ -497,3 +498,26 @@ message CompactionLogEntryProto {
     repeated CompactionFileInfoProto outputFileIntoList = 4;
     optional string compactionReason = 5;
 }
+
+message NodeTopology {
+    optional string name = 1;
+    optional string location = 2;
+    optional uint32 cost = 3;
+    optional uint32 level = 4;
+}
+
+message NetworkNode {
+    optional DatanodeDetailsProto datanodeDetails = 1;
+    optional InnerNode innerNode = 3;
+}
+
+message ChildrenMap {
+    optional string networkName = 1;
+    optional NetworkNode networkNode = 2;
+}
+
+message InnerNode {
+    optional NodeTopology nodeTopology = 1;
+    optional uint32 numOfLeaves = 2;
+    repeated ChildrenMap childrenMap = 3;
+}
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 307c23a562..3d281975f2 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -30,7 +30,6 @@ package hadoop.hdds.block;
 
 import "hdds.proto";
 
-
 // SCM Block protocol
 
 enum Type {
@@ -39,6 +38,7 @@ enum Type {
   GetScmInfo         = 13;
   SortDatanodes      = 14;
   AddScm             = 15;
+  GetClusterTree     = 16;
 }
 
 message SCMBlockLocationRequest {
@@ -56,6 +56,7 @@ message SCMBlockLocationRequest {
   optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest         = 13;
   optional SortDatanodesRequestProto          sortDatanodesRequest      = 14;
   optional hadoop.hdds.AddScmRequestProto     addScmRequestProto       = 15;
+  optional GetClusterTreeRequestProto         getClusterTreeRequest     = 16;
 }
 
 message SCMBlockLocationResponse {
@@ -80,6 +81,7 @@ message SCMBlockLocationResponse {
   optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse         = 13;
   optional SortDatanodesResponseProto          sortDatanodesResponse      = 14;
   optional hadoop.hdds.AddScmResponseProto     addScmResponse        = 15;
+  optional GetClusterTreeResponseProto         getClusterTreeResponse     = 16;
 }
 
 /**
@@ -230,6 +232,13 @@ message SortDatanodesResponseProto{
   repeated DatanodeDetailsProto node = 1;
 }
 
+message GetClusterTreeRequestProto {
+}
+
+message GetClusterTreeResponseProto {
+  required InnerNode clusterTree = 1;
+}
+
 /**
  * Protocol used from OzoneManager to StorageContainerManager.
  * See request and response messages for details of the RPC calls.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index 7893e90812..ab296fc52b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -354,5 +354,4 @@ public class DatanodeInfo extends DatanodeDetails {
   public boolean equals(Object obj) {
     return super.equals(obj);
   }
-
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index 7b1d6dd27d..3aff2f456e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -261,5 +261,4 @@ public class NodeStatus implements Comparable<NodeStatus> {
     }
     return order;
   }
-
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index 0914cdd90b..e77e2aebb3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -32,6 +32,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Allo
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetClusterTreeResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesRequestProto;
@@ -43,6 +44,7 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.RatisUtil;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import 
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -159,6 +161,10 @@ public final class 
ScmBlockLocationProtocolServerSideTranslatorPB
             request.getSortDatanodesRequest(), request.getVersion()
         ));
         break;
+      case GetClusterTree:
+        response.setGetClusterTreeResponse(
+            getClusterTree(request.getVersion()));
+        break;
       default:
         // Should never happen
         throw new IOException("Unknown Operation " + request.getCmdType() +
@@ -276,4 +282,13 @@ public final class 
ScmBlockLocationProtocolServerSideTranslatorPB
       throw new ServiceException(ex);
     }
   }
+
+  public GetClusterTreeResponseProto getClusterTree(int clientVersion)
+      throws IOException {
+    GetClusterTreeResponseProto.Builder resp =
+        GetClusterTreeResponseProto.newBuilder();
+    InnerNode clusterTree = impl.getNetworkTopology();
+    resp.setClusterTree(clusterTree.toProtobuf(clientVersion).getInnerNode());
+    return resp.build();
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 0747f04584..79002e27a2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -74,6 +74,7 @@ import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_K
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_HANDLER_COUNT_KEY;
 import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.IO_EXCEPTION;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
 import static 
org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
@@ -417,6 +418,11 @@ public class SCMBlockProtocolServer implements
     return null;
   }
 
+  @Override
+  public InnerNode getNetworkTopology() {
+    return (InnerNode) scm.getClusterMap().getNode(ROOT);
+  }
+
   @Override
   public AuditMessage buildAuditMessageForSuccess(
       AuditAction op, Map<String, String> auditMap) {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
index a82a1a8be7..77970ad447 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
 import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
@@ -46,6 +47,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
 import 
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
@@ -312,6 +314,8 @@ public final class TestDelegationToken {
     try {
       // Start OM
       om.setCertClient(new CertificateClientTestImpl(conf));
+      om.setScmTopologyClient(new ScmTopologyClient(
+          new ScmBlockLocationTestingClient(null, null, 0)));
       om.start();
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       String username = ugi.getUserName();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java
new file mode 100644
index 0000000000..463c8b5ae5
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import 
org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ *
+ * This class is to test the serialization/deserialization of cluster tree
+ * information from SCM.
+ */
+@Timeout(300)
+public class TestGetClusterTreeInformation {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestGetClusterTreeInformation.class);
+  private static int numOfDatanodes = 3;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static StorageContainerManager scm;
+
+  @BeforeAll
+  public static void init() throws IOException, TimeoutException,
+      InterruptedException {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(numOfDatanodes)
+        .setNumOfOzoneManagers(3)
+        .setNumOfStorageContainerManagers(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+    scm = cluster.getStorageContainerManager();
+  }
+
+  @AfterAll
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testGetClusterTreeInformation() throws IOException {
+    SCMBlockLocationFailoverProxyProvider failoverProxyProvider =
+        new SCMBlockLocationFailoverProxyProvider(conf);
+    failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
+    ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
+        new ScmBlockLocationProtocolClientSideTranslatorPB(
+            failoverProxyProvider);
+
+    InnerNode expectedInnerNode = (InnerNode) 
scm.getClusterMap().getNode(ROOT);
+    InnerNode actualInnerNode = scmBlockLocationClient.getNetworkTopology();
+    assertEquals(expectedInnerNode, actualInnerNode);
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOMSortDatanodes.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOMSortDatanodes.java
new file mode 100644
index 0000000000..cef872597e
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOMSortDatanodes.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.net.StaticMapping;
+
+import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_LEVEL;
+import static org.mockito.Mockito.mock;
+
+/**
+ * {@link org.apache.hadoop.hdds.scm.server.TestSCMBlockProtocolServer}
+ * sortDatanodes tests for
+ * {@link org.apache.hadoop.ozone.om.KeyManagerImpl#sortDatanodes(List, 
String)}.
+ */
+@Timeout(300)
+public class TestOMSortDatanodes {
+
+  private static OzoneConfiguration config;
+  private static StorageContainerManager scm;
+  private static NodeManager nodeManager;
+  private static KeyManagerImpl keyManager;
+  private static StorageContainerLocationProtocol mockScmContainerClient;
+  private static OzoneManager om;
+  private static File dir;
+  private static final int NODE_COUNT = 10;
+  private static final Map<String, String> EDGE_NODES = ImmutableMap.of(
+      "edge0", "/rack0",
+      "edge1", "/rack1"
+  );
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    config = new OzoneConfiguration();
+    dir = GenericTestUtils.getRandomizedTestDir();
+    config.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
+    config.set(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        StaticMapping.class.getName());
+    config.set(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, "true");
+    List<DatanodeDetails> datanodes = new ArrayList<>(NODE_COUNT);
+    List<String> nodeMapping = new ArrayList<>(NODE_COUNT);
+    for (int i = 0; i < NODE_COUNT; i++) {
+      DatanodeDetails dn = randomDatanodeDetails();
+      final String rack = "/rack" + (i % 2);
+      nodeMapping.add(dn.getHostName() + "=" + rack);
+      nodeMapping.add(dn.getIpAddress() + "=" + rack);
+      datanodes.add(dn);
+    }
+    EDGE_NODES.forEach((n, r) -> nodeMapping.add(n + "=" + r));
+    config.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING,
+        String.join(",", nodeMapping));
+
+    SCMConfigurator configurator = new SCMConfigurator();
+    configurator.setSCMHAManager(SCMHAManagerStub.getInstance(true));
+    configurator.setScmContext(SCMContext.emptyContext());
+    scm = HddsTestUtils.getScm(config, configurator);
+    scm.start();
+    scm.exitSafeMode();
+    nodeManager = scm.getScmNodeManager();
+    datanodes.forEach(dn -> nodeManager.register(dn, null, null));
+    mockScmContainerClient =
+        mock(StorageContainerLocationProtocol.class);
+    OmTestManagers omTestManagers
+        = new OmTestManagers(config, scm.getBlockProtocolServer(),
+        mockScmContainerClient);
+    om = omTestManagers.getOzoneManager();
+    keyManager = (KeyManagerImpl)omTestManagers.getKeyManager();
+  }
+
+  @AfterAll
+  public static void cleanup() throws Exception {
+    if (scm != null) {
+      scm.stop();
+      scm.join();
+    }
+    if (om != null) {
+      om.stop();
+    }
+    FileUtils.deleteDirectory(dir);
+  }
+
+  @Test
+  public void sortDatanodesRelativeToDatanode() {
+    for (DatanodeDetails dn : nodeManager.getAllNodes()) {
+      assertEquals(ROOT_LEVEL + 2, dn.getLevel());
+      List<DatanodeDetails> sorted =
+          keyManager.sortDatanodes(nodeManager.getAllNodes(), nodeAddress(dn));
+      assertEquals(dn, sorted.get(0),
+          "Source node should be sorted very first");
+      assertRackOrder(dn.getNetworkLocation(), sorted);
+    }
+  }
+
+  @Test
+  public void sortDatanodesRelativeToNonDatanode() {
+    for (Map.Entry<String, String> entry : EDGE_NODES.entrySet()) {
+      assertRackOrder(entry.getValue(),
+          keyManager.sortDatanodes(nodeManager.getAllNodes(), entry.getKey()));
+    }
+  }
+
+  @Test
+  public void testSortDatanodes() {
+    List<DatanodeDetails> nodes = nodeManager.getAllNodes();
+
+    // sort normal datanodes
+    String client;
+    client = nodeManager.getAllNodes().get(0).getIpAddress();
+    List<DatanodeDetails> datanodeDetails =
+        keyManager.sortDatanodes(nodes, client);
+    assertEquals(NODE_COUNT, datanodeDetails.size());
+
+    // illegal client 1
+    client += "X";
+    datanodeDetails = keyManager.sortDatanodes(nodes, client);
+    assertEquals(NODE_COUNT, datanodeDetails.size());
+
+    // illegal client 2
+    client = "/default-rack";
+    datanodeDetails = keyManager.sortDatanodes(nodes, client);
+    assertEquals(NODE_COUNT, datanodeDetails.size());
+  }
+
+  private static void assertRackOrder(String rack, List<DatanodeDetails> list) 
{
+    int size = list.size();
+    for (int i = 0; i < size / 2; i++) {
+      assertEquals(rack, list.get(i).getNetworkLocation(),
+          "Nodes in the same rack should be sorted first");
+    }
+    for (int i = size / 2; i < size; i++) {
+      assertNotEquals(rack, list.get(i).getNetworkLocation(),
+          "Nodes in the other rack should be sorted last");
+    }
+  }
+
+  private String nodeAddress(DatanodeDetails dn) {
+    boolean useHostname = config.getBoolean(
+        DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    return useHostname ? dn.getHostName() : dn.getIpAddress();
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index bc49d176da..d183ed8229 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -50,10 +50,12 @@ import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
 import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
@@ -87,6 +89,7 @@ import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -208,6 +211,7 @@ final class TestSecureOzoneCluster {
   private File testUserKeytab;
   private String testUserPrincipal;
   private StorageContainerManager scm;
+  private ScmBlockLocationProtocol scmBlockClient;
   private OzoneManager om;
   private HddsProtos.OzoneManagerDetailsProto omInfo;
   private String host;
@@ -264,6 +268,7 @@ final class TestSecureOzoneCluster {
       clusterId = UUID.randomUUID().toString();
       scmId = UUID.randomUUID().toString();
       omId = UUID.randomUUID().toString();
+      scmBlockClient = new ScmBlockLocationTestingClient(null, null, 0);
 
       startMiniKdc();
       setSecureConfig();
@@ -609,6 +614,7 @@ final class TestSecureOzoneCluster {
 
       setupOm(conf);
       om.setCertClient(new CertificateClientTestImpl(conf));
+      om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
       om.start();
     } catch (Exception ex) {
       // Expects timeout failure from scmClient in om but om user login via
@@ -676,6 +682,7 @@ final class TestSecureOzoneCluster {
       setupOm(conf);
       OzoneManager.setTestSecureOmFlag(true);
       om.setCertClient(new CertificateClientTestImpl(conf));
+      om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
       om.start();
 
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -763,6 +770,7 @@ final class TestSecureOzoneCluster {
       setupOm(conf);
       // Start OM
       om.setCertClient(new CertificateClientTestImpl(conf));
+      om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
       om.start();
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       String username = ugi.getUserName();
@@ -999,6 +1007,7 @@ final class TestSecureOzoneCluster {
       // create Ozone Manager instance, it will start the monitor task
       conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
       om = OzoneManager.createOm(conf);
+      om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
       om.setCertClient(client);
 
       // check after renew, client will have the new cert ID
@@ -1164,6 +1173,7 @@ final class TestSecureOzoneCluster {
       // create Ozone Manager instance, it will start the monitor task
       conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
       om = OzoneManager.createOm(conf);
+      om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
       om.setCertClient(mockClient);
 
       // check error message during renew
@@ -1202,6 +1212,7 @@ final class TestSecureOzoneCluster {
       String omCertId1 = omCert.getSerialNumber().toString();
       // Start OM
       om.setCertClient(certClient);
+      om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient));
       om.start();
       GenericTestUtils.waitFor(() -> om.isLeaderReady(), 100, 10000);
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index 50ff9c36a0..2ae69dc3c9 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -50,6 +50,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.InnerNodeImpl;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -162,6 +165,9 @@ public class TestOmContainerLocationCache {
     mockScmBlockLocationProtocol = mock(ScmBlockLocationProtocol.class);
     mockScmContainerClient =
         mock(StorageContainerLocationProtocol.class);
+    InnerNode.Factory factory = InnerNodeImpl.FACTORY;
+    when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn(
+        factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1));
 
     OmTestManagers omTestManagers = new OmTestManagers(conf,
         mockScmBlockLocationProtocol, mockScmContainerClient);
@@ -247,10 +253,13 @@ public class TestOmContainerLocationCache {
   }
 
   @BeforeEach
-  public void beforeEach() {
+  public void beforeEach() throws IOException {
     CONTAINER_ID.getAndIncrement();
     reset(mockScmBlockLocationProtocol, mockScmContainerClient,
         mockDn1Protocol, mockDn2Protocol);
+    InnerNode.Factory factory = InnerNodeImpl.FACTORY;
+    when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn(
+        factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1));
     when(mockDn1Protocol.getPipeline()).thenReturn(createPipeline(DN1));
     when(mockDn2Protocol.getPipeline()).thenReturn(createPipeline(DN2));
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
index 41f1c14f37..72f1c3374b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
@@ -46,6 +46,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.ozone.OzoneAcl;
@@ -197,6 +198,8 @@ public class TestOzoneManagerListVolumesSecure {
     OzoneManager.setTestSecureOmFlag(true);
 
     om = OzoneManager.createOm(conf);
+    om.setScmTopologyClient(new ScmTopologyClient(
+        new ScmBlockLocationTestingClient(null, null, 0)));
     om.setCertClient(new CertificateClientTestImpl(conf));
     om.start();
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index ffe1908c68..d2ca26e3fc 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -45,11 +45,15 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import 
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.utils.BackgroundService;
@@ -58,6 +62,9 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -96,6 +103,7 @@ import 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -108,6 +116,7 @@ import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
@@ -190,6 +199,7 @@ public class KeyManagerImpl implements KeyManager {
   private BackgroundService openKeyCleanupService;
   private BackgroundService multipartUploadCleanupService;
   private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService;
+  private DNSToSwitchMapping dnsToSwitchMapping;
 
   public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
       OzoneConfiguration conf, OMPerformanceMetrics metrics) {
@@ -339,6 +349,16 @@ public class KeyManagerImpl implements KeyManager {
           ozoneManager, configuration);
       multipartUploadCleanupService.start();
     }
+
+    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
+        configuration.getClass(
+            DFSConfigKeysLegacy.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            TableMapping.class, DNSToSwitchMapping.class);
+    DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
+        dnsToSwitchMappingClass, configuration);
+    dnsToSwitchMapping =
+        ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
+            : new CachedDNSToSwitchMapping(newInstance));
   }
 
   KeyProviderCryptoExtension getKMSProvider() {
@@ -1844,8 +1864,7 @@ public class KeyManagerImpl implements KeyManager {
     return encInfo;
   }
 
-  @VisibleForTesting
-  void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
+  private void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
     if (keyInfos != null && clientMachine != null) {
       Map<Set<String>, List<DatanodeDetails>> sortedPipelines = new 
HashMap<>();
       for (OmKeyInfo keyInfo : keyInfos) {
@@ -1865,8 +1884,7 @@ public class KeyManagerImpl implements KeyManager {
               LOG.warn("No datanodes in pipeline {}", pipeline.getId());
               continue;
             }
-            sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo,
-                uuidList);
+            sortedNodes = sortDatanodes(nodes, clientMachine);
             if (sortedNodes != null) {
               sortedPipelines.put(uuidSet, sortedNodes);
             }
@@ -1882,24 +1900,59 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
-  private List<DatanodeDetails> sortDatanodes(String clientMachine,
-      List<DatanodeDetails> nodes, OmKeyInfo keyInfo, List<String> nodeList) {
-    List<DatanodeDetails> sortedNodes = null;
+  @VisibleForTesting
+  public List<DatanodeDetails> sortDatanodes(List<DatanodeDetails> nodes,
+                                             String clientMachine) {
+    final Node client = getClientNode(clientMachine, nodes);
+    return ozoneManager.getClusterMap()
+        .sortByDistanceCost(client, nodes, nodes.size());
+  }
+
+  private Node getClientNode(String clientMachine,
+                             List<DatanodeDetails> nodes) {
+    List<DatanodeDetails> matchingNodes = new ArrayList<>();
+    boolean useHostname = ozoneManager.getConfiguration().getBoolean(
+        DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    for (DatanodeDetails node : nodes) {
+      if ((useHostname ? node.getHostName() : node.getIpAddress()).equals(
+          clientMachine)) {
+        matchingNodes.add(node);
+      }
+    }
+    return !matchingNodes.isEmpty() ? matchingNodes.get(0) :
+        getOtherNode(clientMachine);
+  }
+
+  private Node getOtherNode(String clientMachine) {
     try {
-      sortedNodes = scmClient.getBlockClient()
-          .sortDatanodes(nodeList, clientMachine);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Sorted datanodes {} for client {}, result: {}", nodes,
-            clientMachine, sortedNodes);
+      String clientLocation = resolveNodeLocation(clientMachine);
+      if (clientLocation != null) {
+        Node rack = ozoneManager.getClusterMap().getNode(clientLocation);
+        if (rack instanceof InnerNode) {
+          return new NodeImpl(clientMachine, clientLocation,
+              (InnerNode) rack, rack.getLevel() + 1,
+              NODE_COST_DEFAULT);
+        }
       }
-    } catch (IOException e) {
-      LOG.warn("Unable to sort datanodes based on distance to client, "
-          + " volume={}, bucket={}, key={}, client={}, datanodes={}, "
-          + " exception={}",
-          keyInfo.getVolumeName(), keyInfo.getBucketName(),
-          keyInfo.getKeyName(), clientMachine, nodeList, e.getMessage());
+    } catch (Exception e) {
+      LOG.info("Could not resolve client {}: {}",
+          clientMachine, e.getMessage());
+    }
+    return null;
+  }
+
+  private String resolveNodeLocation(String hostname) {
+    List<String> hosts = Collections.singletonList(hostname);
+    List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
+    if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
+      String location = resolvedHosts.get(0);
+      LOG.debug("Node {} resolved to location {}", hostname, location);
+      return location;
+    } else {
+      LOG.debug("Node resolution did not yield any result for {}", hostname);
+      return null;
     }
-    return sortedNodes;
   }
 
   private static List<String> toNodeUuid(Collection<DatanodeDetails> nodes) {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b8133e5844..c27ba7836b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -82,7 +82,11 @@ import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
@@ -354,6 +358,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private OzoneBlockTokenSecretManager blockTokenMgr;
   private CertificateClient certClient;
   private SecretKeySignerClient secretKeyClient;
+  private ScmTopologyClient scmTopologyClient;
   private final Text omRpcAddressTxt;
   private OzoneConfiguration configuration;
   private RPC.Server omRpcServer;
@@ -386,6 +391,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private OzoneManagerHttpServer httpServer;
   private final OMStorage omStorage;
   private ObjectName omInfoBeanName;
+  private NetworkTopology clusterMap;
   private Timer metricsTimer;
   private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
   private static final ObjectWriter WRITER =
@@ -603,6 +609,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     final StorageContainerLocationProtocol scmContainerClient = 
getScmContainerClient(configuration);
     // verifies that the SCM info in the OM Version file is correct.
     final ScmBlockLocationProtocol scmBlockClient = 
getScmBlockClient(configuration);
+    scmTopologyClient = new ScmTopologyClient(scmBlockClient);
     this.scmClient = new ScmClient(scmBlockClient, scmContainerClient,
         configuration);
     this.ozoneLockProvider = new OzoneLockProvider(getKeyPathLockEnabled(),
@@ -1135,6 +1142,24 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     serviceInfo = new ServiceInfoProvider(secConfig, this, certClient);
   }
 
+  /**
+   * For testing purpose only. This allows setting up ScmBlockLocationClient
+   * without having to fully setup a working cluster.
+   */
+  @VisibleForTesting
+  public void setScmTopologyClient(
+      ScmTopologyClient scmTopologyClient) {
+    this.scmTopologyClient = scmTopologyClient;
+  }
+
+  public NetworkTopology getClusterMap() {
+    InnerNode currentTree = scmTopologyClient.getClusterTree();
+    return new NetworkTopologyImpl(configuration.get(
+        ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
+        ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT),
+        currentTree);
+  }
+
   /**
    * For testing purpose only. This allows testing token in integration test
    * without fully setting up a working secure cluster.
@@ -1673,6 +1698,18 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
     keyManager.start(configuration);
 
+    try {
+      scmTopologyClient.start(configuration);
+    } catch (IOException ex) {
+      LOG.error("Unable to initialize network topology schema file. ", ex);
+      throw new UncheckedIOException(ex);
+    }
+
+    clusterMap = new NetworkTopologyImpl(configuration.get(
+        ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
+        ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT),
+        scmTopologyClient.getClusterTree());
+
     try {
       httpServer = new OzoneManagerHttpServer(configuration, this);
       httpServer.start();
@@ -2232,6 +2269,11 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       }
       keyManager.stop();
       stopSecretManager();
+
+      if (scmTopologyClient != null) {
+        scmTopologyClient.stop();
+      }
+
       if (httpServer != null) {
         httpServer.stop();
       }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
index 43d29c1608..edffd5ed74 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
@@ -24,6 +24,7 @@ import 
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.client.ScmTopologyClient;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -105,12 +106,16 @@ public final class OmTestManagers {
     keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils
         .getInternalState(om, "keyManager");
     ScmClient scmClient = new ScmClient(scmBlockClient, containerClient, conf);
+    ScmTopologyClient scmTopologyClient =
+        new ScmTopologyClient(scmBlockClient);
     HddsWhiteboxTestUtils.setInternalState(om,
         "scmClient", scmClient);
     HddsWhiteboxTestUtils.setInternalState(keyManager,
         "scmClient", scmClient);
     HddsWhiteboxTestUtils.setInternalState(keyManager,
         "secretManager", mock(OzoneBlockTokenSecretManager.class));
+    HddsWhiteboxTestUtils.setInternalState(om,
+        "scmTopologyClient", scmTopologyClient);
 
     om.start();
     waitFor(() -> om.getOmRatisServer().checkLeaderStatus() == 
RaftServerStatus.LEADER_AND_READY,
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
index 8847a2d51e..8ba5ca779c 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
@@ -31,6 +31,9 @@ import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.InnerNodeImpl;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -200,6 +203,14 @@ public class ScmBlockLocationTestingClient implements 
ScmBlockLocationProtocol {
     return null;
   }
 
+  @Override
+  public InnerNode getNetworkTopology() {
+    InnerNode.Factory factory = InnerNodeImpl.FACTORY;
+    InnerNode clusterTree = factory.newInnerNode("", "", null,
+        NetConstants.ROOT_LEVEL, 1);
+    return clusterTree;
+  }
+
   /**
    * Return the number of blocks puesdo deleted by this testing client.
    */
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index 278d96023c..5e2e27e0c1 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -23,12 +23,10 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
@@ -44,6 +42,9 @@ import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.InnerNodeImpl;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -79,14 +80,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.api.TestInstance;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import static com.google.common.collect.Sets.newHashSet;
-import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static java.util.Comparator.comparing;
-import static java.util.stream.Collectors.toList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -124,6 +120,9 @@ class TestKeyManagerUnit extends OzoneTestBase {
     configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.toString());
     containerClient = mock(StorageContainerLocationProtocol.class);
     blockClient = mock(ScmBlockLocationProtocol.class);
+    InnerNode.Factory factory = InnerNodeImpl.FACTORY;
+    when(blockClient.getNetworkTopology()).thenReturn(
+        factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1));
 
     OmTestManagers omTestManagers
         = new OmTestManagers(configuration, blockClient, containerClient);
@@ -644,9 +643,6 @@ class TestKeyManagerUnit extends OzoneTestBase {
     OMRequestTestUtils.addBucketToDB(volume, bucket, metadataManager);
 
     final Pipeline pipeline = MockPipeline.createPipeline(3);
-    final List<String> nodes = pipeline.getNodes().stream()
-        .map(DatanodeDetails::getUuidString)
-        .collect(toList());
 
     Set<Long> containerIDs = new HashSet<>();
     List<ContainerWithPipeline> containersWithPipeline = new ArrayList<>();
@@ -696,7 +692,6 @@ class TestKeyManagerUnit extends OzoneTestBase {
 
     assertEquals(10, fileStatusList.size());
     verify(containerClient).getContainerWithPipelineBatch(containerIDs);
-    verify(blockClient).sortDatanodes(nodes, client);
 
     // call list status the second time, and verify no more calls to
     // SCM.
@@ -704,67 +699,4 @@ class TestKeyManagerUnit extends OzoneTestBase {
         null, Long.MAX_VALUE, client);
     verify(containerClient, times(1)).getContainerWithPipelineBatch(anySet());
   }
-
-  @ParameterizedTest
-  @ValueSource(strings = {"anyhost", ""})
-  public void sortDatanodes(String client) throws Exception {
-    // GIVEN
-    int pipelineCount = 3;
-    int keysPerPipeline = 5;
-    OmKeyInfo[] keyInfos = new OmKeyInfo[pipelineCount * keysPerPipeline];
-    List<List<String>> expectedSortDatanodesInvocations = new ArrayList<>();
-    Map<Pipeline, List<DatanodeDetails>> expectedSortedNodes = new HashMap<>();
-    int ki = 0;
-    for (int p = 0; p < pipelineCount; p++) {
-      final Pipeline pipeline = MockPipeline.createPipeline(3);
-      final List<String> nodes = pipeline.getNodes().stream()
-          .map(DatanodeDetails::getUuidString)
-          .collect(toList());
-      expectedSortDatanodesInvocations.add(nodes);
-      final List<DatanodeDetails> sortedNodes = pipeline.getNodes().stream()
-          .sorted(comparing(DatanodeDetails::getUuidString))
-          .collect(toList());
-      expectedSortedNodes.put(pipeline, sortedNodes);
-
-      when(blockClient.sortDatanodes(nodes, client))
-          .thenReturn(sortedNodes);
-
-      for (int i = 1; i <= keysPerPipeline; i++) {
-        OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder()
-            .setBlockID(new BlockID(i, 1L))
-            .setPipeline(pipeline)
-            .setOffset(0)
-            .setLength(256000)
-            .build();
-
-        OmKeyInfo keyInfo = new OmKeyInfo.Builder()
-            .setOmKeyLocationInfos(Arrays.asList(
-                new OmKeyLocationInfoGroup(0, emptyList()),
-                new OmKeyLocationInfoGroup(1, singletonList(keyLocationInfo))))
-            .build();
-        keyInfos[ki++] = keyInfo;
-      }
-    }
-
-    // WHEN
-    keyManager.sortDatanodes(client, keyInfos);
-
-    // THEN
-    // verify all key info locations got updated
-    for (OmKeyInfo keyInfo : keyInfos) {
-      OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations();
-      assertNotNull(locations);
-      for (OmKeyLocationInfo locationInfo : locations.getLocationList()) {
-        Pipeline pipeline = locationInfo.getPipeline();
-        List<DatanodeDetails> expectedOrder = 
expectedSortedNodes.get(pipeline);
-        assertEquals(expectedOrder, pipeline.getNodesInOrder());
-      }
-    }
-
-    // expect one invocation per pipeline
-    for (List<String> nodes : expectedSortDatanodesInvocations) {
-      verify(blockClient).sortDatanodes(nodes, client);
-    }
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to