This is an automated email from the ASF dual-hosted git repository.
ivandika pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1a0c2238fa HDDS-11768. Extract SCM failover proxy provider logic (#7950)
1a0c2238fa is described below
commit 1a0c2238fa4c33332c9bf31235114291e3a4fe64
Author: Ivan Andika <[email protected]>
AuthorDate: Sun Feb 23 17:08:14 2025 +0800
HDDS-11768. Extract SCM failover proxy provider logic (#7950)
---
.../org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java | 14 +-
...lockLocationProtocolClientSideTranslatorPB.java | 2 +-
.../SCMBlockLocationFailoverProxyProvider.java | 265 +-------------------
.../SCMContainerLocationFailoverProxyProvider.java | 269 +--------------------
...ider.java => SCMFailoverProxyProviderBase.java} | 157 +++++++-----
.../SCMSecurityProtocolFailoverProxyProvider.java | 267 +-------------------
.../SecretKeyProtocolFailoverProxyProvider.java | 261 +-------------------
.../SingleSecretKeyProtocolProxyProvider.java | 11 +
8 files changed, 140 insertions(+), 1106 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java
index 44289f35ff..d797bd0a11 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java
@@ -40,6 +40,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
+import net.jcip.annotations.Immutable;
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.ha.ConfUtils;
@@ -52,15 +53,16 @@
* This class is used by SCM clients like OzoneManager, Client, Admin
* commands to figure out SCM Node Information to make contact to SCM.
*/
+@Immutable
public class SCMNodeInfo {
private static final Logger LOG = LoggerFactory.getLogger(SCMNodeInfo.class);
- private String serviceId;
- private String nodeId;
- private String blockClientAddress;
- private String scmClientAddress;
- private String scmSecurityAddress;
- private String scmDatanodeAddress;
+ private final String serviceId;
+ private final String nodeId;
+ private final String blockClientAddress;
+ private final String scmClientAddress;
+ private final String scmSecurityAddress;
+ private final String scmDatanodeAddress;
/**
* Build SCM Node information from configuration.
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index 00e8e3e0ad..13a8846f04 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -98,7 +98,7 @@ public ScmBlockLocationProtocolClientSideTranslatorPB(
this.failoverProxyProvider = proxyProvider;
this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create(
ScmBlockLocationProtocolPB.class, failoverProxyProvider,
- failoverProxyProvider.getSCMBlockLocationRetryPolicy());
+ failoverProxyProvider.getRetryPolicy());
}
/**
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
index 70c31d05b2..60e6999759 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
@@ -17,283 +17,32 @@
package org.apache.hadoop.hdds.scm.proxy;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_DUMMY_SERVICE_ID;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
-import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
-import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Failover proxy provider for SCM block location.
*/
-public class SCMBlockLocationFailoverProxyProvider implements
- FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable {
+public class SCMBlockLocationFailoverProxyProvider extends
+ SCMFailoverProxyProviderBase<ScmBlockLocationProtocolPB> {
public static final Logger LOG =
LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class);
- private final SCMClientConfig scmClientConfig;
-
- private final Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies;
- private final Map<String, SCMProxyInfo> scmProxyInfoMap;
- private List<String> scmNodeIds;
-
- // As SCM Client is shared across threads, performFailOver()
- // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is
- // updated in shouldRetry(). When 2 or more threads run in parallel, the
- // RetryInvocationHandler will check the expectedFailOverCount
- // and not execute performFailOver() for one of them. So the other thread(s)
- // shall not call performFailOver(), it will call getProxy() which uses
- // currentProxySCMNodeId and returns the proxy.
- private volatile String currentProxySCMNodeId;
- private volatile int currentProxyIndex;
-
- private final ConfigurationSource conf;
- private final long scmVersion;
-
- private String scmServiceId;
-
- private final int maxRetryCount;
- private final long retryInterval;
-
- private final UserGroupInformation ugi;
-
- private String updatedLeaderNodeID = null;
-
public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
- this.conf = conf;
- this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
-
- try {
- this.ugi = UserGroupInformation.getCurrentUser();
- } catch (IOException ex) {
- LOG.error("Unable to fetch user credentials from UGI", ex);
- throw new RuntimeException(ex);
- }
-
- // Set some constant for non-HA.
- if (scmServiceId == null) {
- scmServiceId = SCM_DUMMY_SERVICE_ID;
- }
- this.scmProxies = new HashMap<>();
- this.scmProxyInfoMap = new HashMap<>();
-
- loadConfigs();
-
- this.currentProxyIndex = 0;
- currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
-
- scmClientConfig = conf.getObject(SCMClientConfig.class);
- this.maxRetryCount = scmClientConfig.getRetryCount();
- this.retryInterval = scmClientConfig.getRetryInterval();
-
- LOG.info("Created block location fail-over proxy with {} nodes: {}",
- scmNodeIds.size(), scmProxyInfoMap.values());
- }
-
- private synchronized void loadConfigs() {
-
- scmNodeIds = new ArrayList<>();
- List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
-
- for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
- if (scmNodeInfo.getBlockClientAddress() == null) {
- throw new ConfigurationException("SCM BlockClient Address could not " +
- "be obtained from config. Config is not properly defined");
- } else {
- InetSocketAddress scmBlockClientAddress =
- NetUtils.createSocketAddr(scmNodeInfo.getBlockClientAddress());
-
- scmServiceId = scmNodeInfo.getServiceId();
- String scmNodeId = scmNodeInfo.getNodeId();
- scmNodeIds.add(scmNodeId);
- SCMProxyInfo scmProxyInfo = new SCMProxyInfo(
- scmNodeInfo.getServiceId(), scmNodeInfo.getNodeId(),
- scmBlockClientAddress);
- scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
- }
- }
- }
-
- @VisibleForTesting
- public synchronized void changeCurrentProxy(String nodeId) {
- currentProxyIndex = scmNodeIds.indexOf(nodeId);
- currentProxySCMNodeId = nodeId;
- nextProxyIndex();
- }
-
- private synchronized String getCurrentProxySCMNodeId() {
- return currentProxySCMNodeId;
- }
-
- @Override
- public synchronized ProxyInfo<ScmBlockLocationProtocolPB> getProxy() {
- String currentProxyNodeId = getCurrentProxySCMNodeId();
- ProxyInfo<ScmBlockLocationProtocolPB> currentProxyInfo =
- scmProxies.get(currentProxyNodeId);
- if (currentProxyInfo == null) {
- currentProxyInfo = createSCMProxy(currentProxyNodeId);
- }
- return currentProxyInfo;
+ super(ScmBlockLocationProtocolPB.class, conf, null);
}
@Override
- public synchronized void performFailover(
- ScmBlockLocationProtocolPB newLeader) {
- //If leader node id is set, use that or else move to next proxy index.
- if (updatedLeaderNodeID != null) {
- currentProxySCMNodeId = updatedLeaderNodeID;
- } else {
- nextProxyIndex();
- }
-
- }
-
- public synchronized void performFailoverToAssignedLeader(String newLeader,
- Exception e) {
- ServerNotLeaderException snle =
- (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e);
- if (snle != null && snle.getSuggestedLeader() != null) {
- Optional<SCMProxyInfo> matchedProxyInfo =
- scmProxyInfoMap.values().stream().filter(
- proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress())
- .equals(snle.getSuggestedLeader())).findFirst();
- if (matchedProxyInfo.isPresent()) {
- newLeader = matchedProxyInfo.get().getNodeId();
- LOG.debug("Performing failover to suggested leader {}, nodeId {}",
- snle.getSuggestedLeader(), newLeader);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Suggested leader {} does not match with any of the " +
- "proxyInfo adress {}", snle.getSuggestedLeader(),
- Arrays.toString(scmProxyInfoMap.values().toArray()));
- }
- }
- }
- assignLeaderToNode(newLeader);
+ protected Logger getLogger() {
+ return LOG;
}
@Override
- public Class<ScmBlockLocationProtocolPB> getInterface() {
- return ScmBlockLocationProtocolPB.class;
- }
-
- @Override
- public synchronized void close() throws IOException {
- for (ProxyInfo<ScmBlockLocationProtocolPB> proxy : scmProxies.values()) {
- ScmBlockLocationProtocolPB scmProxy = proxy.proxy;
- if (scmProxy != null) {
- RPC.stopProxy(scmProxy);
- }
- }
- }
-
- private synchronized long getRetryInterval() {
- // TODO add exponential backup
- return retryInterval;
- }
-
- private synchronized void nextProxyIndex() {
- // round robin the next proxy
-
- currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size();
- currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- }
-
- private synchronized void assignLeaderToNode(String newLeaderNodeId) {
- if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
- if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
- updatedLeaderNodeID = newLeaderNodeId;
- LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
- } else {
- updatedLeaderNodeID = null;
- }
- }
- }
-
- /**
- * Creates proxy object.
- */
- private ProxyInfo<ScmBlockLocationProtocolPB> createSCMProxy(String nodeId) {
- ProxyInfo<ScmBlockLocationProtocolPB> proxyInfo;
- SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
- InetSocketAddress address = scmProxyInfo.getAddress();
- try {
- ScmBlockLocationProtocolPB scmProxy = createSCMProxy(address);
- // Create proxyInfo here, to make it work with all Hadoop versions.
- proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString());
- scmProxies.put(nodeId, proxyInfo);
- return proxyInfo;
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to SCM at {}",
- this.getClass().getSimpleName(), address, ioe);
- throw new RuntimeException(ioe);
- }
- }
-
- private ScmBlockLocationProtocolPB createSCMProxy(
- InetSocketAddress scmAddress) throws IOException {
- Configuration hadoopConf =
- LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
- RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocolPB.class,
- ProtobufRpcEngine.class);
- // FailoverOnNetworkException ensures that the IPC layer does not attempt
- // retries on the same OM in case of connection exception. This retry
- // policy essentially results in TRY_ONCE_THEN_FAIL.
- RetryPolicy connectionRetryPolicy = RetryPolicies
- .failoverOnNetworkException(0);
- return RPC.getProtocolProxy(ScmBlockLocationProtocolPB.class, scmVersion,
- scmAddress, ugi, hadoopConf,
- NetUtils.getDefaultSocketFactory(hadoopConf),
- (int) scmClientConfig.getRpcTimeOut(),
- connectionRetryPolicy).getProxy();
- }
-
- public RetryPolicy getSCMBlockLocationRetryPolicy() {
- return new RetryPolicy() {
- @Override
- public RetryAction shouldRetry(Exception e, int retry,
- int failover, boolean b) {
- if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) {
- setUpdatedLeaderNodeID();
- } else {
- performFailoverToAssignedLeader(null, e);
- }
- return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount,
- getRetryInterval());
- }
- };
- }
-
- public synchronized int getCurrentProxyIndex() {
- return currentProxyIndex;
- }
-
- public synchronized void setUpdatedLeaderNodeID() {
- this.updatedLeaderNodeID = getCurrentProxySCMNodeId();
+ protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) {
+ return scmNodeInfo.getBlockClientAddress();
}
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
index 6e1d25bb0f..d0210128f0 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
@@ -17,31 +17,9 @@
package org.apache.hadoop.hdds.scm.proxy;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
-import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,261 +27,28 @@
/**
* Failover proxy provider for StorageContainerLocationProtocolPB.
*/
-public class SCMContainerLocationFailoverProxyProvider implements
- FailoverProxyProvider<StorageContainerLocationProtocolPB>, Closeable {
+public class SCMContainerLocationFailoverProxyProvider extends
+ SCMFailoverProxyProviderBase<StorageContainerLocationProtocolPB> {
public static final Logger LOG =
LoggerFactory.getLogger(SCMContainerLocationFailoverProxyProvider.class);
- // scmNodeId -> ProxyInfo<rpcProxy>
- private final Map<String,
- ProxyInfo<StorageContainerLocationProtocolPB>> scmProxies;
- // scmNodeId -> SCMProxyInfo
- private final Map<String, SCMProxyInfo> scmProxyInfoMap;
- private List<String> scmNodeIds;
-
- // As SCM Client is shared across threads, performFailOver()
- // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is
- // updated in shouldRetry(). When 2 or more threads run in parallel, the
- // RetryInvocationHandler will check the expectedFailOverCount
- // and not execute performFailOver() for one of them. So the other thread(s)
- // shall not call performFailOver(), it will call getProxy() which uses
- // currentProxySCMNodeId and returns the proxy.
- private volatile String currentProxySCMNodeId;
- private volatile int currentProxyIndex;
-
- private final ConfigurationSource conf;
- private final SCMClientConfig scmClientConfig;
- private final long scmVersion;
-
- private String scmServiceId;
-
- private final int maxRetryCount;
- private final long retryInterval;
-
- private final UserGroupInformation ugi;
-
- private String updatedLeaderNodeID = null;
-
/**
* Construct SCMContainerLocationFailoverProxyProvider.
* If userGroupInformation is not null, use the passed ugi, else obtain
* from {@link UserGroupInformation#getCurrentUser()}
- * @param conf
- * @param userGroupInformation
*/
public SCMContainerLocationFailoverProxyProvider(ConfigurationSource conf,
UserGroupInformation userGroupInformation) {
- this.conf = conf;
-
- if (userGroupInformation == null) {
- try {
- this.ugi = UserGroupInformation.getCurrentUser();
- } catch (IOException ex) {
- LOG.error("Unable to fetch user credentials from UGI", ex);
- throw new RuntimeException(ex);
- }
- } else {
- this.ugi = userGroupInformation;
- }
- this.scmVersion = RPC.getProtocolVersion(
- StorageContainerLocationProtocolPB.class);
-
- this.scmProxies = new HashMap<>();
- this.scmProxyInfoMap = new HashMap<>();
- loadConfigs();
-
- this.currentProxyIndex = 0;
- currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- scmClientConfig = conf.getObject(SCMClientConfig.class);
- this.maxRetryCount = scmClientConfig.getRetryCount();
- this.retryInterval = scmClientConfig.getRetryInterval();
- }
-
- @VisibleForTesting
- protected synchronized void loadConfigs() {
- List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
-
- scmNodeIds = new ArrayList<>();
-
- for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
- if (scmNodeInfo.getScmClientAddress() == null) {
- throw new ConfigurationException("SCM Client Address could not " +
- "be obtained from config. Config is not properly defined");
- } else {
- InetSocketAddress scmClientAddress =
- NetUtils.createSocketAddr(scmNodeInfo.getScmClientAddress());
-
- scmServiceId = scmNodeInfo.getServiceId();
- String scmNodeId = scmNodeInfo.getNodeId();
-
- scmNodeIds.add(scmNodeId);
- SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId,
- scmClientAddress);
- scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
- }
- }
- }
-
- @VisibleForTesting
- public synchronized String getCurrentProxySCMNodeId() {
- return currentProxySCMNodeId;
- }
-
- @VisibleForTesting
- public synchronized void changeCurrentProxy(String nodeId) {
- currentProxyIndex = scmNodeIds.indexOf(nodeId);
- currentProxySCMNodeId = nodeId;
- nextProxyIndex();
+ super(StorageContainerLocationProtocolPB.class, conf, userGroupInformation);
}
@Override
- public synchronized ProxyInfo<StorageContainerLocationProtocolPB> getProxy() {
- ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
- if (currentProxyInfo == null) {
- currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId());
- }
- return currentProxyInfo;
- }
-
- public synchronized List<StorageContainerLocationProtocolPB> getProxies() {
- for (SCMProxyInfo scmProxyInfo : scmProxyInfoMap.values()) {
- if (scmProxies.get(scmProxyInfo.getNodeId()) == null) {
- scmProxies.put(scmProxyInfo.getNodeId(),
- createSCMProxy(scmProxyInfo.getNodeId()));
- }
- }
- return scmProxies.values().stream()
- .map(proxyInfo -> proxyInfo.proxy).collect(Collectors.toList());
+ protected Logger getLogger() {
+ return LOG;
}
@Override
- public synchronized void performFailover(
- StorageContainerLocationProtocolPB newLeader) {
- if (updatedLeaderNodeID != null) {
- currentProxySCMNodeId = updatedLeaderNodeID;
- } else {
- nextProxyIndex();
- }
- LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
- }
-
- public synchronized void performFailoverToAssignedLeader(String newLeader,
- Exception e) {
- ServerNotLeaderException snle =
- (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e);
- if (snle != null && snle.getSuggestedLeader() != null) {
- Optional<SCMProxyInfo> matchedProxyInfo =
- scmProxyInfoMap.values().stream().filter(
- proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress())
- .equals(snle.getSuggestedLeader())).findFirst();
- if (matchedProxyInfo.isPresent()) {
- newLeader = matchedProxyInfo.get().getNodeId();
- LOG.debug("Performing failover to suggested leader {}, nodeId {}",
- snle.getSuggestedLeader(), newLeader);
- } else {
- LOG.debug("Suggested leader {} does not match with any of the " +
- "proxyInfo adress {}", snle.getSuggestedLeader(),
- Arrays.toString(scmProxyInfoMap.values().toArray()));
- }
- }
- assignLeaderToNode(newLeader);
- }
-
- @Override
- public Class<StorageContainerLocationProtocolPB> getInterface() {
- return StorageContainerLocationProtocolPB.class;
- }
-
- @Override
- public synchronized void close() throws IOException {
- for (ProxyInfo<StorageContainerLocationProtocolPB>
- proxy : scmProxies.values()) {
- StorageContainerLocationProtocolPB scmProxy =
- proxy.proxy;
- if (scmProxy != null) {
- RPC.stopProxy(scmProxy);
- }
- }
- }
-
- private long getRetryInterval() {
- // TODO add exponential backup
- return retryInterval;
- }
-
- private synchronized void nextProxyIndex() {
- // round robin the next proxy
- currentProxyIndex = (currentProxyIndex + 1) % scmProxyInfoMap.size();
- currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- }
-
- private synchronized void assignLeaderToNode(String newLeaderNodeId) {
- if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
- if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
- updatedLeaderNodeID = newLeaderNodeId;
- LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
- } else {
- updatedLeaderNodeID = null;
- }
- }
- }
-
- /**
- * Creates proxy object.
- */
- private ProxyInfo createSCMProxy(String nodeId) {
- ProxyInfo proxyInfo;
- SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
- InetSocketAddress address = scmProxyInfo.getAddress();
- try {
- StorageContainerLocationProtocolPB scmProxy = createSCMProxy(address);
- // Create proxyInfo here, to make it work with all Hadoop versions.
- proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString());
- scmProxies.put(nodeId, proxyInfo);
- return proxyInfo;
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to SCM at {}",
- this.getClass().getSimpleName(), address, ioe);
- throw new RuntimeException(ioe);
- }
- }
-
-
- private StorageContainerLocationProtocolPB createSCMProxy(
- InetSocketAddress scmAddress) throws IOException {
- Configuration hadoopConf =
- LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
- RPC.setProtocolEngine(hadoopConf, StorageContainerLocationProtocolPB.class,
- ProtobufRpcEngine.class);
- // FailoverOnNetworkException ensures that the IPC layer does not attempt
- // retries on the same OM in case of connection exception. This retry
- // policy essentially results in TRY_ONCE_THEN_FAIL.
- RetryPolicy connectionRetryPolicy = RetryPolicies
- .failoverOnNetworkException(0);
- return RPC.getProtocolProxy(
- StorageContainerLocationProtocolPB.class,
- scmVersion, scmAddress, ugi,
- hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
- (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy();
- }
-
- public RetryPolicy getRetryPolicy() {
- return new RetryPolicy() {
- @Override
- public RetryAction shouldRetry(Exception e, int retry,
- int failover, boolean b) {
- if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) {
- setUpdatedLeaderNodeID();
- } else {
- performFailoverToAssignedLeader(null, e);
- }
- return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount,
- getRetryInterval());
- }
- };
- }
-
- public synchronized void setUpdatedLeaderNodeID() {
- this.updatedLeaderNodeID = getCurrentProxySCMNodeId();
+ protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) {
+ return scmNodeInfo.getScmClientAddress();
}
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java
similarity index 66%
copy from hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
copy to hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java
index 6e1d25bb0f..504504e597 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java
@@ -18,11 +18,12 @@
package org.apache.hadoop.hdds.scm.proxy;
import com.google.common.annotations.VisibleForTesting;
-import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,7 +35,6 @@
import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicies;
@@ -44,19 +44,22 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Failover proxy provider for StorageContainerLocationProtocolPB.
+ * A failover proxy provider base abstract class.
+ * Provides common methods for failover proxy provider
+ * implementations. Failover proxy provider allows clients to configure
+ * multiple SCMs to connect to. In case of SCM failover, client can try
+ * connecting to another SCM node from the list of proxies.
*/
-public class SCMContainerLocationFailoverProxyProvider implements
- FailoverProxyProvider<StorageContainerLocationProtocolPB>, Closeable {
- public static final Logger LOG =
- LoggerFactory.getLogger(SCMContainerLocationFailoverProxyProvider.class);
+public abstract class SCMFailoverProxyProviderBase<T> implements FailoverProxyProvider<T> {
+
+ private final SCMClientConfig scmClientConfig;
+
+ private final Class<T> protocolClass;
// scmNodeId -> ProxyInfo<rpcProxy>
- private final Map<String,
- ProxyInfo<StorageContainerLocationProtocolPB>> scmProxies;
+ private final Map<String, ProxyInfo<T>> scmProxies;
// scmNodeId -> SCMProxyInfo
private final Map<String, SCMProxyInfo> scmProxyInfoMap;
private List<String> scmNodeIds;
@@ -72,11 +75,8 @@ public class SCMContainerLocationFailoverProxyProvider implements
private volatile int currentProxyIndex;
private final ConfigurationSource conf;
- private final SCMClientConfig scmClientConfig;
private final long scmVersion;
- private String scmServiceId;
-
private final int maxRetryCount;
private final long retryInterval;
@@ -85,28 +85,26 @@ public class SCMContainerLocationFailoverProxyProvider implements
private String updatedLeaderNodeID = null;
/**
- * Construct SCMContainerLocationFailoverProxyProvider.
+ * Construct SCMFailoverProxyProviderBase.
* If userGroupInformation is not null, use the passed ugi, else obtain
* from {@link UserGroupInformation#getCurrentUser()}
- * @param conf
- * @param userGroupInformation
*/
- public SCMContainerLocationFailoverProxyProvider(ConfigurationSource conf,
+ public SCMFailoverProxyProviderBase(Class<T> protocol, ConfigurationSource conf,
UserGroupInformation userGroupInformation) {
+ this.protocolClass = protocol;
this.conf = conf;
if (userGroupInformation == null) {
try {
this.ugi = UserGroupInformation.getCurrentUser();
} catch (IOException ex) {
- LOG.error("Unable to fetch user credentials from UGI", ex);
+ getLogger().error("Unable to fetch user credentials from UGI", ex);
throw new RuntimeException(ex);
}
} else {
this.ugi = userGroupInformation;
}
- this.scmVersion = RPC.getProtocolVersion(
- StorageContainerLocationProtocolPB.class);
+ this.scmVersion = RPC.getProtocolVersion(protocol);
this.scmProxies = new HashMap<>();
this.scmProxyInfoMap = new HashMap<>();
@@ -114,40 +112,60 @@ public SCMContainerLocationFailoverProxyProvider(ConfigurationSource conf,
this.currentProxyIndex = 0;
currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
+
scmClientConfig = conf.getObject(SCMClientConfig.class);
this.maxRetryCount = scmClientConfig.getRetryCount();
this.retryInterval = scmClientConfig.getRetryInterval();
+
+ getLogger().info("Created fail-over proxy for protocol {} with {} nodes: {}", protocol.getSimpleName(),
+ scmNodeIds.size(), scmProxyInfoMap.values());
+ }
+
+ /**
+ * Get the logger implementation for the specific protocol's failover proxy provider.
+ */
+ protected abstract Logger getLogger();
+
+ /**
+ * Get the specific protocol address from {@link SCMNodeInfo}.
+ * @param scmNodeInfo SCM node info which contains different protocols' address.
+ * @return protocol address.
+ */
+ protected abstract String getProtocolAddress(SCMNodeInfo scmNodeInfo);
+
+ /**
+ * Get the SCM node ID the current proxy is pointing to.
+ * This can be overridden with a single SCM node ID to disable SCM failover.
+ * See {@link SingleSecretKeyProtocolProxyProvider}
+ * @return current proxy's SCM Node ID.
+ */
+ protected synchronized String getCurrentProxySCMNodeId() {
+ return currentProxySCMNodeId;
}
@VisibleForTesting
protected synchronized void loadConfigs() {
List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
-
scmNodeIds = new ArrayList<>();
+
for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
- if (scmNodeInfo.getScmClientAddress() == null) {
- throw new ConfigurationException("SCM Client Address could not " +
+ String protocolAddress = getProtocolAddress(scmNodeInfo);
+ if (protocolAddress == null) {
+ throw new ConfigurationException(protocolClass.getSimpleName() + " SCM Address could not " +
"be obtained from config. Config is not properly defined");
} else {
- InetSocketAddress scmClientAddress =
- NetUtils.createSocketAddr(scmNodeInfo.getScmClientAddress());
+ InetSocketAddress protocolAddr = NetUtils.createSocketAddr(protocolAddress);
- scmServiceId = scmNodeInfo.getServiceId();
+ String scmServiceId = scmNodeInfo.getServiceId();
String scmNodeId = scmNodeInfo.getNodeId();
-
scmNodeIds.add(scmNodeId);
- SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId,
- scmClientAddress);
+ SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, protocolAddr);
scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
}
}
}
- @VisibleForTesting
- public synchronized String getCurrentProxySCMNodeId() {
- return currentProxySCMNodeId;
- }
@VisibleForTesting
public synchronized void changeCurrentProxy(String nodeId) {
@@ -157,15 +175,15 @@ public synchronized void changeCurrentProxy(String
nodeId) {
}
@Override
- public synchronized ProxyInfo<StorageContainerLocationProtocolPB> getProxy()
{
- ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
+ public synchronized ProxyInfo<T> getProxy() {
+ ProxyInfo<T> currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
if (currentProxyInfo == null) {
currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId());
}
return currentProxyInfo;
}
- public synchronized List<StorageContainerLocationProtocolPB> getProxies() {
+ public synchronized List<T> getProxies() {
for (SCMProxyInfo scmProxyInfo : scmProxyInfoMap.values()) {
if (scmProxies.get(scmProxyInfo.getNodeId()) == null) {
scmProxies.put(scmProxyInfo.getNodeId(),
@@ -177,18 +195,17 @@ public synchronized
List<StorageContainerLocationProtocolPB> getProxies() {
}
@Override
- public synchronized void performFailover(
- StorageContainerLocationProtocolPB newLeader) {
+ public synchronized void performFailover(T newLeader) {
if (updatedLeaderNodeID != null) {
currentProxySCMNodeId = updatedLeaderNodeID;
} else {
nextProxyIndex();
}
- LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
+ getLogger().debug("Failing over to next proxy. {}",
getCurrentProxySCMNodeId());
}
public synchronized void performFailoverToAssignedLeader(String newLeader,
- Exception e) {
+ Exception e) {
ServerNotLeaderException snle =
(ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e);
if (snle != null && snle.getSuggestedLeader() != null) {
@@ -198,11 +215,11 @@ public synchronized void
performFailoverToAssignedLeader(String newLeader,
.equals(snle.getSuggestedLeader())).findFirst();
if (matchedProxyInfo.isPresent()) {
newLeader = matchedProxyInfo.get().getNodeId();
- LOG.debug("Performing failover to suggested leader {}, nodeId {}",
+ getLogger().debug("Performing failover to suggested leader {}, nodeId
{}",
snle.getSuggestedLeader(), newLeader);
} else {
- LOG.debug("Suggested leader {} does not match with any of the " +
- "proxyInfo adress {}", snle.getSuggestedLeader(),
+ getLogger().debug("Suggested leader {} does not match with any of the
" +
+ "proxyInfo address {}", snle.getSuggestedLeader(),
Arrays.toString(scmProxyInfoMap.values().toArray()));
}
}
@@ -210,16 +227,22 @@ public synchronized void
performFailoverToAssignedLeader(String newLeader,
}
@Override
- public Class<StorageContainerLocationProtocolPB> getInterface() {
- return StorageContainerLocationProtocolPB.class;
+ public Class<T> getInterface() {
+ return protocolClass;
+ }
+
+ public List<String> getSCMNodeIds() {
+ return Collections.unmodifiableList(scmNodeIds);
+ }
+
+ public Collection<SCMProxyInfo> getSCMProxyInfoList() {
+ return Collections.unmodifiableCollection(scmProxyInfoMap.values());
}
@Override
public synchronized void close() throws IOException {
- for (ProxyInfo<StorageContainerLocationProtocolPB>
- proxy : scmProxies.values()) {
- StorageContainerLocationProtocolPB scmProxy =
- proxy.proxy;
+ for (ProxyInfo<T> proxy : scmProxies.values()) {
+ T scmProxy = proxy.proxy;
if (scmProxy != null) {
RPC.stopProxy(scmProxy);
}
@@ -241,7 +264,7 @@ private synchronized void assignLeaderToNode(String
newLeaderNodeId) {
if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
updatedLeaderNodeID = newLeaderNodeId;
- LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
+ getLogger().debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
} else {
updatedLeaderNodeID = null;
}
@@ -251,37 +274,33 @@ private synchronized void assignLeaderToNode(String
newLeaderNodeId) {
/**
* Creates proxy object.
*/
- private ProxyInfo createSCMProxy(String nodeId) {
- ProxyInfo proxyInfo;
+ private ProxyInfo<T> createSCMProxy(String nodeId) {
+ ProxyInfo<T> proxyInfo;
SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
InetSocketAddress address = scmProxyInfo.getAddress();
try {
- StorageContainerLocationProtocolPB scmProxy = createSCMProxy(address);
+ T scmProxy = createSCMProxy(address);
// Create proxyInfo here, to make it work with all Hadoop versions.
proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString());
scmProxies.put(nodeId, proxyInfo);
return proxyInfo;
} catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to SCM at {}",
+ getLogger().error("{} Failed to create RPC proxy to SCM at {}",
this.getClass().getSimpleName(), address, ioe);
throw new RuntimeException(ioe);
}
}
-
- private StorageContainerLocationProtocolPB createSCMProxy(
- InetSocketAddress scmAddress) throws IOException {
+ private T createSCMProxy(InetSocketAddress scmAddress) throws IOException {
Configuration hadoopConf =
LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
- RPC.setProtocolEngine(hadoopConf, StorageContainerLocationProtocolPB.class,
- ProtobufRpcEngine.class);
+ RPC.setProtocolEngine(hadoopConf, protocolClass, ProtobufRpcEngine.class);
// FailoverOnNetworkException ensures that the IPC layer does not attempt
- // retries on the same OM in case of connection exception. This retry
+ // retries on the same SCM in case of connection exception. This retry
// policy essentially results in TRY_ONCE_THEN_FAIL.
- RetryPolicy connectionRetryPolicy = RetryPolicies
- .failoverOnNetworkException(0);
+ RetryPolicy connectionRetryPolicy =
RetryPolicies.failoverOnNetworkException(0);
return RPC.getProtocolProxy(
- StorageContainerLocationProtocolPB.class,
+ protocolClass,
scmVersion, scmAddress, ugi,
hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
(int)scmClientConfig.getRpcTimeOut(),
connectionRetryPolicy).getProxy();
@@ -292,6 +311,18 @@ public RetryPolicy getRetryPolicy() {
@Override
public RetryAction shouldRetry(Exception e, int retry,
int failover, boolean b) {
+ if (getLogger().isDebugEnabled()) {
+ if (e.getCause() != null) {
+ getLogger().debug("RetryProxy: SCM Server {}: {}: {}",
+ getCurrentProxySCMNodeId(),
+ e.getCause().getClass().getSimpleName(),
+ e.getCause().getMessage());
+ } else {
+ getLogger().debug("RetryProxy: SCM {}: {}",
getCurrentProxySCMNodeId(),
+ e.getMessage());
+ }
+ }
+
if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) {
setUpdatedLeaderNodeID();
} else {
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
index c7b50af67a..60e07b90b4 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
@@ -17,30 +17,9 @@
package org.apache.hadoop.hdds.scm.proxy;
-import com.google.common.base.Preconditions;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
-import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
-import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
-import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,260 +27,26 @@
/**
* Failover proxy provider for SCMSecurityProtocol server.
*/
-public class SCMSecurityProtocolFailoverProxyProvider implements
- FailoverProxyProvider<SCMSecurityProtocolPB>, Closeable {
+public class SCMSecurityProtocolFailoverProxyProvider extends
SCMFailoverProxyProviderBase<SCMSecurityProtocolPB> {
public static final Logger LOG =
LoggerFactory.getLogger(SCMSecurityProtocolFailoverProxyProvider.class);
- // scmNodeId -> ProxyInfo<rpcProxy>
- private final Map<String,
- ProxyInfo<SCMSecurityProtocolPB>> scmProxies;
-
- // scmNodeId -> SCMProxyInfo
- private final Map<String, SCMProxyInfo> scmProxyInfoMap;
-
- private List<String> scmNodeIds;
-
- // As SCM Client is shared across threads, performFailOver()
- // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is
- // updated in shouldRetry(). When 2 or more threads run in parallel, the
- // RetryInvocationHandler will check the expectedFailOverCount
- // and not execute performFailOver() for one of them. So the other thread(s)
- // shall not call performFailOver(), it will call getProxy() which uses
- // currentProxySCMNodeId and returns the proxy.
- private volatile String currentProxySCMNodeId;
- private volatile int currentProxyIndex;
-
-
- private final ConfigurationSource conf;
- private final SCMClientConfig scmClientConfig;
- private final long scmVersion;
-
- private String scmServiceId;
-
- private final int maxRetryCount;
- private final long retryInterval;
-
- private final UserGroupInformation ugi;
-
- private String updatedLeaderNodeID = null;
-
/**
* Construct fail-over proxy provider for SCMSecurityProtocol Server.
- * @param conf
- * @param userGroupInformation
*/
public SCMSecurityProtocolFailoverProxyProvider(ConfigurationSource conf,
UserGroupInformation userGroupInformation) {
- Preconditions.checkNotNull(userGroupInformation);
- this.ugi = userGroupInformation;
- this.conf = conf;
- this.scmVersion = RPC.getProtocolVersion(SCMSecurityProtocolPB.class);
-
- this.scmProxies = new HashMap<>();
- this.scmProxyInfoMap = new HashMap<>();
- loadConfigs();
-
- this.currentProxyIndex = 0;
- currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- scmClientConfig = conf.getObject(SCMClientConfig.class);
- this.maxRetryCount = scmClientConfig.getRetryCount();
- this.retryInterval = scmClientConfig.getRetryInterval();
- }
-
- protected synchronized void loadConfigs() {
- List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
- scmNodeIds = new ArrayList<>();
-
- for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
- if (scmNodeInfo.getScmSecurityAddress() == null) {
- throw new ConfigurationException("SCM Client Address could not " +
- "be obtained from config. Config is not properly defined");
- } else {
- InetSocketAddress scmSecurityAddress =
- NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress());
-
- scmServiceId = scmNodeInfo.getServiceId();
- String scmNodeId = scmNodeInfo.getNodeId();
-
- scmNodeIds.add(scmNodeId);
- SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId,
- scmSecurityAddress);
- scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
- }
- }
+ super(SCMSecurityProtocolPB.class, conf, userGroupInformation);
}
@Override
- public synchronized ProxyInfo<SCMSecurityProtocolPB> getProxy() {
- ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
- if (currentProxyInfo == null) {
- currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId());
- }
- return currentProxyInfo;
- }
-
- /**
- * Creates proxy object.
- */
- private ProxyInfo createSCMProxy(String nodeId) {
- ProxyInfo proxyInfo;
- SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
- InetSocketAddress address = scmProxyInfo.getAddress();
- try {
- SCMSecurityProtocolPB scmProxy = createSCMProxy(address);
- // Create proxyInfo here, to make it work with all Hadoop versions.
- proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString());
- scmProxies.put(nodeId, proxyInfo);
- return proxyInfo;
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to SCM at {}",
- this.getClass().getSimpleName(), address, ioe);
- throw new RuntimeException(ioe);
- }
+ protected Logger getLogger() {
+ return LOG;
}
- private SCMSecurityProtocolPB createSCMProxy(InetSocketAddress scmAddress)
- throws IOException {
- Configuration hadoopConf =
- LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
- RPC.setProtocolEngine(hadoopConf, SCMSecurityProtocolPB.class,
- ProtobufRpcEngine.class);
-
- // FailoverOnNetworkException ensures that the IPC layer does not attempt
- // retries on the same SCM in case of connection exception. This retry
- // policy essentially results in TRY_ONCE_THEN_FAIL.
-
- RetryPolicy connectionRetryPolicy = RetryPolicies
- .failoverOnNetworkException(0);
-
- return RPC.getProtocolProxy(SCMSecurityProtocolPB.class,
- scmVersion, scmAddress, ugi,
- hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
- (int)scmClientConfig.getRpcTimeOut(),
connectionRetryPolicy).getProxy();
- }
-
-
@Override
- public synchronized void performFailover(SCMSecurityProtocolPB currentProxy)
{
- if (updatedLeaderNodeID != null) {
- currentProxySCMNodeId = updatedLeaderNodeID;
- } else {
- nextProxyIndex();
- }
- LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
- }
-
- public synchronized void performFailoverToAssignedLeader(String newLeader,
- Exception e) {
- ServerNotLeaderException snle =
- (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e);
- if (snle != null && snle.getSuggestedLeader() != null) {
- Optional< SCMProxyInfo > matchedProxyInfo =
- scmProxyInfoMap.values().stream().filter(
- proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress())
- .equals(snle.getSuggestedLeader())).findFirst();
- if (matchedProxyInfo.isPresent()) {
- newLeader = matchedProxyInfo.get().getNodeId();
- LOG.debug("Performing failover to suggested leader {}, nodeId {}",
- snle.getSuggestedLeader(), newLeader);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Suggested leader {} does not match with any of the " +
- "proxyInfo adress {}", snle.getSuggestedLeader(),
- Arrays.toString(scmProxyInfoMap.values().toArray()));
- }
- }
- }
- assignLeaderToNode(newLeader);
- }
-
-
- private synchronized void assignLeaderToNode(String newLeaderNodeId) {
- if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
- if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
- updatedLeaderNodeID = newLeaderNodeId;
- LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
- } else {
- updatedLeaderNodeID = null;
- }
- }
- }
-
- /**
- * Update the proxy index to the next proxy in the list.
- * @return the new proxy index
- */
- private synchronized void nextProxyIndex() {
- // round robin the next proxy
- currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size();
- currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- }
-
- public RetryPolicy getRetryPolicy() {
- // Client will attempt up to maxFailovers number of failovers between
- // available SCMs before throwing exception.
- RetryPolicy retryPolicy = new RetryPolicy() {
- @Override
- public RetryAction shouldRetry(Exception exception, int retries,
- int failovers, boolean isIdempotentOrAtMostOnce)
- throws Exception {
-
- if (LOG.isDebugEnabled()) {
- if (exception.getCause() != null) {
- LOG.debug("RetryProxy: SCM Security Server {}: {}: {}",
- getCurrentProxySCMNodeId(),
- exception.getCause().getClass().getSimpleName(),
- exception.getCause().getMessage());
- } else {
- LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(),
- exception.getMessage());
- }
- }
-
- if (SCMHAUtils.checkRetriableWithNoFailoverException(exception)) {
- setUpdatedLeaderNodeID();
- } else {
- performFailoverToAssignedLeader(null, exception);
- }
- return SCMHAUtils
- .getRetryAction(failovers, retries, exception, maxRetryCount,
- getRetryInterval());
- }
- };
-
- return retryPolicy;
- }
-
- public synchronized void setUpdatedLeaderNodeID() {
- this.updatedLeaderNodeID = getCurrentProxySCMNodeId();
- }
-
- @Override
- public Class< SCMSecurityProtocolPB > getInterface() {
- return SCMSecurityProtocolPB.class;
- }
-
- @Override
- public synchronized void close() throws IOException {
- for (ProxyInfo<SCMSecurityProtocolPB> proxyInfo : scmProxies.values()) {
- SCMSecurityProtocolPB proxy = proxyInfo.proxy;
- if (proxy != null) {
- RPC.stopProxy(proxy);
- }
- }
- }
-
- public synchronized String getCurrentProxySCMNodeId() {
- return currentProxySCMNodeId;
- }
-
- public synchronized int getCurrentProxyIndex() {
- return currentProxyIndex;
- }
-
- private long getRetryInterval() {
- return retryInterval;
+ protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) {
+ return scmNodeInfo.getScmSecurityAddress();
}
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java
index 25f5b5524e..0359bb062c 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java
@@ -17,30 +17,9 @@
package org.apache.hadoop.hdds.scm.proxy;
-import com.google.common.base.Preconditions;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import
org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
-import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
-import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
-import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,254 +28,26 @@
* Failover proxy provider for SCMSecretKeyProtocolService server.
*/
public class SecretKeyProtocolFailoverProxyProvider
- <T extends SCMSecretKeyProtocolService.BlockingInterface> implements
- FailoverProxyProvider<T>, Closeable {
+ <T extends SCMSecretKeyProtocolService.BlockingInterface> extends
SCMFailoverProxyProviderBase<T> {
public static final Logger LOG =
LoggerFactory.getLogger(SecretKeyProtocolFailoverProxyProvider.class);
- // scmNodeId -> ProxyInfo<rpcProxy>
- private final Map<String, ProxyInfo<T>> scmProxies;
-
- // scmNodeId -> SCMProxyInfo
- private final Map<String, SCMProxyInfo> scmProxyInfoMap;
-
- private List<String> scmNodeIds;
-
- // As SCM Client is shared across threads, performFailOver()
- // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is
- // updated in shouldRetry(). When 2 or more threads run in parallel, the
- // RetryInvocationHandler will check the expectedFailOverCount
- // and not execute performFailOver() for one of them. So the other thread(s)
- // shall not call performFailOver(), it will call getProxy() which uses
- // currentProxySCMNodeId and returns the proxy.
- private volatile String currentProxySCMNodeId;
- private volatile int currentProxyIndex;
-
-
- private final ConfigurationSource conf;
- private final SCMClientConfig scmClientConfig;
- private final long scmVersion;
-
- private String scmServiceId;
-
- private final int maxRetryCount;
- private final long retryInterval;
-
- private final UserGroupInformation ugi;
- private final Class<T> proxyClazz;
-
- private String updatedLeaderNodeID = null;
-
/**
* Construct fail-over proxy provider for SCMSecurityProtocol Server.
- * @param conf
- * @param userGroupInformation
*/
public SecretKeyProtocolFailoverProxyProvider(ConfigurationSource conf,
UserGroupInformation userGroupInformation, Class<T> proxyClazz) {
- Preconditions.checkNotNull(userGroupInformation);
- this.ugi = userGroupInformation;
- this.conf = conf;
- this.proxyClazz = proxyClazz;
- this.scmVersion = RPC.getProtocolVersion(proxyClazz);
-
- this.scmProxies = new HashMap<>();
- this.scmProxyInfoMap = new HashMap<>();
- loadConfigs();
-
- this.currentProxyIndex = 0;
- currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- scmClientConfig = conf.getObject(SCMClientConfig.class);
- this.maxRetryCount = scmClientConfig.getRetryCount();
- this.retryInterval = scmClientConfig.getRetryInterval();
- }
-
- protected synchronized void loadConfigs() {
- List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
- scmNodeIds = new ArrayList<>();
-
- for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
- if (scmNodeInfo.getScmSecurityAddress() == null) {
- throw new ConfigurationException("SCM Client Address could not " +
- "be obtained from config. Config is not properly defined");
- } else {
- InetSocketAddress scmSecurityAddress =
- NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress());
-
- scmServiceId = scmNodeInfo.getServiceId();
- String scmNodeId = scmNodeInfo.getNodeId();
-
- scmNodeIds.add(scmNodeId);
- SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId,
- scmSecurityAddress);
- scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
- }
- }
+ super(proxyClazz, conf, userGroupInformation);
}
@Override
- public synchronized ProxyInfo<T> getProxy() {
- ProxyInfo<T> currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
- if (currentProxyInfo == null) {
- currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId());
- }
- return currentProxyInfo;
- }
-
- /**
- * Creates proxy object.
- */
- private ProxyInfo<T> createSCMProxy(String nodeId) {
- ProxyInfo<T> proxyInfo;
- SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
- InetSocketAddress address = scmProxyInfo.getAddress();
- try {
- T scmProxy = createSCMProxy(address);
- // Create proxyInfo here, to make it work with all Hadoop versions.
- proxyInfo = new ProxyInfo<T>(scmProxy, scmProxyInfo.toString());
- scmProxies.put(nodeId, proxyInfo);
- return proxyInfo;
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to SCM at {}",
- this.getClass().getSimpleName(), address, ioe);
- throw new RuntimeException(ioe);
- }
+ protected Logger getLogger() {
+ return LOG;
}
- private T createSCMProxy(InetSocketAddress scmAddress)
- throws IOException {
- Configuration hadoopConf =
- LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
- RPC.setProtocolEngine(hadoopConf, proxyClazz,
- ProtobufRpcEngine.class);
-
- // FailoverOnNetworkException ensures that the IPC layer does not attempt
- // retries on the same SCM in case of connection exception. This retry
- // policy essentially results in TRY_ONCE_THEN_FAIL.
-
- RetryPolicy connectionRetryPolicy = RetryPolicies
- .failoverOnNetworkException(0);
-
- return RPC.getProtocolProxy(proxyClazz,
- scmVersion, scmAddress, ugi,
- hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
- (int)scmClientConfig.getRpcTimeOut(),
connectionRetryPolicy).getProxy();
- }
-
-
@Override
- public synchronized void performFailover(T currentProxy) {
- if (updatedLeaderNodeID != null) {
- currentProxySCMNodeId = updatedLeaderNodeID;
- } else {
- nextProxyIndex();
- }
- LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
- }
-
- public synchronized void performFailoverToAssignedLeader(String newLeader,
- Exception e) {
- ServerNotLeaderException snle =
- (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e);
- if (snle != null && snle.getSuggestedLeader() != null) {
- Optional< SCMProxyInfo > matchedProxyInfo =
- scmProxyInfoMap.values().stream().filter(
- proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress())
- .equals(snle.getSuggestedLeader())).findFirst();
- if (matchedProxyInfo.isPresent()) {
- newLeader = matchedProxyInfo.get().getNodeId();
- LOG.debug("Performing failover to suggested leader {}, nodeId {}",
- snle.getSuggestedLeader(), newLeader);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Suggested leader {} does not match with any of the " +
- "proxyInfo adress {}", snle.getSuggestedLeader(),
- Arrays.toString(scmProxyInfoMap.values().toArray()));
- }
- }
- }
- assignLeaderToNode(newLeader);
- }
-
-
- private synchronized void assignLeaderToNode(String newLeaderNodeId) {
- if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
- if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
- updatedLeaderNodeID = newLeaderNodeId;
- LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
- } else {
- updatedLeaderNodeID = null;
- }
- }
- }
-
- /**
- * Update the proxy index to the next proxy in the list.
- * @return the new proxy index
- */
- private synchronized void nextProxyIndex() {
- // round robin the next proxy
- currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size();
- currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
- }
-
- public RetryPolicy getRetryPolicy() {
- // Client will attempt up to maxFailovers number of failovers between
- // available SCMs before throwing exception.
-
- return (exception, retries, failovers, isIdempotentOrAtMostOnce) -> {
-
- if (LOG.isDebugEnabled()) {
- if (exception.getCause() != null) {
- LOG.debug("RetryProxy: SCM Security Server {}: {}: {}",
- getCurrentProxySCMNodeId(),
- exception.getCause().getClass().getSimpleName(),
- exception.getCause().getMessage());
- } else {
- LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(),
- exception.getMessage());
- }
- }
-
- if (SCMHAUtils.checkRetriableWithNoFailoverException(exception)) {
- setUpdatedLeaderNodeID();
- } else {
- performFailoverToAssignedLeader(null, exception);
- }
- return SCMHAUtils
- .getRetryAction(failovers, retries, exception, maxRetryCount,
- getRetryInterval());
- };
- }
-
- public synchronized void setUpdatedLeaderNodeID() {
- this.updatedLeaderNodeID = getCurrentProxySCMNodeId();
- }
-
- @Override
- public Class<T> getInterface() {
- return proxyClazz;
- }
-
- @Override
- public synchronized void close() throws IOException {
- for (ProxyInfo<T> proxyInfo : scmProxies.values()) {
- if (proxyInfo.proxy != null) {
- RPC.stopProxy(proxyInfo.proxy);
- }
- }
- }
-
- public synchronized String getCurrentProxySCMNodeId() {
- return currentProxySCMNodeId;
- }
-
- public synchronized int getCurrentProxyIndex() {
- return currentProxyIndex;
- }
-
- private long getRetryInterval() {
- return retryInterval;
+ protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) {
+ return scmNodeInfo.getScmSecurityAddress();
}
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java
index c41e3627ea..2cfa051a93 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java
@@ -20,6 +20,8 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import
org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Proxy provider for SCMSecretKeyProtocolService against a
@@ -28,6 +30,10 @@
public class SingleSecretKeyProtocolProxyProvider
<T extends SCMSecretKeyProtocolService.BlockingInterface>
extends SecretKeyProtocolFailoverProxyProvider<T> {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SingleSecretKeyProtocolProxyProvider.class);
+
private final String scmNodeId;
public SingleSecretKeyProtocolProxyProvider(
@@ -49,6 +55,11 @@ public synchronized void performFailover(T currentProxy) {
// do nothing.
}
+ @Override
+ protected Logger getLogger() {
+ return LOG;
+ }
+
@Override
public synchronized void performFailoverToAssignedLeader(String newLeader,
Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]