This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ca24bdb  HDDS-5228 Make OM FailOverProxyProvider work across threads (#3160)
ca24bdb is described below

commit ca24bdb911cb5a20ce5a074c5cf6a07bf4f56ad0
Author: Ritesh H Shukla <[email protected]>
AuthorDate: Mon Mar 21 16:01:25 2022 -0700

    HDDS-5228 Make OM FailOverProxyProvider work across threads (#3160)
---
 .../ozone/om/ha/OMFailoverProxyProvider.java       | 130 +++++++++++----------
 .../ozone/om/protocolPB/Hadoop3OmTransport.java    |   3 +-
 .../ozone/om/ha/TestOMFailoverProxyProvider.java   |   8 +-
 .../ozone/om/TestOzoneManagerHAMetadataOnly.java   |   5 +-
 .../ozone/om/TestOzoneManagerHAWithFailover.java   |   2 +-
 .../hadoop/fs/ozone/Hadoop27RpcTransport.java      |   3 +-
 6 files changed, 80 insertions(+), 71 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 5432468..d9bb975 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryInvocationHandler;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
@@ -54,7 +53,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.ha.ConfUtils;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -88,7 +86,9 @@ public class OMFailoverProxyProvider<T> implements
   private Map<String, OMProxyInfo> omProxyInfos;
   private List<String> omNodeIDList;
 
+  private String nextProxyOMNodeId;
   private String currentProxyOMNodeId;
+  private int nextProxyIndex;
   private int currentProxyIndex;
 
   private List<String> retryExceptions = new ArrayList<>();
@@ -103,6 +103,7 @@ public class OMFailoverProxyProvider<T> implements
   private int numAttemptsOnSameOM = 0;
   private final long waitBetweenRetries;
   private Set<String> accessControlExceptionOMs = new HashSet<>();
+  private boolean performFailoverDone;
 
   public OMFailoverProxyProvider(ConfigurationSource configuration,
       UserGroupInformation ugi, String omServiceId, Class<T> protocol)
@@ -112,11 +113,14 @@ public class OMFailoverProxyProvider<T> implements
     this.ugi = ugi;
     this.omServiceId = omServiceId;
     this.protocolClass = protocol;
+    this.performFailoverDone = true;
     loadOMClientConfigs(conf, this.omServiceId);
     this.delegationTokenService = computeDelegationTokenService();
 
+    nextProxyIndex = 0;
+    nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
     currentProxyIndex = 0;
-    currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
+    currentProxyOMNodeId = nextProxyOMNodeId;
 
     waitBetweenRetries = conf.getLong(
         OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
@@ -261,9 +265,9 @@ public class OMFailoverProxyProvider<T> implements
             //  address of the suggested leader along with the nodeID.
             //  Failing over just based on nodeID is not very robust.
 
-            // OMFailoverProxyProvider#performFailover() is a dummy call and
-            // does not perform any failover. Failover manually to the next OM.
-            performFailoverToNextProxy();
+            // Prepare the next OM to be tried. This will help with calculation
+            // of the wait times needed when creating the retryAction.
+            selectNextOmProxy();
             return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
           }
 
@@ -273,7 +277,7 @@ public class OMFailoverProxyProvider<T> implements
             // Retry on same OM again as leader OM is not ready.
             // Failing over to same OM so that wait time between retries is
             // incremented
-            performFailoverIfRequired(omNodeId);
+            setNextOmProxy(omNodeId);
             return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
           }
         }
@@ -282,9 +286,9 @@ public class OMFailoverProxyProvider<T> implements
           return RetryAction.FAIL; // do not retry
         }
 
-        // For all other exceptions, fail over manually to the next OM Node
-        // proxy.
-        performFailoverToNextProxy();
+        // Prepare the next OM to be tried. This will help with calculation
+        // of the wait times needed when creating the retryAction.
+        selectNextOmProxy();
         return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
       }
 
