This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 25141bd4d98 HDDS-14336. Remove OM Proxy duplication in 
HadoopRpcOMFailoverProxyProvider (#9579)
25141bd4d98 is described below

commit 25141bd4d98d97c2a746a09003718011a8ca4b4b
Author: Ivan Andika <[email protected]>
AuthorDate: Tue Jan 13 04:31:12 2026 +0800

    HDDS-14336. Remove OM Proxy duplication in HadoopRpcOMFailoverProxyProvider 
(#9579)
---
 .../ozone/om/ha/GrpcOMFailoverProxyProvider.java   |  34 +++---
 .../om/ha/HadoopRpcOMFailoverProxyProvider.java    |  91 ++++++----------
 .../ozone/om/ha/OMFailoverProxyProviderBase.java   | 115 ++++++++++++---------
 .../org/apache/hadoop/ozone/om/ha/OMProxyInfo.java |  38 ++++---
 .../ozone/om/protocolPB/GrpcOmTransport.java       |  17 ++-
 .../ozone/om/ha/TestOMFailoverProxyProvider.java   |   6 +-
 .../ozone/client/rpc/OzoneRpcClientTests.java      |   5 +-
 .../ozone/om/TestOzoneManagerHAWithAllRunning.java |  10 +-
 .../hadoop/ozone/om/failover/TestOMFailovers.java  |  33 +++---
 9 files changed, 167 insertions(+), 182 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
index 0688b66911a..3592367720e 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -26,6 +26,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +35,6 @@
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationException;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ha.ConfUtils;
@@ -64,18 +64,17 @@ public GrpcOMFailoverProxyProvider(ConfigurationSource 
configuration,
   }
 
   @Override
-  protected void loadOMClientConfigs(ConfigurationSource config, String 
omSvcId)
+  protected void initOmProxiesFromConfigs(ConfigurationSource config, String 
omSvcId)
       throws IOException {
 
     Collection<String> omNodeIds = 
OmUtils.getActiveNonListenerOMNodeIds(config, omSvcId);
-    Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
+    Map<String, OMProxyInfo<T>> omProxies = new HashMap<>();
     List<String> omNodeIDList = new ArrayList<>();
-    Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
 
     for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
       String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
           omSvcId, nodeId);
-      Optional<String> hostaddr = getHostNameFromConfigKeys(config,
+      Optional<String> hostAddr = getHostNameFromConfigKeys(config,
           rpcAddrKey);
       OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
           ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
@@ -84,17 +83,13 @@ protected void loadOMClientConfigs(ConfigurationSource 
config, String omSvcId)
       if (nodeId == null) {
         nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
       }
-      if (hostaddr.isPresent()) {
-        int port = hostport.orElse(config
-            .getObject(GrpcOmTransport
-                .GrpcOmTransportConfig.class)
-            .getPort());
-        ProxyInfo<T> proxyInfo =
-            new ProxyInfo<>(createOMProxy(),
-                hostaddr.get() + ":" + port);
+      if (hostAddr.isPresent()) {
+        int port = hostport
+            
.orElse(config.getObject(GrpcOmTransport.GrpcOmTransportConfig.class).getPort());
+        String rpcAddrStr = hostAddr.get() + ":" + port;
+        OMProxyInfo<T> proxyInfo =
+            new OMProxyInfo<>(createOMProxy(), omSvcId, nodeId, rpcAddrStr, 
rpcAddrStr);
         omProxies.put(nodeId, proxyInfo);
-        omNodeAddressMap.put(nodeId,
-            NetUtils.createSocketAddr(proxyInfo.proxyInfo));
       } else {
         LOG.error("expected host address not defined for: {}", rpcAddrKey);
         throw new ConfigurationException(rpcAddrKey + "is not defined");
@@ -107,9 +102,8 @@ protected void loadOMClientConfigs(ConfigurationSource 
config, String omSvcId)
           "addresses for OM. Please configure the system with "
           + OZONE_OM_ADDRESS_KEY);
     }
-    setOmProxies(omProxies);
-    setOmNodeIDList(omNodeIDList);
-    setOmNodeAddressMap(omNodeAddressMap);
+    Collections.shuffle(omNodeIDList);
+    initOmProxies(omProxies, omNodeIDList);
   }
 
   private T createOMProxy() throws IOException {
@@ -149,7 +143,7 @@ public synchronized void close() throws IOException { }
 
   // need to throw if nodeID not in omAddresses
   public String getGrpcProxyAddress(String nodeId) throws IOException {
-    Map<String, ProxyInfo<T>> omProxies = getOMProxyMap();
+    Map<String, OMProxyInfo<T>> omProxies = getOMProxyMap();
     if (omProxies.containsKey(nodeId)) {
       return omProxies.get(nodeId).proxyInfo;
     } else {
@@ -161,6 +155,6 @@ public String getGrpcProxyAddress(String nodeId) throws 
IOException {
   }
 
   public List<String> getGrpcOmNodeIDList() {
-    return getOmNodeIDList();
+    return getOmNodesInOrder();
   }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
index 78a07e23b3a..7d5e40e9f8d 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
@@ -19,9 +19,8 @@
 
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -50,7 +49,6 @@ public class HadoopRpcOMFailoverProxyProvider<T> extends
       LoggerFactory.getLogger(HadoopRpcOMFailoverProxyProvider.class);
 
   private final Text delegationTokenService;
-  private Map<String, OMProxyInfo> omProxyInfos;
 
   // HadoopRpcOMFailoverProxyProvider, on encountering certain exception,
   // tries each OM once in a round robin fashion. After that it waits
@@ -67,12 +65,11 @@ public HadoopRpcOMFailoverProxyProvider(ConfigurationSource 
configuration,
   }
 
   @Override
-  protected void loadOMClientConfigs(ConfigurationSource config, String 
omSvcId)
+  protected void initOmProxiesFromConfigs(ConfigurationSource config, String 
omSvcId)
       throws IOException {
-    Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
-    this.omProxyInfos = new HashMap<>();
+    Map<String, OMProxyInfo<T>> omProxies = new HashMap<>();
+
     List<String> omNodeIDList = new ArrayList<>();
-    Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
 
     Collection<String> omNodeIds = 
OmUtils.getActiveNonListenerOMNodeIds(config,
         omSvcId);
@@ -86,8 +83,8 @@ protected void loadOMClientConfigs(ConfigurationSource 
config, String omSvcId)
         continue;
       }
 
-      OMProxyInfo omProxyInfo = new OMProxyInfo(omSvcId, nodeId,
-          rpcAddrStr);
+      // ProxyInfo.proxy will be set during first time call to server.
+      OMProxyInfo<T> omProxyInfo = new OMProxyInfo<>(omSvcId, nodeId, 
rpcAddrStr);
 
       if (omProxyInfo.getAddress() != null) {
         // For a non-HA OM setup, nodeId might be null. If so, we assign it
@@ -95,11 +92,8 @@ protected void loadOMClientConfigs(ConfigurationSource 
config, String omSvcId)
         if (nodeId == null) {
           nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
         }
-        // ProxyInfo will be set during first time call to server.
-        omProxies.put(nodeId, null);
-        omProxyInfos.put(nodeId, omProxyInfo);
+        omProxies.put(nodeId, omProxyInfo);
         omNodeIDList.add(nodeId);
-        omNodeAddressMap.put(nodeId, omProxyInfo.getAddress());
       } else {
         LOG.error("Failed to create OM proxy for {} at address {}",
             nodeId, rpcAddrStr);
@@ -111,9 +105,8 @@ protected void loadOMClientConfigs(ConfigurationSource 
config, String omSvcId)
           "addresses for OM. Please configure the system with "
           + OZONE_OM_ADDRESS_KEY);
     }
-    setOmProxies(omProxies);
-    setOmNodeIDList(omNodeIDList);
-    setOmNodeAddressMap(omNodeAddressMap);
+    Collections.shuffle(omNodeIDList);
+    initOmProxies(omProxies, omNodeIDList);
   }
 
   /**
@@ -123,31 +116,24 @@ protected void loadOMClientConfigs(ConfigurationSource 
config, String omSvcId)
    */
   @Override
   public synchronized ProxyInfo<T> getProxy() {
-    ProxyInfo currentProxyInfo = 
getOMProxyMap().get(getCurrentProxyOMNodeId());
-    if (currentProxyInfo == null) {
-      currentProxyInfo = createOMProxy(getCurrentProxyOMNodeId());
-    }
-    return currentProxyInfo;
+    OMProxyInfo<T> current = getOMProxyMap().get(getCurrentProxyOMNodeId());
+    return createOMProxyIfNeeded(current);
   }
 
   /**
    * Creates proxy object.
    */
-  protected ProxyInfo createOMProxy(String nodeId) {
-    OMProxyInfo omProxyInfo = omProxyInfos.get(nodeId);
-    InetSocketAddress address = omProxyInfo.getAddress();
-    ProxyInfo proxyInfo;
-    try {
-      T proxy = createOMProxy(address);
-      // Create proxyInfo here, to make it work with all Hadoop versions.
-      proxyInfo = new ProxyInfo<>(proxy, omProxyInfo.toString());
-      getOMProxyMap().put(nodeId, proxyInfo);
-    } catch (IOException ioe) {
-      LOG.error("{} Failed to create RPC proxy to OM at {}",
-          this.getClass().getSimpleName(), address, ioe);
-      throw new RuntimeException(ioe);
+  protected ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) {
+    if (omProxyInfo.proxy == null) {
+      try {
+        omProxyInfo.proxy = createOMProxy(omProxyInfo.getAddress());
+      } catch (IOException ioe) {
+        LOG.error("{} Failed to create RPC proxy to OM at {}",
+            this.getClass().getSimpleName(), omProxyInfo.getAddress(), ioe);
+        throw new RuntimeException(ioe);
+      }
     }
-    return proxyInfo;
+    return omProxyInfo;
   }
 
   public Text getCurrentProxyDelegationToken() {
@@ -158,8 +144,8 @@ protected Text computeDelegationTokenService() {
     // For HA, this will return "," separated address of all OM's.
     List<String> addresses = new ArrayList<>();
 
-    for (Map.Entry<String, OMProxyInfo> omProxyInfoSet :
-        omProxyInfos.entrySet()) {
+    for (Map.Entry<String, OMProxyInfo<T>> omProxyInfoSet :
+        getOMProxyMap().entrySet()) {
       Text dtService = omProxyInfoSet.getValue().getDelegationTokenService();
 
       // During client object creation when one of the OM configured address
@@ -186,33 +172,14 @@ protected Text computeDelegationTokenService() {
   @Override
   public synchronized void close() throws IOException {
     for (ProxyInfo<T> proxyInfo : getOMProxies()) {
-      if (proxyInfo != null) {
-        RPC.stopProxy(proxyInfo.proxy);
+      if (proxyInfo.proxy != null) {
+        if (proxyInfo.proxy instanceof Closeable) {
+          ((Closeable)proxyInfo.proxy).close();
+        } else {
+          RPC.stopProxy(proxyInfo.proxy);
+        }
       }
     }
   }
-
-  @VisibleForTesting
-  public List<OMProxyInfo> getOMProxyInfos() {
-    return new ArrayList<OMProxyInfo>(omProxyInfos.values());
-  }
-
-  @VisibleForTesting
-  public Map<String, OMProxyInfo> getOMProxyInfoMap() {
-    return omProxyInfos;
-  }
-
-  @VisibleForTesting
-  protected void setProxiesForTesting(
-      Map<String, ProxyInfo<T>> setOMProxies,
-      Map<String, OMProxyInfo> setOMProxyInfos,
-      List<String> setOMNodeIDList,
-      Map<String, InetSocketAddress> setOMNodeAddress) {
-    setOmProxies(setOMProxies);
-    this.omProxyInfos = setOMProxyInfos;
-    setOmNodeIDList(setOMNodeIDList);
-    setOmNodeAddressMap(setOMNodeAddress);
-  }
-
 }
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
index d58bbaf02e4..04cc40a6b0f 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,11 +70,25 @@ public abstract class OMFailoverProxyProviderBase<T> 
implements
   private final ConfigurationSource conf;
   private final Class<T> protocolClass;
 
-  // Map of OMNodeID to its proxy
-  private Map<String, ProxyInfo<T>> omProxies;
-  private List<String> omNodeIDList;
-  private Map<String, InetSocketAddress> omNodeAddressMap;
-
+  // omProxies: Map of OMNodeID to its proxy
+  // omNodesInOrder: List that specifies the ordering of OM nodes that
+  // is used to determine the next OM node proxy to retrieve
+  // from omProxies.
+  // Invariants:
+  // 1. size(omProxies) == size(omNodesInOrder)
+  // 2. set(omProxies.keySet) == set(omNodesInOrder)
+  private Map<String, OMProxyInfo<T>> omProxies;
+  private List<String> omNodesInOrder;
+
+  // These are used to identify the current and next OM node
+  // OMNodeId is used to retrieve proxy from omProxies
+  // ProxyIndex is used to find the node ID from omNodesInOrder
+  // Invariants:
+  // 1. currentProxyOMNodeId == omNodesInOrder[currentProxyIndex]
+  // 2. nextProxyOMNodeId == omNodesInOrder[nextProxyIndex]
+  // Note that these fields need to be modified atomically (e.g. using 
synchronized)
+  // Specifically (currentProxyOMNodeId, currentProxyIndex) and 
(nextProxyOMNodeId, nextProxyIndex)
+  // should be atomically updated
   private String currentProxyOMNodeId;
   private int currentProxyIndex;
   private String nextProxyOMNodeId;
@@ -84,11 +99,11 @@ public abstract class OMFailoverProxyProviderBase<T> 
implements
   // before attempting to contact all the OMs again. For other exceptions
   // such as LeaderNotReadyException, the same OM is contacted again with a
   // linearly increasing wait time.
-  private Set<String> attemptedOMs = new HashSet<>();
+  private final Set<String> attemptedOMs = new HashSet<>();
   private String lastAttemptedOM;
   private int numAttemptsOnSameOM = 0;
   private final long waitBetweenRetries;
-  private Set<String> accessControlExceptionOMs = new HashSet<>();
+  private final Set<String> accessControlExceptionOMs = new HashSet<>();
   private boolean performFailoverDone;
 
   private final UserGroupInformation ugi;
@@ -106,19 +121,23 @@ public OMFailoverProxyProviderBase(ConfigurationSource 
configuration,
         OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
         OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
 
-    loadOMClientConfigs(conf, omServiceId);
-    Objects.requireNonNull(omProxies, "omProxies == null");
-    Objects.requireNonNull(omNodeIDList, "omNodeIDList == null");
-    Objects.requireNonNull(omNodeAddressMap, "omNodeAddressMap == null");
+    initOmProxiesFromConfigs(conf, omServiceId);
 
     nextProxyIndex = 0;
-    nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
+    nextProxyOMNodeId = omNodesInOrder.get(nextProxyIndex);
     currentProxyIndex = 0;
     currentProxyOMNodeId = nextProxyOMNodeId;
   }
 
-  protected abstract void loadOMClientConfigs(ConfigurationSource config,
-                                              String omSvcId)
+  /**
+   * Initialize the OM proxies from the configuration and the OM service ID.
+   * The implementation initializes the OM proxies and the ordered OM node ID list
+   * through {@link #initOmProxies(Map, List)}.
+   * @param config configuration containing OM node information
+   * @param omSvcId OM service ID
+   * @throws IOException if any exception occurs while trying to initialize 
the proxy.
+   */
+  protected abstract void initOmProxiesFromConfigs(ConfigurationSource config, 
String omSvcId)
       throws IOException;
 
   /**
@@ -137,7 +156,7 @@ protected T createOMProxy(InetSocketAddress omAddress) 
throws IOException {
     // Ensure we do not attempt retry on the same OM in case of exceptions
     RetryPolicy connectionRetryPolicy = 
RetryPolicies.failoverOnNetworkException(0);
 
-    return (T) RPC.getProtocolProxy(
+    return RPC.getProtocolProxy(
         getInterface(),
         RPC.getProtocolVersion(protocolClass),
         omAddress,
@@ -160,9 +179,7 @@ protected synchronized boolean shouldFailover(Exception ex) 
{
         return false;
       } else {
         accessControlExceptionOMs.add(nextProxyOMNodeId);
-        if (accessControlExceptionOMs.containsAll(omNodeIDList)) {
-          return false;
-        }
+        return !accessControlExceptionOMs.containsAll(omNodesInOrder);
       }
     } else if (HddsUtils.shouldNotFailoverOnRpcException(unwrappedException)) {
       return false;
@@ -220,8 +237,8 @@ public RetryAction shouldRetry(Exception exception, int 
retries,
                 notLeaderException.getSuggestedLeaderNodeId();
             if (suggestedLeaderAddress != null &&
                 suggestedNodeId != null &&
-                omNodeAddressMap.containsKey(suggestedNodeId) &&
-                omNodeAddressMap.get(suggestedNodeId).toString()
+                omProxies.containsKey(suggestedNodeId) &&
+                omProxies.get(suggestedNodeId).getRpcAddr().toString()
                     .equals(suggestedLeaderAddress)) {
               setNextOmProxy(suggestedNodeId);
               return getRetryAction(RetryDecision.FAILOVER_AND_RETRY,
@@ -259,7 +276,7 @@ private RetryAction getRetryAction(RetryDecision 
fallbackAction,
           return new RetryAction(fallbackAction, getWaitTime());
         } else {
           LOG.error("Failed to connect to OMs: {}. Attempted {} failovers.",
-              omNodeIDList, maxFailovers);
+              omNodesInOrder, maxFailovers);
           return RetryAction.FAIL;
         }
       }
@@ -318,7 +335,7 @@ public synchronized void selectNextOmProxy() {
       int newProxyIndex = incrementNextProxyIndex();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Incrementing OM proxy index to {}, nodeId: {}",
-            newProxyIndex, omNodeIDList.get(newProxyIndex));
+            newProxyIndex, omNodesInOrder.get(newProxyIndex));
       }
     }
   }
@@ -334,7 +351,7 @@ private synchronized int incrementNextProxyIndex() {
     attemptedOMs.add(nextProxyOMNodeId);
 
     nextProxyIndex = (nextProxyIndex + 1) % omProxies.size();
-    nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
+    nextProxyOMNodeId = omNodesInOrder.get(nextProxyIndex);
     return nextProxyIndex;
   }
 
@@ -348,7 +365,7 @@ private synchronized boolean updateLeaderOMNodeId(String 
newLeaderOMNodeId) {
       if (omProxies.containsKey(newLeaderOMNodeId)) {
         lastAttemptedOM = nextProxyOMNodeId;
         nextProxyOMNodeId = newLeaderOMNodeId;
-        nextProxyIndex = omNodeIDList.indexOf(nextProxyOMNodeId);
+        nextProxyIndex = omNodesInOrder.indexOf(nextProxyOMNodeId);
         return true;
       }
     } else {
@@ -391,19 +408,20 @@ public synchronized long getWaitTime() {
     return waitBetweenRetries;
   }
 
-  public List<ProxyInfo> getOMProxies() {
-    return new ArrayList<ProxyInfo>(omProxies.values());
+  public List<OMProxyInfo<T>> getOMProxies() {
+    return new ArrayList<>(omProxies.values());
   }
 
-  public Map<String, ProxyInfo<T>> getOMProxyMap() {
+  public Map<String, OMProxyInfo<T>> getOMProxyMap() {
     return omProxies;
   }
 
   /**
-   * Check if exception is OMLeaderNotReadyException.
+   * Unwrap the exception and return the wrapped OMLeaderNotReadyException if 
any.
    *
-   * @param exception
-   * @return OMLeaderNotReadyException
+   * @param exception exception to unwrap.
+   * @return the unwrapped OMLeaderNotReadyException or null if the wrapped
+   *         exception is not OMLeaderNotReadyException.
    */
   public static OMLeaderNotReadyException getLeaderNotReadyException(
       Exception exception) {
@@ -419,9 +437,11 @@ public static OMLeaderNotReadyException 
getLeaderNotReadyException(
   }
 
   /**
-   * Check if exception is a OMNotLeaderException.
+   * Unwrap the exception and return the wrapped OMNotLeaderException if any.
    *
-   * @return OMNotLeaderException.
+   * @param exception exception to unwrap.
+   * @return the unwrapped OMNotLeaderException or null if the wrapped
+   *         exception is not OMNotLeaderException.
    */
   public static OMNotLeaderException getNotLeaderException(
       Exception exception) {
@@ -440,23 +460,24 @@ protected ConfigurationSource getConf() {
     return conf;
   }
 
-  protected synchronized void setOmProxies(Map<String,
-      ProxyInfo<T>> omProxies) {
-    this.omProxies = omProxies;
-  }
-
-  protected synchronized void setOmNodeIDList(List<String> omNodeIDList) {
-    Collections.shuffle(omNodeIDList);
-    this.omNodeIDList = Collections.unmodifiableList(omNodeIDList);
-  }
-
-  protected synchronized List<String> getOmNodeIDList() {
-    return omNodeIDList;
+  /**
+   * Initialize the OM proxy map and the OM nodes ordering; both must cover the same node IDs.
+   * @param omProxyMap OM Node ID to OMProxyInfo map
+   * @param omNodesInOrderList Ordered list of OM Node ID to define failover 
ordering.
+   */
+  protected synchronized void initOmProxies(Map<String, OMProxyInfo<T>> 
omProxyMap, List<String> omNodesInOrderList) {
+    Objects.requireNonNull(omProxyMap, "omProxyMap == null");
+    Objects.requireNonNull(omNodesInOrderList, "omNodesInOrderList == null");
+    Preconditions.assertSame(omProxyMap.size(), omNodesInOrderList.size(),
+        "omProxyMap and omNodesInOrderList should have the same size");
+    Preconditions.assertEquals(omProxyMap.keySet(), new 
HashSet<>(omNodesInOrderList),
+        "the OM node IDs of omProxies keys should be the same as 
omNodesInOrder");
+    this.omProxies = omProxyMap;
+    this.omNodesInOrder = Collections.unmodifiableList(omNodesInOrderList);
   }
 
-  protected synchronized void setOmNodeAddressMap(
-      Map<String, InetSocketAddress> map) {
-    this.omNodeAddressMap = map;
+  protected synchronized List<String> getOmNodesInOrder() {
+    return omNodesInOrder;
   }
 
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java
index 8ea1749db9a..e9c0883b4b9 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java
@@ -19,6 +19,7 @@
 
 import java.net.InetSocketAddress;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.slf4j.Logger;
@@ -27,16 +28,17 @@
 /**
  * Class to store OM proxy information.
  */
-public class OMProxyInfo {
-  private String nodeId;
-  private String rpcAddrStr;
-  private InetSocketAddress rpcAddr;
-  private Text dtService;
+public class OMProxyInfo<T> extends ProxyInfo<T> {
+  private final String nodeId;
+  private final String rpcAddrStr;
+  private final InetSocketAddress rpcAddr;
+  private final Text dtService;
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OMProxyInfo.class);
 
-  OMProxyInfo(String serviceID, String nodeID, String rpcAddress) {
+  public OMProxyInfo(T proxy, String serviceID, String nodeID, String 
rpcAddress, String proxyInfo) {
+    super(proxy, proxyInfo);
     this.nodeId = nodeID;
     this.rpcAddrStr = rpcAddress;
     this.rpcAddr = NetUtils.createSocketAddr(rpcAddrStr);
@@ -47,24 +49,28 @@ public class OMProxyInfo {
           rpcAddress, serviceID, nodeId);
       this.dtService = null;
     } else {
-
       // This issue will be a problem with docker/kubernetes world where one of
       // the container is killed, and that OM address will be unresolved.
       // For now skip the unresolved OM address setting it to the token
       // service field.
-
       this.dtService = SecurityUtil.buildTokenService(rpcAddr);
     }
   }
 
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder()
-        .append("nodeId=")
-        .append(nodeId)
-        .append(",nodeAddress=")
-        .append(rpcAddrStr);
-    return sb.toString();
+  public OMProxyInfo(String serviceID, String nodeID, String rpcAddress) {
+    this(null, serviceID, nodeID, rpcAddress, "nodeId=" + nodeID + 
",nodeAddress=" + rpcAddress);
+  }
+
+  public String getNodeId() {
+    return nodeId;
+  }
+
+  public String getRpcAddrStr() {
+    return rpcAddrStr;
+  }
+
+  public InetSocketAddress getRpcAddr() {
+    return rpcAddr;
   }
 
   public InetSocketAddress getAddress() {
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
index ef4ec32ddc2..a7f6953f01a 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -75,18 +75,18 @@ public class GrpcOmTransport implements OmTransport {
   // gRPC specific
   private static List<X509Certificate> caCerts = null;
 
-  private Map<String,
+  private final Map<String,
       OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients;
-  private Map<String, ManagedChannel> channels;
-  private ConfigurationSource conf;
+  private final Map<String, ManagedChannel> channels;
+  private final ConfigurationSource conf;
 
-  private AtomicReference<String> host;
+  private final AtomicReference<String> host;
   private AtomicInteger globalFailoverCount;
   private final int maxSize;
-  private SecurityConfig secConfig;
+  private final SecurityConfig secConfig;
 
   private RetryPolicy retryPolicy;
-  private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
+  private final GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
       omFailoverProxyProvider;
 
   public static void setCaCerts(List<X509Certificate> x509Certificates) {
@@ -100,14 +100,14 @@ public GrpcOmTransport(ConfigurationSource conf,
     this.channels = new HashMap<>();
     this.clients = new HashMap<>();
     this.conf = conf;
-    this.host = new AtomicReference();
+    this.host = new AtomicReference<>();
     this.globalFailoverCount = new AtomicInteger();
 
     secConfig =  new SecurityConfig(conf);
     maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
         OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
 
-    omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
+    omFailoverProxyProvider = new GrpcOMFailoverProxyProvider<>(
         conf,
         ugi,
         omServiceId,
@@ -256,7 +256,6 @@ private boolean shouldRetry(Exception ex, int 
expectedFailoverCount, int request
       action = retryPolicy.shouldRetry(ex, 0, requestFailoverCount, true);
       LOG.debug("grpc failover retry action {}", action.action);
       if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
-        retry = false;
         LOG.error("Retry request failed. Action : {}, {}",
             action.action, ex.toString());
       } else {
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
index aa1c0cb7e2d..dc6b0a331ea 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
@@ -167,9 +167,9 @@ public void testExcludesListenerNodes() throws Exception {
                  UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
                  OzoneManagerProtocolPB.class)) {
       // Verify listener node is not included in proxy map
-      
assertTrue(providerWithListeners.getOMProxyInfoMap().containsKey(NODE_ID_BASE_STR
 + 1));
-      
assertTrue(providerWithListeners.getOMProxyInfoMap().containsKey(NODE_ID_BASE_STR
 + 3));
-      
assertFalse(providerWithListeners.getOMProxyInfoMap().containsKey(listenerNode));
+      
assertTrue(providerWithListeners.getOMProxyMap().containsKey(NODE_ID_BASE_STR + 
1));
+      
assertTrue(providerWithListeners.getOMProxyMap().containsKey(NODE_ID_BASE_STR + 
3));
+      
assertFalse(providerWithListeners.getOMProxyMap().containsKey(listenerNode));
     }
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
index 2832a281ebb..3bc30fa0c42 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java
@@ -131,6 +131,7 @@
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
 import org.apache.hadoop.ozone.ClientConfigForTesting;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -327,13 +328,13 @@ public void testOMClientProxyProvider() {
     HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
         OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
 
-    List<OMProxyInfo> omProxies = omFailoverProxyProvider.getOMProxyInfos();
+    List<ProxyInfo> omProxies = omFailoverProxyProvider.getOMProxies();
 
     // For a non-HA OM service, there should be only one OM proxy.
     assertEquals(1, omProxies.size());
     // The address in OMProxyInfo object, which client will connect to,
     // should match the OM's RPC address.
-    assertEquals(omProxies.get(0).getAddress(),
+    assertEquals(((OMProxyInfo) omProxies.get(0)).getAddress(),
         ozoneManager.getOmRpcServerAddr());
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
index 5ba971d1967..8c55eb49caa 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
@@ -52,6 +52,7 @@
 import javax.management.ObjectName;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneTestUtils;
@@ -292,8 +293,8 @@ void testOMProxyProviderInitialization() {
         OmFailoverProxyUtil.getFailoverProxyProvider(
             rpcClient.getObjectStore().getClientProxy());
 
-    List<OMProxyInfo> omProxies =
-        omFailoverProxyProvider.getOMProxyInfos();
+    List<ProxyInfo> omProxies =
+        omFailoverProxyProvider.getOMProxies();
 
     assertEquals(getNumOfOMs(), omProxies.size());
 
@@ -301,7 +302,8 @@ void testOMProxyProviderInitialization() {
       OzoneManager om = getCluster().getOzoneManager(i);
       InetSocketAddress omRpcServerAddr = om.getOmRpcServerAddr();
       boolean omClientProxyExists = false;
-      for (OMProxyInfo omProxyInfo : omProxies) {
+      for (ProxyInfo proxyInfo : omProxies) {
+        OMProxyInfo omProxyInfo = (OMProxyInfo) proxyInfo;
         if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
           omClientProxyExists = true;
           break;
@@ -368,7 +370,7 @@ public void testFailoverWithSuggestedLeader() throws Exception {
     // The OMFailoverProxyProvider will point to the current leader OM node.
     String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
     String leaderOMAddress = ((OMProxyInfo)
-        omFailoverProxyProvider.getOMProxyInfoMap().get(leaderOMNodeId))
+        omFailoverProxyProvider.getOMProxyMap().get(leaderOMNodeId))
         .getAddress().getAddress().toString();
     OzoneManager followerOM = null;
     for (OzoneManager om: getCluster().getOzoneManagersList()) {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index 0de7052c4ed..c7463a96890 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -23,7 +23,6 @@
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -53,7 +52,7 @@ public class TestOMFailovers {
   private Exception testException;
 
   @Test
-  public void testAccessContorlExceptionFailovers() throws Exception {
+  public void testAccessControlExceptionFailovers() throws Exception {
 
     testException = new AccessControlException();
 
@@ -105,8 +104,7 @@ public OMResponse submitRequest(RpcController controller,
     }
   }
 
-  private final class MockFailoverProxyProvider
-      extends HadoopRpcOMFailoverProxyProvider {
+  private final class MockFailoverProxyProvider extends HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> {
 
     private MockFailoverProxyProvider(ConfigurationSource configuration)
         throws IOException {
@@ -114,31 +112,28 @@ private MockFailoverProxyProvider(ConfigurationSource configuration)
     }
 
     @Override
-    protected ProxyInfo createOMProxy(String nodeId) {
-      ProxyInfo proxyInfo = new ProxyInfo<>(new MockOzoneManagerProtocol(nodeId,
-          testException), nodeId);
-      getOMProxyMap().put(nodeId, proxyInfo);
-      return proxyInfo;
+    protected ProxyInfo<OzoneManagerProtocolPB> createOMProxyIfNeeded(
+        OMProxyInfo<OzoneManagerProtocolPB> omProxyInfo) {
+      if (omProxyInfo.proxy == null) {
+        omProxyInfo.proxy = new MockOzoneManagerProtocol(omProxyInfo.getNodeId(), testException);
+      }
+      return omProxyInfo;
     }
 
     @Override
-    protected void loadOMClientConfigs(ConfigurationSource config,
+    protected void initOmProxiesFromConfigs(ConfigurationSource config,
         String omSvcId) {
-      HashMap<String, ProxyInfo<OzoneManagerProtocolPB>> omProxies =
-          new HashMap<>();
-      HashMap<String, OMProxyInfo> omProxyInfos = new HashMap<>();
-      HashMap<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();
+      HashMap<String, OMProxyInfo<OzoneManagerProtocolPB>> omProxyInfos = new HashMap<>();
       ArrayList<String> omNodeIDList = new ArrayList<>();
 
       for (int i = 1; i <= 3; i++) {
         String nodeId = "om" + i;
-        omProxies.put(nodeId, null);
-        omProxyInfos.put(nodeId, null);
+        OMProxyInfo<OzoneManagerProtocolPB> omProxyInfo = new OMProxyInfo<>(omSvcId, nodeId,
+            "127.0.0.1:9862");
+        omProxyInfos.put(nodeId, omProxyInfo);
         omNodeIDList.add(nodeId);
-        omNodeAddressMap.put(nodeId, null);
       }
-      setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList,
-          omNodeAddressMap);
+      initOmProxies(omProxyInfos, omNodeIDList);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to