This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ca24bdb HDDS-5228 Make OM FailOverProxyProvider work across threads
(#3160)
ca24bdb is described below
commit ca24bdb911cb5a20ce5a074c5cf6a07bf4f56ad0
Author: Ritesh H Shukla <[email protected]>
AuthorDate: Mon Mar 21 16:01:25 2022 -0700
HDDS-5228 Make OM FailOverProxyProvider work across threads (#3160)
---
.../ozone/om/ha/OMFailoverProxyProvider.java | 130 +++++++++++----------
.../ozone/om/protocolPB/Hadoop3OmTransport.java | 3 +-
.../ozone/om/ha/TestOMFailoverProxyProvider.java | 8 +-
.../ozone/om/TestOzoneManagerHAMetadataOnly.java | 5 +-
.../ozone/om/TestOzoneManagerHAWithFailover.java | 2 +-
.../hadoop/fs/ozone/Hadoop27RpcTransport.java | 3 +-
6 files changed, 80 insertions(+), 71 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 5432468..d9bb975 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
@@ -54,7 +53,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@@ -88,7 +86,9 @@ public class OMFailoverProxyProvider<T> implements
private Map<String, OMProxyInfo> omProxyInfos;
private List<String> omNodeIDList;
+ private String nextProxyOMNodeId;
private String currentProxyOMNodeId;
+ private int nextProxyIndex;
private int currentProxyIndex;
private List<String> retryExceptions = new ArrayList<>();
@@ -103,6 +103,7 @@ public class OMFailoverProxyProvider<T> implements
private int numAttemptsOnSameOM = 0;
private final long waitBetweenRetries;
private Set<String> accessControlExceptionOMs = new HashSet<>();
+ private boolean performFailoverDone;
public OMFailoverProxyProvider(ConfigurationSource configuration,
UserGroupInformation ugi, String omServiceId, Class<T> protocol)
@@ -112,11 +113,14 @@ public class OMFailoverProxyProvider<T> implements
this.ugi = ugi;
this.omServiceId = omServiceId;
this.protocolClass = protocol;
+ this.performFailoverDone = true;
loadOMClientConfigs(conf, this.omServiceId);
this.delegationTokenService = computeDelegationTokenService();
+ nextProxyIndex = 0;
+ nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
currentProxyIndex = 0;
- currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
+ currentProxyOMNodeId = nextProxyOMNodeId;
waitBetweenRetries = conf.getLong(
OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
@@ -261,9 +265,9 @@ public class OMFailoverProxyProvider<T> implements
// address of the suggested leader along with the nodeID.
// Failing over just based on nodeID is not very robust.
- // OMFailoverProxyProvider#performFailover() is a dummy call and
- // does not perform any failover. Failover manually to the next OM.
- performFailoverToNextProxy();
+ // Prepare the next OM to be tried. This will help with calculation
+ // of the wait times needed for creating the retryAction.
+ selectNextOmProxy();
return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
}
@@ -273,7 +277,7 @@ public class OMFailoverProxyProvider<T> implements
// Retry on same OM again as leader OM is not ready.
// Failing over to same OM so that wait time between retries is
// incremented
- performFailoverIfRequired(omNodeId);
+ setNextOmProxy(omNodeId);
return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
}
}
@@ -282,9 +286,9 @@ public class OMFailoverProxyProvider<T> implements
return RetryAction.FAIL; // do not retry
}
- // For all other exceptions, fail over manually to the next OM Node
- // proxy.
- performFailoverToNextProxy();
+ // Prepare the next OM to be tried. This will help with calculation
+ // of the wait times needed for creating the retryAction.
+ selectNextOmProxy();
return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
}
@@ -339,39 +343,33 @@ public class OMFailoverProxyProvider<T> implements
/**
* Called whenever an error warrants failing over. It is determined by the
- * retry policy.
- *
- * This is a dummy call from {@link RetryInvocationHandler}. The actual
- * failover should be performed using either
- * {@link OMFailoverProxyProvider#performFailoverIfRequired(String)} or
- * {@link OMFailoverProxyProvider#performFailoverToNextProxy()}.
- *
- * In {@link OzoneManagerProtocolClientSideTranslatorPB}, we first
- * manually failover and then call the RetryAction FAILOVER_AND_RETRY. This
- * is done because we do not want to always failover to the next proxy. If we
- * get a OMNotLeaderException with a suggested leader, then we want to
- * failover to that OM proxy instead. Hence, we failover manually and the
- * {@link FailoverProxyProvider#performFailover(Object)} call should not do
- * failover again.
+ * retry policy. This method is supposed to be called only once in a
+ * multithreaded environment. This is where the failover occurs:
+ * performFailover updates the currentProxyOMNodeId.
+ * When 2 or more threads run in parallel, the
+ * RetryInvocationHandler will check the expectedFailOverCount
+ * and execute performFailover() for only one of them. The other thread(s)
+ * will not call performFailover(); instead they will call getProxy().
*/
@Override
- public void performFailover(T currentProxy) {
- if (LOG.isDebugEnabled()) {
- int currentIndex = getCurrentProxyIndex();
- LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
- currentIndex, omNodeIDList.get(currentIndex));
- }
+ public synchronized void performFailover(T currentProxy) {
+ LOG.debug("Failing over OM from {}:{} to {}:{}",
+ currentProxyOMNodeId, currentProxyIndex,
+ nextProxyOMNodeId, nextProxyIndex);
+ currentProxyOMNodeId = nextProxyOMNodeId;
+ currentProxyIndex = nextProxyIndex;
+ performFailoverDone = true;
}
/**
- * Performs failover if the leaderOMNodeId returned through OMReponse does
+ * Set the next leaderOMNodeId returned through OMResponse if it does
* not match the current leaderOMNodeId cached by the proxy provider.
*/
- public void performFailoverIfRequired(String newLeaderOMNodeId) {
+ public void setNextOmProxy(String newLeaderOMNodeId) {
if (newLeaderOMNodeId == null) {
LOG.debug("No suggested leader nodeId. Performing failover to next peer"
+
" node");
- performFailoverToNextProxy();
+ selectNextOmProxy();
} else {
if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
@@ -380,14 +378,16 @@ public class OMFailoverProxyProvider<T> implements
}
/**
- * Performs failover if the leaderOMNodeId returned through OMResponse does
- * not match the current leaderOMNodeId cached by the proxy provider.
+ * Selects the next OM Leader to try.
*/
- public void performFailoverToNextProxy() {
- int newProxyIndex = incrementProxyIndex();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Incrementing OM proxy index to {}, nodeId: {}",
- newProxyIndex, omNodeIDList.get(newProxyIndex));
+ public synchronized void selectNextOmProxy() {
+ if (performFailoverDone) {
+ performFailoverDone = false;
+ int newProxyIndex = incrementNextProxyIndex();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Incrementing OM proxy index to {}, nodeId: {}",
+ newProxyIndex, omNodeIDList.get(newProxyIndex));
+ }
}
}
@@ -395,15 +395,15 @@ public class OMFailoverProxyProvider<T> implements
* Update the proxy index to the next proxy in the list.
* @return the new proxy index
*/
- private synchronized int incrementProxyIndex() {
+ private synchronized int incrementNextProxyIndex() {
// Before failing over to next proxy, add the proxy OM (which has
// returned an exception) to the list of attemptedOMs.
- lastAttemptedOM = currentProxyOMNodeId;
- attemptedOMs.add(currentProxyOMNodeId);
+ lastAttemptedOM = nextProxyOMNodeId;
+ attemptedOMs.add(nextProxyOMNodeId);
- currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
- currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
- return currentProxyIndex;
+ nextProxyIndex = (nextProxyIndex + 1) % omProxies.size();
+ nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
+ return nextProxyIndex;
}
/**
@@ -411,27 +411,31 @@ public class OMFailoverProxyProvider<T> implements
* @param newLeaderOMNodeId OMNodeId to failover to.
* @return true if failover is successful, false otherwise.
*/
- synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
- if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
+ private synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
+ if (!nextProxyOMNodeId.equals(newLeaderOMNodeId)) {
if (omProxies.containsKey(newLeaderOMNodeId)) {
- lastAttemptedOM = currentProxyOMNodeId;
- currentProxyOMNodeId = newLeaderOMNodeId;
- currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
+ lastAttemptedOM = nextProxyOMNodeId;
+ nextProxyOMNodeId = newLeaderOMNodeId;
+ nextProxyIndex = omNodeIDList.indexOf(nextProxyOMNodeId);
return true;
}
} else {
- lastAttemptedOM = currentProxyOMNodeId;
+ lastAttemptedOM = nextProxyOMNodeId;
}
return false;
}
- private synchronized int getCurrentProxyIndex() {
- return currentProxyIndex;
- }
-
+ /**
+ * Get the wait time based on
+ * 1. Is the same OM being retried based on response from OM.
+ * 2. Is this a new OM that is being used.
+ * 3. Were all the OMs visited once and retries need to be resumed after a
+ * delay.
+ * @return delay in milliseconds
+ */
public synchronized long getWaitTime() {
- if (currentProxyOMNodeId.equals(lastAttemptedOM)) {
- // Clear attemptedOMs list as round robin has been broken.
+ if (nextProxyOMNodeId.equals(lastAttemptedOM)) {
+ // Clear attemptedOMs list as the same OM has been selected again.
attemptedOMs.clear();
// The same OM will be contacted again. So wait and then retry.
@@ -441,7 +445,7 @@ public class OMFailoverProxyProvider<T> implements
// Reset numAttemptsOnSameOM as we failed over to a different OM.
numAttemptsOnSameOM = 0;
- // OMs are being contacted in round robin way. Check if all the OMs have
+ // OMs are being contacted in Round Robin way. Check if all the OMs have
// been contacted in this attempt.
for (String omNodeID : omProxyInfos.keySet()) {
if (!attemptedOMs.contains(omNodeID)) {
@@ -449,8 +453,8 @@ public class OMFailoverProxyProvider<T> implements
}
}
// This implies all the OMs have been contacted once. Return true and
- // clear the list as we are going to inject a wait and the next check
- // should not include these atttempts again.
+ // clear the list. The OMs will be retried in a Round Robin fashion again
+ // after a delay.
attemptedOMs.clear();
return waitBetweenRetries;
}
@@ -461,11 +465,11 @@ public class OMFailoverProxyProvider<T> implements
unwrappedException instanceof SecretManager.InvalidToken) {
// Retry all available OMs once before failing with
// AccessControlException.
- if (accessControlExceptionOMs.contains(currentProxyOMNodeId)) {
+ if (accessControlExceptionOMs.contains(nextProxyOMNodeId)) {
accessControlExceptionOMs.clear();
return false;
} else {
- accessControlExceptionOMs.add(currentProxyOMNodeId);
+ accessControlExceptionOMs.add(nextProxyOMNodeId);
if (accessControlExceptionOMs.containsAll(omNodeIDList)) {
return false;
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index 32d4f54..9a6040c 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -84,7 +84,8 @@ public class Hadoop3OmTransport implements OmTransport {
// Failover to the OM node returned by OMResponse leaderOMNodeId if
// current proxy is not pointing to that node.
- omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
+ omFailoverProxyProvider.setNextOmProxy(leaderOmId);
+ omFailoverProxyProvider.performFailover(null);
}
return omResponse;
} catch (ServiceException e) {
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
index 052ff8f..78505f7 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
@@ -123,7 +123,7 @@ public class TestOMFailoverProxyProvider {
allNodeIds.remove(provider.getCurrentProxyOMNodeId());
Assert.assertTrue("This test needs at least 2 OMs",
allNodeIds.size() > 0);
- provider.performFailoverIfRequired(allNodeIds.iterator().next());
+ provider.setNextOmProxy(allNodeIds.iterator().next());
Assert.assertEquals(0, provider.getWaitTime());
}
@@ -149,8 +149,9 @@ public class TestOMFailoverProxyProvider {
private void failoverToNextNode(int numNextNodeFailoverTimes,
long waitTimeAfter) {
for (int attempt = 0; attempt < numNextNodeFailoverTimes; attempt++) {
- provider.performFailoverToNextProxy();
+ provider.selectNextOmProxy();
Assert.assertEquals(waitTimeAfter, provider.getWaitTime());
+ provider.performFailover(null);
}
}
@@ -158,8 +159,9 @@ public class TestOMFailoverProxyProvider {
* Failover to same node and wait time will be attempt*waitBetweenRetries.
*/
private void failoverToSameNode(int numSameNodeFailoverTimes) {
+ provider.performFailover(null);
for (int attempt = 1; attempt <= numSameNodeFailoverTimes; attempt++) {
- provider.performFailoverIfRequired(provider.getCurrentProxyOMNodeId());
+ provider.setNextOmProxy(provider.getCurrentProxyOMNodeId());
Assert.assertEquals(attempt * waitBetweenRetries,
provider.getWaitTime());
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
index 8df3f87..7610fc1 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
@@ -234,7 +234,8 @@ public class TestOzoneManagerHAMetadataOnly extends
TestOzoneManagerHA {
// Perform a manual failover of the proxy provider to move the
// currentProxyIndex to a node other than the leader OM.
- omFailoverProxyProvider.performFailoverToNextProxy();
+ omFailoverProxyProvider.selectNextOmProxy();
+ omFailoverProxyProvider.performFailover(null);
String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);
@@ -305,7 +306,7 @@ public class TestOzoneManagerHAMetadataOnly extends
TestOzoneManagerHA {
OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
// Failover to the OM node that the objectStore points to
- omFailoverProxyProvider.performFailoverIfRequired(
+ omFailoverProxyProvider.setNextOmProxy(
ozoneManager.getOMNodeId());
// A read request should result in the proxyProvider failing over to
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
index 9e35ffe..39f5f4e 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
@@ -57,7 +57,7 @@ public class TestOzoneManagerHAWithFailover extends
TestOzoneManagerHA {
long numTimesTriedToSameNode = omFailoverProxyProvider.getWaitTime()
/ waitBetweenRetries;
- omFailoverProxyProvider.performFailoverIfRequired(omFailoverProxyProvider.
+ omFailoverProxyProvider.setNextOmProxy(omFailoverProxyProvider.
getCurrentProxyOMNodeId());
Assert.assertEquals((numTimesTriedToSameNode + 1) * waitBetweenRetries,
omFailoverProxyProvider.getWaitTime());
diff --git
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index c0de6d6..a3b678e 100644
---
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -78,7 +78,8 @@ public class Hadoop27RpcTransport implements OmTransport {
// Failover to the OM node returned by OMResponse leaderOMNodeId if
// current proxy is not pointing to that node.
- omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
+ omFailoverProxyProvider.setNextOmProxy(leaderOmId);
+ omFailoverProxyProvider.performFailover(null);
}
return omResponse;
} catch (ServiceException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]