This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2254abf HDDS-5216. Fix race condition causing SCM failOverProxy which
is causing failover wrongly. (#2247)
2254abf is described below
commit 2254abfa4d8ee3fda24c3d7215b0ddca61632fec
Author: Bharat Viswanadham <[email protected]>
AuthorDate: Mon May 17 15:57:16 2021 +0530
HDDS-5216. Fix race condition causing SCM failOverProxy which is causing
failover wrongly. (#2247)
---
.../SCMBlockLocationFailoverProxyProvider.java | 118 +++++++++++----------
.../SCMContainerLocationFailoverProxyProvider.java | 117 ++++++++++----------
.../SCMSecurityProtocolFailoverProxyProvider.java | 77 +++++++-------
3 files changed, 158 insertions(+), 154 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
index 7078740..34f91d6 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
@@ -61,21 +61,28 @@ public class SCMBlockLocationFailoverProxyProvider
implements
private Map<String, SCMProxyInfo> scmProxyInfoMap;
private List<String> scmNodeIds;
- private String currentProxySCMNodeId;
- private int currentProxyIndex;
+ // The SCM client is shared across threads. performFailover() updates
+ // currentProxySCMNodeId from updatedLeaderNodeID, which is set in
+ // shouldRetry(). When 2 or more threads run in parallel, the
+ // RetryInvocationHandler checks the expected failover count and does
+ // not execute performFailover() for one of them. Such thread(s) do not
+ // call performFailover(); they call getProxy(), which uses
+ // currentProxySCMNodeId and returns the proxy.
+ private volatile String currentProxySCMNodeId;
+ private volatile int currentProxyIndex;
private final ConfigurationSource conf;
private final long scmVersion;
private String scmServiceId;
- private String lastAttemptedLeader;
-
private final int maxRetryCount;
private final long retryInterval;
private final UserGroupInformation ugi;
+ private String updatedLeaderNodeID = null;
+
public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
this.conf = conf;
@@ -124,9 +131,6 @@ public class SCMBlockLocationFailoverProxyProvider
implements
SCMProxyInfo scmProxyInfo = new SCMProxyInfo(
scmNodeInfo.getServiceId(), scmNodeInfo.getNodeId(),
scmBlockClientAddress);
- ProxyInfo<ScmBlockLocationProtocolPB> proxy
- = new ProxyInfo<>(null, scmProxyInfo.toString());
- scmProxies.put(scmNodeId, proxy);
scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
}
}
@@ -145,17 +149,25 @@ public class SCMBlockLocationFailoverProxyProvider
implements
}
@Override
- public synchronized ProxyInfo getProxy() {
- ProxyInfo currentProxyInfo = scmProxies.get(currentProxySCMNodeId);
- createSCMProxyIfNeeded(currentProxyInfo, currentProxySCMNodeId);
+ public synchronized ProxyInfo<ScmBlockLocationProtocolPB> getProxy() {
+ String currentProxyNodeId = getCurrentProxySCMNodeId();
+ ProxyInfo currentProxyInfo = scmProxies.get(currentProxyNodeId);
+ if (currentProxyInfo == null) {
+ currentProxyInfo = createSCMProxy(currentProxyNodeId);
+ }
return currentProxyInfo;
}
@Override
public synchronized void performFailover(
ScmBlockLocationProtocolPB newLeader) {
- // Should do nothing here.
- LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
+ // If a leader node ID is set, use it; otherwise move to the next proxy index.
+ if (updatedLeaderNodeID != null) {
+ currentProxySCMNodeId = updatedLeaderNodeID;
+ } else {
+ nextProxyIndex();
+ }
+
}
public synchronized void performFailoverToAssignedLeader(String newLeader,
@@ -177,17 +189,7 @@ public class SCMBlockLocationFailoverProxyProvider
implements
Arrays.toString(scmProxyInfoMap.values().toArray()));
}
}
- if (newLeader == null) {
- // If newLeader is not assigned, it will fail over to next proxy.
- nextProxyIndex();
- LOG.debug("Performing failover to next proxy node {}",
- currentProxySCMNodeId);
- } else {
- if (!assignLeaderToNode(newLeader)) {
- LOG.debug("Failing over SCM proxy to nodeId: {}", newLeader);
- nextProxyIndex();
- }
- }
+ assignLeaderToNode(newLeader);
}
@Override
@@ -210,49 +212,41 @@ public class SCMBlockLocationFailoverProxyProvider
implements
return retryInterval;
}
- private synchronized int nextProxyIndex() {
- lastAttemptedLeader = currentProxySCMNodeId;
-
+ private synchronized void nextProxyIndex() {
// round robin the next proxy
- currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size();
+
+ currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size();
currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- return currentProxyIndex;
}
- private synchronized boolean assignLeaderToNode(String newLeaderNodeId) {
+ private synchronized void assignLeaderToNode(String newLeaderNodeId) {
if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
- if (scmProxies.containsKey(newLeaderNodeId)) {
- lastAttemptedLeader = currentProxySCMNodeId;
- currentProxySCMNodeId = newLeaderNodeId;
- currentProxyIndex = scmNodeIds.indexOf(currentProxySCMNodeId);
- return true;
+ if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
+ updatedLeaderNodeID = newLeaderNodeId;
+ LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
+ } else {
+ updatedLeaderNodeID = null;
}
- } else {
- lastAttemptedLeader = currentProxySCMNodeId;
}
- return false;
}
/**
- * Creates proxy object if it does not already exist.
+ * Creates proxy object.
*/
- private void createSCMProxyIfNeeded(ProxyInfo proxyInfo,
- String nodeId) {
- if (proxyInfo.proxy == null) {
- InetSocketAddress address = scmProxyInfoMap.get(nodeId).getAddress();
- try {
- ScmBlockLocationProtocolPB proxy = createSCMProxy(address);
- try {
- proxyInfo.proxy = proxy;
- } catch (IllegalAccessError iae) {
- scmProxies.put(nodeId,
- new ProxyInfo<>(proxy, proxyInfo.proxyInfo));
- }
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to SCM at {}",
- this.getClass().getSimpleName(), address, ioe);
- throw new RuntimeException(ioe);
- }
+ private ProxyInfo createSCMProxy(String nodeId) {
+ ProxyInfo proxyInfo;
+ SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
+ InetSocketAddress address = scmProxyInfo.getAddress();
+ try {
+ ScmBlockLocationProtocolPB scmProxy = createSCMProxy(address);
+ // Create proxyInfo here, to make it work with all Hadoop versions.
+ proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString());
+ scmProxies.put(nodeId, proxyInfo);
+ return proxyInfo;
+ } catch (IOException ioe) {
+ LOG.error("{} Failed to create RPC proxy to SCM at {}",
+ this.getClass().getSimpleName(), address, ioe);
+ throw new RuntimeException(ioe);
}
}
@@ -279,8 +273,10 @@ public class SCMBlockLocationFailoverProxyProvider
implements
@Override
public RetryAction shouldRetry(Exception e, int retry,
int failover, boolean b) {
- if (!SCMHAUtils.checkRetriableWithNoFailoverException(e)) {
- performFailoverToAssignedLeader(newLeader, e);
+ if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) {
+ setUpdatedLeaderNodeID();
+ } else {
+ performFailoverToAssignedLeader(null, e);
}
return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount,
getRetryInterval());
@@ -288,5 +284,13 @@ public class SCMBlockLocationFailoverProxyProvider
implements
};
return retryPolicy;
}
+
+ public synchronized int getCurrentProxyIndex() {
+ return currentProxyIndex;
+ }
+
+ public synchronized void setUpdatedLeaderNodeID() {
+ this.updatedLeaderNodeID = getCurrentProxySCMNodeId();
+ }
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
index 2af548e..5a1126d 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
@@ -63,8 +63,15 @@ public class SCMContainerLocationFailoverProxyProvider
implements
private final Map<String, SCMProxyInfo> scmProxyInfoMap;
private List<String> scmNodeIds;
- private String currentProxySCMNodeId;
- private int currentProxyIndex;
+ // The SCM client is shared across threads. performFailover() updates
+ // currentProxySCMNodeId from updatedLeaderNodeID, which is set in
+ // shouldRetry(). When 2 or more threads run in parallel, the
+ // RetryInvocationHandler checks the expected failover count and does
+ // not execute performFailover() for one of them. Such thread(s) do not
+ // call performFailover(); they call getProxy(), which uses
+ // currentProxySCMNodeId and returns the proxy.
+ private volatile String currentProxySCMNodeId;
+ private volatile int currentProxyIndex;
private final ConfigurationSource conf;
private final SCMClientConfig scmClientConfig;
@@ -77,6 +84,8 @@ public class SCMContainerLocationFailoverProxyProvider
implements
private final UserGroupInformation ugi;
+ private String updatedLeaderNodeID = null;
+
/**
* Construct SCMContainerLocationFailoverProxyProvider.
* If userGroupInformation is not null, use the passed ugi, else obtain
@@ -132,9 +141,6 @@ public class SCMContainerLocationFailoverProxyProvider
implements
scmNodeIds.add(scmNodeId);
SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId,
scmClientAddress);
- ProxyInfo< StorageContainerLocationProtocolPB > proxy
- = new ProxyInfo<>(null, scmProxyInfo.toString());
- scmProxies.put(scmNodeId, proxy);
scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
}
}
@@ -154,24 +160,32 @@ public class SCMContainerLocationFailoverProxyProvider
implements
@Override
public synchronized ProxyInfo<StorageContainerLocationProtocolPB> getProxy()
{
- ProxyInfo<StorageContainerLocationProtocolPB> currentProxyInfo
- = scmProxies.get(currentProxySCMNodeId);
- createSCMProxyIfNeeded(currentProxyInfo, currentProxySCMNodeId);
+ ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
+ if (currentProxyInfo == null) {
+ currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId());
+ }
return currentProxyInfo;
}
public synchronized List<StorageContainerLocationProtocolPB> getProxies() {
- scmProxies.forEach(
- (nodeId, proxyInfo) -> createSCMProxyIfNeeded(proxyInfo, nodeId));
-
+ for (SCMProxyInfo scmProxyInfo : scmProxyInfoMap.values()) {
+ if (scmProxies.get(scmProxyInfo.getNodeId()) == null) {
+ scmProxies.put(scmProxyInfo.getNodeId(),
+ createSCMProxy(scmProxyInfo.getNodeId()));
+ }
+ }
return scmProxies.values().stream()
.map(proxyInfo -> proxyInfo.proxy).collect(Collectors.toList());
}
@Override
- public void performFailover(
+ public synchronized void performFailover(
StorageContainerLocationProtocolPB newLeader) {
- // Should do nothing here.
+ if (updatedLeaderNodeID != null) {
+ currentProxySCMNodeId = updatedLeaderNodeID;
+ } else {
+ nextProxyIndex();
+ }
LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
}
@@ -194,17 +208,7 @@ public class SCMContainerLocationFailoverProxyProvider
implements
Arrays.toString(scmProxyInfoMap.values().toArray()));
}
}
- if (newLeader == null) {
- // If newLeader is not assigned, it will fail over to next proxy.
- nextProxyIndex();
- LOG.debug("Performing failover to next proxy node {}",
- currentProxySCMNodeId);
- } else {
- if (!assignLeaderToNode(newLeader)) {
- LOG.debug("Failing over SCM proxy to nodeId: {}", newLeader);
- nextProxyIndex();
- }
- }
+ assignLeaderToNode(newLeader);
}
@Override
@@ -229,49 +233,44 @@ public class SCMContainerLocationFailoverProxyProvider
implements
return retryInterval;
}
- private synchronized int nextProxyIndex() {
+ private synchronized void nextProxyIndex() {
// round robin the next proxy
- currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size();
+ currentProxyIndex = (currentProxyIndex + 1) % scmProxyInfoMap.size();
currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- return currentProxyIndex;
}
- private synchronized boolean assignLeaderToNode(String newLeaderNodeId) {
- if (!currentProxySCMNodeId.equals(newLeaderNodeId)
- && scmProxies.containsKey(newLeaderNodeId)) {
- currentProxySCMNodeId = newLeaderNodeId;
- currentProxyIndex = scmNodeIds.indexOf(currentProxySCMNodeId);
-
- LOG.debug("Failing over SCM proxy to nodeId: {}", newLeaderNodeId);
- return true;
+ private synchronized void assignLeaderToNode(String newLeaderNodeId) {
+ if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
+ if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
+ updatedLeaderNodeID = newLeaderNodeId;
+ LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
+ } else {
+ updatedLeaderNodeID = null;
+ }
}
- return false;
}
/**
- * Creates proxy object if it does not already exist.
+ * Creates proxy object.
*/
- private void createSCMProxyIfNeeded(ProxyInfo proxyInfo,
- String nodeId) {
- if (proxyInfo.proxy == null) {
- InetSocketAddress address = scmProxyInfoMap.get(nodeId).getAddress();
- try {
- StorageContainerLocationProtocolPB proxy =
- createSCMProxy(address);
- try {
- proxyInfo.proxy = proxy;
- } catch (IllegalAccessError iae) {
- scmProxies.put(nodeId,
- new ProxyInfo<>(proxy, proxyInfo.proxyInfo));
- }
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to SCM at {}",
- this.getClass().getSimpleName(), address, ioe);
- throw new RuntimeException(ioe);
- }
+ private ProxyInfo createSCMProxy(String nodeId) {
+ ProxyInfo proxyInfo;
+ SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
+ InetSocketAddress address = scmProxyInfo.getAddress();
+ try {
+ StorageContainerLocationProtocolPB scmProxy = createSCMProxy(address);
+ // Create proxyInfo here, to make it work with all Hadoop versions.
+ proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString());
+ scmProxies.put(nodeId, proxyInfo);
+ return proxyInfo;
+ } catch (IOException ioe) {
+ LOG.error("{} Failed to create RPC proxy to SCM at {}",
+ this.getClass().getSimpleName(), address, ioe);
+ throw new RuntimeException(ioe);
}
}
+
private StorageContainerLocationProtocolPB createSCMProxy(
InetSocketAddress scmAddress) throws IOException {
Configuration hadoopConf =
@@ -295,7 +294,9 @@ public class SCMContainerLocationFailoverProxyProvider
implements
@Override
public RetryAction shouldRetry(Exception e, int retry,
int failover, boolean b) {
- if (!SCMHAUtils.checkRetriableWithNoFailoverException(e)) {
+ if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) {
+ setUpdatedLeaderNodeID();
+ } else {
performFailoverToAssignedLeader(null, e);
}
return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount,
@@ -303,4 +304,8 @@ public class SCMContainerLocationFailoverProxyProvider
implements
}
};
}
+
+ public synchronized void setUpdatedLeaderNodeID() {
+ this.updatedLeaderNodeID = getCurrentProxySCMNodeId();
+ }
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
index 2d4dcfa..bf4eb87 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
@@ -64,8 +64,16 @@ public class SCMSecurityProtocolFailoverProxyProvider
implements
private List<String> scmNodeIds;
- private String currentProxySCMNodeId;
- private int currentProxyIndex;
+ // The SCM client is shared across threads. performFailover() updates
+ // currentProxySCMNodeId from updatedLeaderNodeID, which is set in
+ // shouldRetry(). When 2 or more threads run in parallel, the
+ // RetryInvocationHandler checks the expected failover count and does
+ // not execute performFailover() for one of them. Such thread(s) do not
+ // call performFailover(); they call getProxy(), which uses
+ // currentProxySCMNodeId and returns the proxy.
+ private volatile String currentProxySCMNodeId;
+ private volatile int currentProxyIndex;
+
private final ConfigurationSource conf;
private final SCMClientConfig scmClientConfig;
@@ -78,6 +86,8 @@ public class SCMSecurityProtocolFailoverProxyProvider
implements
private final UserGroupInformation ugi;
+ private String updatedLeaderNodeID = null;
+
/**
* Construct fail-over proxy provider for SCMSecurityProtocol Server.
* @param conf
@@ -176,11 +186,12 @@ public class SCMSecurityProtocolFailoverProxyProvider
implements
@Override
public synchronized void performFailover(SCMSecurityProtocolPB currentProxy)
{
- if (LOG.isDebugEnabled()) {
- int currentIndex = getCurrentProxyIndex();
- LOG.debug("Failing over SCM Security proxy to index: {}, nodeId: {}",
- currentIndex, scmNodeIds.get(currentIndex));
+ if (updatedLeaderNodeID != null) {
+ currentProxySCMNodeId = updatedLeaderNodeID;
+ } else {
+ nextProxyIndex();
}
+ LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
}
public synchronized void performFailoverToAssignedLeader(String newLeader,
@@ -202,39 +213,18 @@ public class SCMSecurityProtocolFailoverProxyProvider
implements
Arrays.toString(scmProxyInfoMap.values().toArray()));
}
}
- if (newLeader == null) {
- // If newLeader is not assigned, it will fail over to next proxy.
- performFailoverToNextProxy();
- LOG.debug("Performing failover to next proxy node {}",
- currentProxySCMNodeId);
- } else {
- if (!assignLeaderToNode(newLeader)) {
- LOG.debug("Failing over SCM proxy to nodeId: {}", newLeader);
- performFailoverToNextProxy();
- }
- }
+ assignLeaderToNode(newLeader);
}
- private synchronized boolean assignLeaderToNode(String newLeaderNodeId) {
- if (!currentProxySCMNodeId.equals(newLeaderNodeId)
- && scmProxies.containsKey(newLeaderNodeId)) {
- currentProxySCMNodeId = newLeaderNodeId;
- currentProxyIndex = scmNodeIds.indexOf(currentProxySCMNodeId);
-
- LOG.debug("Failing over SCM proxy to nodeId: {}", newLeaderNodeId);
- return true;
- }
- return false;
- }
- /**
- * Performs fail-over to the next proxy.
- */
- public synchronized void performFailoverToNextProxy() {
- int newProxyIndex = incrementProxyIndex();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Incrementing SCM Security proxy index to {}, nodeId: {}",
- newProxyIndex, scmNodeIds.get(newProxyIndex));
+ private synchronized void assignLeaderToNode(String newLeaderNodeId) {
+ if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
+ if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
+ updatedLeaderNodeID = newLeaderNodeId;
+ LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
+ } else {
+ updatedLeaderNodeID = null;
+ }
}
}
@@ -242,10 +232,10 @@ public class SCMSecurityProtocolFailoverProxyProvider
implements
* Update the proxy index to the next proxy in the list.
* @return the new proxy index
*/
- private synchronized int incrementProxyIndex() {
- currentProxyIndex = (currentProxyIndex + 1) % scmProxyInfoMap.size();
- currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- return currentProxyIndex;
+ private synchronized void nextProxyIndex() {
+ // round robin the next proxy
+ currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size();
+ currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
}
public RetryPolicy getRetryPolicy() {
@@ -269,7 +259,9 @@ public class SCMSecurityProtocolFailoverProxyProvider
implements
}
}
- if (!SCMHAUtils.checkRetriableWithNoFailoverException(exception)) {
+ if (SCMHAUtils.checkRetriableWithNoFailoverException(exception)) {
+ setUpdatedLeaderNodeID();
+ } else {
performFailoverToAssignedLeader(null, exception);
}
return SCMHAUtils
@@ -281,6 +273,9 @@ public class SCMSecurityProtocolFailoverProxyProvider
implements
return retryPolicy;
}
+ public synchronized void setUpdatedLeaderNodeID() {
+ this.updatedLeaderNodeID = getCurrentProxySCMNodeId();
+ }
@Override
public Class< SCMSecurityProtocolPB > getInterface() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]