@@ -339,39 +343,33 @@ public class OMFailoverProxyProvider<T> implements
 
   /**
    * Called whenever an error warrants failing over. It is determined by the
-   * retry policy.
-   *
-   * This is a dummy call from {@link RetryInvocationHandler}. The actual
-   * failover should be performed using either
-   * {@link OMFailoverProxyProvider#performFailoverIfRequired(String)} or
-   * {@link OMFailoverProxyProvider#performFailoverToNextProxy()}.
-   *
-   * In {@link OzoneManagerProtocolClientSideTranslatorPB}, we first
-   * manually failover and then call the RetryAction FAILOVER_AND_RETRY. This
-   * is done because we do not want to always failover to the next proxy. If we
-   * get a OMNotLeaderException with a suggested leader, then we want to
-   * failover to that OM proxy instead. Hence, we failover manually and the
-   * {@link FailoverProxyProvider#performFailover(Object)} call should not do
-   * failover again.
+   * retry policy. This method is supposed to be called only once per
+   * failover, even in a multithreaded environment; this is where the failover
+   * actually occurs. performFailover() promotes the next proxy selected by
+   * selectNextOmProxy()/setNextOmProxy() to be the current proxy. When two or
+   * more threads fail in parallel, the RetryInvocationHandler checks the
+   * expected failover count and calls performFailover() for only one of them;
+   * the other thread(s) skip performFailover() and instead call getProxy().
    */
   @Override
-  public void performFailover(T currentProxy) {
-    if (LOG.isDebugEnabled()) {
-      int currentIndex = getCurrentProxyIndex();
-      LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
-          currentIndex, omNodeIDList.get(currentIndex));
-    }
+  public synchronized void performFailover(T currentProxy) {
+    LOG.debug("Failing over OM from {}:{} to {}:{}",
+        currentProxyOMNodeId, currentProxyIndex,
+        nextProxyOMNodeId, nextProxyIndex);
+    currentProxyOMNodeId = nextProxyOMNodeId;
+    currentProxyIndex = nextProxyIndex;
+    performFailoverDone = true;
   }
 
   /**
-   * Performs failover if the leaderOMNodeId returned through OMReponse does
+   * Set the next leaderOMNodeId returned through OMResponse if it does
    * not match the current leaderOMNodeId cached by the proxy provider.
    */
-  public void performFailoverIfRequired(String newLeaderOMNodeId) {
+  public void setNextOmProxy(String newLeaderOMNodeId) {
     if (newLeaderOMNodeId == null) {
       LOG.debug("No suggested leader nodeId. Performing failover to next peer" +
           " node");
-      performFailoverToNextProxy();
+      selectNextOmProxy();
     } else {
       if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
         LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
@@ -380,14 +378,16 @@ public class OMFailoverProxyProvider<T> implements
   }
 
   /**
-   * Performs failover if the leaderOMNodeId returned through OMResponse does
-   * not match the current leaderOMNodeId cached by the proxy provider.
+   * Selects the next OM to try; advances at most once per performFailover().
    */
-  public void performFailoverToNextProxy() {
-    int newProxyIndex = incrementProxyIndex();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Incrementing OM proxy index to {}, nodeId: {}",
-          newProxyIndex, omNodeIDList.get(newProxyIndex));
+  public synchronized void selectNextOmProxy() {
+    if (performFailoverDone) {
+      performFailoverDone = false;
+      int newProxyIndex = incrementNextProxyIndex();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Incrementing OM proxy index to {}, nodeId: {}",
+            newProxyIndex, omNodeIDList.get(newProxyIndex));
+      }
     }
   }
 
@@ -395,15 +395,15 @@ public class OMFailoverProxyProvider<T> implements
    * Update the proxy index to the next proxy in the list.
    * @return the new proxy index
    */
