neils-dev commented on a change in pull request #2901:
URL: https://github.com/apache/ozone/pull/2901#discussion_r807401760
##########
File path:
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
##########
@@ -60,72 +69,185 @@
private final AtomicBoolean isRunning = new AtomicBoolean(false);
// gRPC specific
- private ManagedChannel channel;
-
private OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub client;
+ private Map<String,
+ OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients;
+ private Map<String, ManagedChannel> channels;
+ private int lastVisited = -1;
+ private ConfigurationSource conf;
private String host = "om";
- private int port = 8981;
private int maxSize;
+ private List<String> oms;
+ private RetryPolicy retryPolicy;
+ private int failoverCount = 0;
+ private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
+ omFailoverProxyProvider;
+
public GrpcOmTransport(ConfigurationSource conf,
UserGroupInformation ugi, String omServiceId)
throws IOException {
- Optional<String> omHost = getHostNameFromConfigKeys(conf,
- OZONE_OM_ADDRESS_KEY);
- this.host = omHost.orElse("0.0.0.0");
- port = conf.getObject(GrpcOmTransportConfig.class).getPort();
+ this.channels = new HashMap<>();
+ this.clients = new HashMap<>();
+ this.conf = conf;
maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
+ omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
+ conf,
+ ugi,
+ omServiceId,
+ OzoneManagerProtocolPB.class);
+
start();
}
- public void start() {
+ public void start() throws IOException {
+ host = omFailoverProxyProvider
+ .getGrpcProxyAddress(
+ omFailoverProxyProvider.getCurrentProxyOMNodeId());
+
if (!isRunning.compareAndSet(false, true)) {
LOG.info("Ignore. already started.");
return;
}
- NettyChannelBuilder channelBuilder =
- NettyChannelBuilder.forAddress(host, port)
- .usePlaintext()
- .maxInboundMessageSize(maxSize);
- channel = channelBuilder.build();
- client = OzoneManagerServiceGrpc.newBlockingStub(channel);
+ List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
+ for (String nodeId : nodes) {
+ String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
+ HostAndPort hp = HostAndPort.fromString(hostaddr);
+
+ NettyChannelBuilder channelBuilder =
+ NettyChannelBuilder.forAddress(hp.getHost(), hp.getPort())
+ .usePlaintext()
+ .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+ channels.put(hostaddr, channelBuilder.build());
+ clients.put(hostaddr,
+ OzoneManagerServiceGrpc
+ .newBlockingStub(channels.get(hostaddr)));
+ }
+ int maxFailovers = conf.getInt(
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+
+ retryPolicy = omFailoverProxyProvider.getRetryPolicy(maxFailovers);
LOG.info("{}: started", CLIENT_NAME);
}
@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
OMResponse resp = null;
- try {
- resp = client.submitRequest(payload);
- } catch (io.grpc.StatusRuntimeException e) {
- ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
- if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
- resultCode = ResultCodes.TIMEOUT;
+ boolean tryOtherHost = true;
+ ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
+ while (tryOtherHost) {
+ tryOtherHost = false;
+ try {
+ resp = clients.get(host).submitRequest(payload);
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
+ resultCode = ResultCodes.TIMEOUT;
+ }
+ Exception exp = new Exception(e);
+ tryOtherHost = shouldRetry(unwrapException(exp));
+ if (!tryOtherHost) {
+ throw new OMException(resultCode);
+ }
}
- throw new OMException(e.getCause(), resultCode);
}
return resp;
}
+ private Exception unwrapException(Exception ex) {
+ Exception grpcException = null;
+ try {
+ StatusRuntimeException srexp =
+ (StatusRuntimeException)ex.getCause();
+ Status status = srexp.getStatus();
+ LOG.debug("GRPC exception wrapped: {}", status.getDescription());
+ if (status.getCode() == Status.Code.INTERNAL) {
+ // exception potentially generated by OzoneManagerServiceGrpc
+ Class<?> realClass = Class.forName(status.getDescription()
+ .substring(0, status.getDescription()
+ .indexOf(":")));
+ Class<? extends Exception> cls = realClass
+ .asSubclass(Exception.class);
+ Constructor<? extends Exception> cn = cls.getConstructor(String.class);
+ cn.setAccessible(true);
+ grpcException = cn.newInstance(status.getDescription());
+ IOException remote = null;
+ try {
+ String cause = status.getDescription();
+ cause = cause.substring(cause.indexOf(":") + 2);
+ remote = new RemoteException(cause.substring(0, cause.indexOf(":")),
+ cause.substring(cause.indexOf(":") + 1));
+ grpcException.initCause(remote);
+ } catch (Exception e) {
+ LOG.error("cannot get cause for remote exception");
+ }
+ } else {
+ // exception generated by connection failure, gRPC
+ grpcException = ex;
+ }
+ } catch (Exception e) {
+ grpcException = new IOException(e);
+ LOG.error("error unwrapping exception from OMResponse {}");
+ }
+ return grpcException;
+ }
+
+ private boolean shouldRetry(Exception ex) {
+ boolean retry = false;
+ RetryPolicy.RetryAction action = null;
+ try {
+ action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++,
true);
+ LOG.debug("grpc failover retry action {}", action.action);
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+ retry = false;
+ LOG.error("Retry request failed. " + action.reason, ex);
+ } else {
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY ||
+ (action.action == RetryPolicy.RetryAction.RetryDecision
+ .FAILOVER_AND_RETRY)) {
+ if (action.delayMillis > 0) {
+ try {
+ Thread.sleep(action.delayMillis);
+ } catch (Exception e) {
+ LOG.error("Error trying sleep thread for {}",
action.delayMillis);
+ }
+ }
+ // switch om host to current proxy OMNodeId
+ host = omFailoverProxyProvider
Review comment:
Yes, concurrency is present for `submitRequest`. Although the gRPC
channel is blocking, `submitRequest` handles concurrent access. The
concern here is with the failover provider and its state information. Since
`GrpcOmTransport` uses the `OmFailoverProxyProvider` policy and retry code,
most of the retry methods are thread safe.
As a precaution, I am adding the Java `synchronized` keyword to the
`shouldRetry` method of `GrpcOmTransport`. Any suggestions?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]