kerneltime commented on code in PR #3389:
URL: https://github.com/apache/ozone/pull/3389#discussion_r880437888
##########
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java:
##########
@@ -48,93 +54,105 @@
* connecting to another OM node from the list of proxies.
*/
public class GrpcOMFailoverProxyProvider<T> extends
- OMFailoverProxyProvider<T> {
-
- private Map<String, String> omAddresses;
+ OMFailoverProxyProviderBase<T> {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(GrpcOMFailoverProxyProvider.class);
public GrpcOMFailoverProxyProvider(ConfigurationSource configuration,
- UserGroupInformation ugi,
String omServiceId,
Class<T> protocol) throws IOException {
- super(configuration, ugi, omServiceId, protocol);
+ super(configuration, omServiceId, protocol);
}
@Override
protected void loadOMClientConfigs(ConfigurationSource config, String
omSvcId)
throws IOException {
- // to be used for base class omProxies,
- // ProxyInfo not applicable for gRPC, just need key set
- Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>();
- // to be used for base class omProxyInfos
- // OMProxyInfo not applicable for gRPC, just need key set
- Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>();
- List<String> omNodeIDList = new ArrayList<>();
- omAddresses = new HashMap<>();
Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
+ Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
+ List<String> omNodeIDList = new ArrayList<>();
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
omSvcId, nodeId);
-
Optional<String> hostaddr = getHostNameFromConfigKeys(config,
rpcAddrKey);
-
OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
omSvcId, nodeId),
OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
if (nodeId == null) {
nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
}
- omProxiesNodeIdKeyset.put(nodeId, null);
- omProxyInfosNodeIdKeyset.put(nodeId, null);
if (hostaddr.isPresent()) {
- omAddresses.put(nodeId,
- hostaddr.get() + ":"
- + hostport.orElse(config
- .getObject(GrpcOmTransport
- .GrpcOmTransportConfig.class)
- .getPort()));
+ ProxyInfo<T> proxyInfo =
+ new ProxyInfo<>(createOMProxy(),
+ hostaddr.get() + ":"
+ + hostport.orElse(config
+ .getObject(GrpcOmTransport
+ .GrpcOmTransportConfig.class)
+ .getPort()));
+ omProxies.put(nodeId, proxyInfo);
} else {
LOG.error("expected host address not defined for: {}", rpcAddrKey);
throw new ConfigurationException(rpcAddrKey + "is not defined");
}
omNodeIDList.add(nodeId);
}
- if (omProxiesNodeIdKeyset.isEmpty()) {
+ if (omProxies.isEmpty()) {
throw new IllegalArgumentException("Could not find any configured " +
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
+ setOmProxies(omProxies);
+ setOmNodeIDList(omNodeIDList);
+ }
- // set base class omProxies, omProxyInfos, omNodeIDList
+ private T createOMProxy() throws IOException {
+ InetSocketAddress addr = new InetSocketAddress(0);
+ Configuration hadoopConf =
+ LegacyHadoopConfigurationSource.asHadoopConfiguration(getConf());
+ return (T) RPC.getProxy(getInterface(), 0, addr, hadoopConf);
+ }
- // omProxies needed in base class
- // omProxies.size == number of om nodes
- // omProxies key needs to be valid nodeid
- // omProxyInfos keyset needed in base class
- setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList);
+ /**
+ * Get the proxy object which should be used until the next failover event
+ * occurs. RPC proxy object is intialized lazily.
+ * @return the OM proxy object to invoke methods upon
+ */
+ @Override
+ public synchronized ProxyInfo<T> getProxy() {
+ return getOMProxyMap().get(getCurrentProxyOMNodeId());
}
@Override
- protected Text computeDelegationTokenService() {
- return new Text();
+ protected synchronized boolean shouldFailover(Exception ex) {
+ if (ex instanceof StatusRuntimeException) {
+ StatusRuntimeException srexp = (StatusRuntimeException)ex;
+ Status status = srexp.getStatus();
+ if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
Review Comment:
is this the only exception for which we should not failover? I see
`FAILED_PRECONDITION`, `INVALID_ARGUMENT` and a few others in the list of
possible codes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]