-  private synchronized int incrementProxyIndex() {
+  private synchronized int incrementNextProxyIndex() {
     // Before failing over to next proxy, add the proxy OM (which has
     // returned an exception) to the list of attemptedOMs.
-    lastAttemptedOM = currentProxyOMNodeId;
-    attemptedOMs.add(currentProxyOMNodeId);
+    lastAttemptedOM = nextProxyOMNodeId;
+    attemptedOMs.add(nextProxyOMNodeId);
 
-    currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
-    currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
-    return currentProxyIndex;
+    nextProxyIndex = (nextProxyIndex + 1) % omProxies.size();
+    nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
+    return nextProxyIndex;
   }
 
   /**
@@ -411,27 +411,31 @@ public class OMFailoverProxyProvider<T> implements
    * @param newLeaderOMNodeId OMNodeId to failover to.
    * @return true if failover is successful, false otherwise.
    */
-  synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
-    if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
+  private synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
+    if (!nextProxyOMNodeId.equals(newLeaderOMNodeId)) {
       if (omProxies.containsKey(newLeaderOMNodeId)) {
-        lastAttemptedOM = currentProxyOMNodeId;
-        currentProxyOMNodeId = newLeaderOMNodeId;
-        currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
+        lastAttemptedOM = nextProxyOMNodeId;
+        nextProxyOMNodeId = newLeaderOMNodeId;
+        nextProxyIndex = omNodeIDList.indexOf(nextProxyOMNodeId);
         return true;
       }
     } else {
-      lastAttemptedOM = currentProxyOMNodeId;
+      lastAttemptedOM = nextProxyOMNodeId;
     }
     return false;
   }
 
-  private synchronized int getCurrentProxyIndex() {
-    return currentProxyIndex;
-  }
-
+  /**
+   * Gets the wait time before the next retry, based on whether:
+   * 1. the same OM is being retried, based on the response from the OM;
+   * 2. a new OM is being tried; or
+   * 3. all the OMs have been visited once and retries need to resume after
+   * a delay.
+   * @return delay in milliseconds
+   */
   public synchronized long getWaitTime() {
-    if (currentProxyOMNodeId.equals(lastAttemptedOM)) {
-      // Clear attemptedOMs list as round robin has been broken.
+    if (nextProxyOMNodeId.equals(lastAttemptedOM)) {
+      // Clear attemptedOMs list as the same OM has been selected again.
       attemptedOMs.clear();
 
       // The same OM will be contacted again. So wait and then retry.
@@ -441,7 +445,7 @@ public class OMFailoverProxyProvider<T> implements
     // Reset numAttemptsOnSameOM as we failed over to a different OM.
     numAttemptsOnSameOM = 0;
 
-    // OMs are being contacted in round robin way. Check if all the OMs have
+    // OMs are being contacted in round-robin order. Check if all the OMs have
     // been contacted in this attempt.
     for (String omNodeID : omProxyInfos.keySet()) {
       if (!attemptedOMs.contains(omNodeID)) {
@@ -449,8 +453,8 @@ public class OMFailoverProxyProvider<T> implements
       }
     }
     // This implies all the OMs have been contacted once. Return true and
-    // clear the list as we are going to inject a wait and the next check
-    // should not include these atttempts again.
+    // clear the list. The OMs will be retried in a Round Robin fashion again
+    // after a delay.
     attemptedOMs.clear();
     return waitBetweenRetries;
   }
@@ -461,11 +465,11 @@ public class OMFailoverProxyProvider<T> implements
         unwrappedException instanceof SecretManager.InvalidToken) {
       // Retry all available OMs once before failing with
       // AccessControlException.
-      if (accessControlExceptionOMs.contains(currentProxyOMNodeId)) {
+      if (accessControlExceptionOMs.contains(nextProxyOMNodeId)) {
         accessControlExceptionOMs.clear();
         return false;
       } else {
-        accessControlExceptionOMs.add(currentProxyOMNodeId);
+        accessControlExceptionOMs.add(nextProxyOMNodeId);
         if (accessControlExceptionOMs.containsAll(omNodeIDList)) {
           return false;
         }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index 32d4f54..9a6040c 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -84,7 +84,8 @@ public class Hadoop3OmTransport implements OmTransport {
 
         // Failover to the OM node returned by OMResponse leaderOMNodeId if
         // current proxy is not pointing to that node.
-        omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
+        omFailoverProxyProvider.setNextOmProxy(leaderOmId);
+        omFailoverProxyProvider.performFailover(null);
       }
       return omResponse;
     } catch (ServiceException e) {
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
index 052ff8f..78505f7 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
@@ -123,7 +123,7 @@ public class TestOMFailoverProxyProvider {
     allNodeIds.remove(provider.getCurrentProxyOMNodeId());
     Assert.assertTrue("This test needs at least 2 OMs",
         allNodeIds.size() > 0);
-    provider.performFailoverIfRequired(allNodeIds.iterator().next());
+    provider.setNextOmProxy(allNodeIds.iterator().next());
     Assert.assertEquals(0, provider.getWaitTime());
   }
 
@@ -149,8 +149,9 @@ public class TestOMFailoverProxyProvider {
   private void failoverToNextNode(int numNextNodeFailoverTimes,
       long waitTimeAfter) {
     for (int attempt = 0; attempt < numNextNodeFailoverTimes; attempt++) {
-      provider.performFailoverToNextProxy();
+      provider.selectNextOmProxy();
       Assert.assertEquals(waitTimeAfter, provider.getWaitTime());
+      provider.performFailover(null);
     }
   }
 
@@ -158,8 +159,9 @@ public class TestOMFailoverProxyProvider {
    * Failover to same node and wait time will be attempt*waitBetweenRetries.
    */
   private void failoverToSameNode(int numSameNodeFailoverTimes) {
+    provider.performFailover(null);
     for (int attempt = 1; attempt <= numSameNodeFailoverTimes; attempt++) {
-      provider.performFailoverIfRequired(provider.getCurrentProxyOMNodeId());
+      provider.setNextOmProxy(provider.getCurrentProxyOMNodeId());
       Assert.assertEquals(attempt * waitBetweenRetries,
           provider.getWaitTime());
     }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
index 8df3f87..7610fc1 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
@@ -234,7 +234,8 @@ public class TestOzoneManagerHAMetadataOnly extends 
TestOzoneManagerHA {
 
     // Perform a manual failover of the proxy provider to move the
     // currentProxyIndex to a node other than the leader OM.
-    omFailoverProxyProvider.performFailoverToNextProxy();
+    omFailoverProxyProvider.selectNextOmProxy();
+    omFailoverProxyProvider.performFailover(null);
 
     String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
     Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);
@@ -305,7 +306,7 @@ public class TestOzoneManagerHAMetadataOnly extends 
TestOzoneManagerHA {
           OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
 
       // Failover to the OM node that the objectStore points to
-      omFailoverProxyProvider.performFailoverIfRequired(
+      omFailoverProxyProvider.setNextOmProxy(
           ozoneManager.getOMNodeId());
 
       // A read request should result in the proxyProvider failing over to
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
index 9e35ffe..39f5f4e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
@@ -57,7 +57,7 @@ public class TestOzoneManagerHAWithFailover extends 
TestOzoneManagerHA {
 
     long numTimesTriedToSameNode = omFailoverProxyProvider.getWaitTime()
         / waitBetweenRetries;
-    omFailoverProxyProvider.performFailoverIfRequired(omFailoverProxyProvider.
+    omFailoverProxyProvider.setNextOmProxy(omFailoverProxyProvider.
         getCurrentProxyOMNodeId());
     Assert.assertEquals((numTimesTriedToSameNode + 1) * waitBetweenRetries,
         omFailoverProxyProvider.getWaitTime());
diff --git 
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
 
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index c0de6d6..a3b678e 100644
--- 
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++ 
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -78,7 +78,8 @@ public class Hadoop27RpcTransport implements OmTransport {
 
         // Failover to the OM node returned by OMResponse leaderOMNodeId if
         // current proxy is not pointing to that node.
-        omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
+        omFailoverProxyProvider.setNextOmProxy(leaderOmId);
+        omFailoverProxyProvider.performFailover(null);
       }
       return omResponse;
     } catch (ServiceException e) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to