This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch HDDS-4440-s3-performance
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-4440-s3-performance by
this push:
new 413d4aa HDDS-5544. Update GRPC OmTransport implementation for HA
(#2901)
413d4aa is described below
commit 413d4aade25e77b444fcbbea36a3302cd2a5dc66
Author: Neil Joshi <[email protected]>
AuthorDate: Wed Mar 9 12:19:01 2022 -0700
HDDS-5544. Update GRPC OmTransport implementation for HA (#2901)
---
.../java/org/apache/hadoop/hdds/HddsUtils.java | 20 +++
.../java/org/apache/hadoop/hdds/TestHddsUtils.java | 39 +++-
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 3 +-
.../ozone/om/ha/GrpcOMFailoverProxyProvider.java | 143 +++++++++++++++
.../ozone/om/ha/OMFailoverProxyProvider.java | 22 +--
.../ozone/om/protocolPB/GrpcOmTransport.java | 196 +++++++++++++++++----
.../ozone/om/protocolPB/TestS3GrpcOmTransport.java | 119 +++++++++++--
.../src/main/compose/ozone-om-ha/docker-config | 1 +
.../src/main/compose/ozonesecure-ha/docker-config | 1 +
.../dist/src/main/compose/ozonesecure-ha/test.sh | 2 +-
.../hadoop/ozone/TestOzoneConfigurationFields.java | 3 +-
.../hadoop/ozone/om/GrpcOzoneManagerServer.java | 20 ++-
.../hadoop/ozone/om/OzoneManagerServiceGrpc.java | 43 +----
.../hadoop/ozone/om/failover/TestOMFailovers.java | 2 +-
14 files changed, 516 insertions(+), 98 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index ffbb3e3..364377d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -228,6 +228,26 @@ public final class HddsUtils {
}
/**
+ * Retrieve a number, trying the supplied config keys in order.
+ * Each config value may be absent.
+ *
+ * @param conf Conf
+ * @param keys a list of configuration key names.
+ *
+ * @return first number found from the given keys, or absent.
+ */
+ public static OptionalInt getNumberFromConfigKeys(
+ ConfigurationSource conf, String... keys) {
+ for (final String key : keys) {
+ final String value = conf.getTrimmed(key);
+ if (value != null) {
+ return OptionalInt.of(Integer.parseInt(value));
+ }
+ }
+ return OptionalInt.empty();
+ }
+
+ /**
* Retrieve the port number, trying the supplied config keys in order.
* Each config value may be absent, or if present in the format
* host:port (the :port part is optional).
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
index fd8aa28..6700101 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
@@ -36,6 +36,8 @@ import static
org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
+
import static org.hamcrest.core.Is.is;
import org.junit.Assert;
import static org.junit.Assert.assertThat;
@@ -216,4 +218,39 @@ public class TestHddsUtils {
}
-}
\ No newline at end of file
+ @Test
+ public void testGetNumberFromConfigKeys() {
+ final String testnum1 = "8";
+ final String testnum2 = "7";
+ final String serviceId = "id1";
+ final String nodeId = "scm1";
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
+ testnum1);
+ Assert.assertTrue(Integer.parseInt(testnum1) ==
+ HddsUtils.getNumberFromConfigKeys(
+ conf,
+ OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+
+ /* Test that the number of the first non-empty key in the list is returned */
+ /* first key is absent */
+ Assert.assertTrue(Integer.parseInt(testnum1) ==
+ HddsUtils.getNumberFromConfigKeys(
+ conf,
+ ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+ serviceId, nodeId),
+ OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+
+ /* now set the previously absent key and ensure its value is returned */
+ conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+ serviceId, nodeId),
+ testnum2);
+ Assert.assertTrue(Integer.parseInt(testnum2) ==
+ HddsUtils.getNumberFromConfigKeys(
+ conf,
+ ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+ serviceId, nodeId),
+ OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+ }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index cdd9e52..6ebd7e1 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -57,7 +57,8 @@ public final class OMConfigKeys {
public static final String OZONE_OM_BIND_HOST_DEFAULT =
"0.0.0.0";
public static final int OZONE_OM_PORT_DEFAULT = 9862;
-
+ public static final String OZONE_OM_GRPC_PORT_KEY =
+ "ozone.om.grpc.port";
public static final String OZONE_OM_HTTP_ENABLED_KEY =
"ozone.om.http.enabled";
public static final String OZONE_OM_HTTP_BIND_HOST_KEY =
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
new file mode 100644
index 0000000..498f935
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.ha;
+
+import org.apache.hadoop.hdds.conf.ConfigurationException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+
+/**
+ * The Grpc s3gateway om transport failover proxy provider implementation
+ * extending the ozone client OM failover proxy provider. This implementation
+ * allows the Grpc OMTransport to reuse OM failover retry policies and
+ * getRetryAction methods. In case of OM failover, client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class GrpcOMFailoverProxyProvider<T> extends
+ OMFailoverProxyProvider<T> {
+
+ private Map<String, String> omAddresses;
+
+ public GrpcOMFailoverProxyProvider(ConfigurationSource configuration,
+ UserGroupInformation ugi,
+ String omServiceId,
+ Class<T> protocol) throws IOException {
+ super(configuration, ugi, omServiceId, protocol);
+ }
+
+ @Override
+ protected void loadOMClientConfigs(ConfigurationSource config, String
omSvcId)
+ throws IOException {
+ // to be used for base class omProxies,
+ // ProxyInfo not applicable for gRPC, just need key set
+ Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>();
+ // to be used for base class omProxyInfos
+ // OMProxyInfo not applicable for gRPC, just need key set
+ Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>();
+ List<String> omNodeIDList = new ArrayList<>();
+ omAddresses = new HashMap<>();
+
+ Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
+
+ for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+ String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+ omSvcId, nodeId);
+
+ Optional<String> hostaddr = getHostNameFromConfigKeys(config,
+ rpcAddrKey);
+
+ OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
+ ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
+ omSvcId, nodeId),
+ OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
+ if (nodeId == null) {
+ nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
+ }
+ omProxiesNodeIdKeyset.put(nodeId, null);
+ omProxyInfosNodeIdKeyset.put(nodeId, null);
+ if (hostaddr.isPresent()) {
+ omAddresses.put(nodeId,
+ hostaddr.get() + ":"
+ + hostport.orElse(config
+ .getObject(GrpcOmTransport
+ .GrpcOmTransportConfig.class)
+ .getPort()));
+ } else {
+ LOG.error("expected host address not defined for: {}", rpcAddrKey);
+ throw new ConfigurationException(rpcAddrKey + "is not defined");
+ }
+ omNodeIDList.add(nodeId);
+ }
+
+ if (omProxiesNodeIdKeyset.isEmpty()) {
+ throw new IllegalArgumentException("Could not find any configured " +
+ "addresses for OM. Please configure the system with "
+ + OZONE_OM_ADDRESS_KEY);
+ }
+
+ // set base class omProxies, omProxyInfos, omNodeIDList
+
+ // omProxies needed in base class
+ // omProxies.size == number of om nodes
+ // omProxies key needs to be valid nodeid
+ // omProxyInfos keyset needed in base class
+ setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList);
+ }
+
+ @Override
+ protected Text computeDelegationTokenService() {
+ return new Text();
+ }
+
+ // throws IOException if nodeId is not present in omAddresses
+ public String getGrpcProxyAddress(String nodeId) throws IOException {
+ if (omAddresses.containsKey(nodeId)) {
+ return omAddresses.get(nodeId);
+ } else {
+ LOG.error("expected nodeId not found in omAddresses for proxyhost {}",
+ nodeId);
+ throw new IOException(
+ "expected nodeId not found in omAddresses for proxyhost");
+ }
+
+ }
+
+ public List<String> getGrpcOmNodeIDList() {
+ return getOmNodeIDList();
+ }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 5432468..9fb690e 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -148,8 +148,6 @@ public class OMFailoverProxyProvider<T> implements
rpcAddrStr);
if (omProxyInfo.getAddress() != null) {
-
-
// For a non-HA OM setup, nodeId might be null. If so, we assign it
// the default value
if (nodeId == null) {
@@ -551,14 +549,18 @@ public class OMFailoverProxyProvider<T> implements
return null;
}
- @VisibleForTesting
- protected void setProxiesForTesting(
- Map<String, ProxyInfo<T>> testOMProxies,
- Map<String, OMProxyInfo> testOMProxyInfos,
- List<String> testOMNodeIDList) {
- this.omProxies = testOMProxies;
- this.omProxyInfos = testOMProxyInfos;
- this.omNodeIDList = testOMNodeIDList;
+ protected void setProxies(
+ Map<String, ProxyInfo<T>> setOMProxies,
+ Map<String, OMProxyInfo> setOMProxyInfos,
+ List<String> setOMNodeIDList) {
+ this.omProxies = setOMProxies;
+ this.omProxyInfos = setOMProxyInfos;
+ this.omNodeIDList = setOMNodeIDList;
}
+
+ protected List<String> getOmNodeIDList() {
+ return omNodeIDList;
+ }
+
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
index 3607429..72c29f0 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -18,22 +18,34 @@
package org.apache.hadoop.ozone.om.protocolPB;
import java.io.IOException;
-import java.util.Optional;
+import java.lang.reflect.Constructor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import com.google.common.net.HostAndPort;
import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import org.apache.hadoop.ipc.RemoteException;
+
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.ozone.om.ha.GrpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
@@ -42,12 +54,10 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys
.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
import static org.apache.hadoop.ozone.om.OMConfigKeys
.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
-import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
/**
* Grpc transport for grpc between s3g and om.
@@ -60,60 +70,171 @@ public class GrpcOmTransport implements OmTransport {
private final AtomicBoolean isRunning = new AtomicBoolean(false);
// gRPC specific
- private ManagedChannel channel;
-
private OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub client;
+ private Map<String,
+ OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients;
+ private Map<String, ManagedChannel> channels;
+ private int lastVisited = -1;
+ private ConfigurationSource conf;
- private String host = "om";
- private int port = 8981;
+ //private String host = "om";
+ private AtomicReference<String> host;
private int maxSize;
+ private List<String> oms;
+ private RetryPolicy retryPolicy;
+ private int failoverCount = 0;
+ private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
+ omFailoverProxyProvider;
+
public GrpcOmTransport(ConfigurationSource conf,
UserGroupInformation ugi, String omServiceId)
throws IOException {
- Optional<String> omHost = getHostNameFromConfigKeys(conf,
- OZONE_OM_ADDRESS_KEY);
- this.host = omHost.orElse("0.0.0.0");
- port = conf.getObject(GrpcOmTransportConfig.class).getPort();
+ this.channels = new HashMap<>();
+ this.clients = new HashMap<>();
+ this.conf = conf;
+ this.host = new AtomicReference();
maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
+ omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
+ conf,
+ ugi,
+ omServiceId,
+ OzoneManagerProtocolPB.class);
+
start();
}
- public void start() {
+ public void start() throws IOException {
+ host.set(omFailoverProxyProvider
+ .getGrpcProxyAddress(
+ omFailoverProxyProvider.getCurrentProxyOMNodeId()));
+
if (!isRunning.compareAndSet(false, true)) {
LOG.info("Ignore. already started.");
return;
}
- NettyChannelBuilder channelBuilder =
- NettyChannelBuilder.forAddress(host, port)
- .usePlaintext()
- .maxInboundMessageSize(maxSize);
- channel = channelBuilder.build();
- client = OzoneManagerServiceGrpc.newBlockingStub(channel);
+ List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
+ for (String nodeId : nodes) {
+ String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
+ HostAndPort hp = HostAndPort.fromString(hostaddr);
+
+ NettyChannelBuilder channelBuilder =
+ NettyChannelBuilder.forAddress(hp.getHost(), hp.getPort())
+ .usePlaintext()
+ .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+ channels.put(hostaddr, channelBuilder.build());
+ clients.put(hostaddr,
+ OzoneManagerServiceGrpc
+ .newBlockingStub(channels.get(hostaddr)));
+ }
+ int maxFailovers = conf.getInt(
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+
+ retryPolicy = omFailoverProxyProvider.getRetryPolicy(maxFailovers);
LOG.info("{}: started", CLIENT_NAME);
}
@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
OMResponse resp = null;
- try {
- resp = client.submitRequest(payload);
- } catch (io.grpc.StatusRuntimeException e) {
- ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
- if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
- resultCode = ResultCodes.TIMEOUT;
+ boolean tryOtherHost = true;
+ ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
+ while (tryOtherHost) {
+ tryOtherHost = false;
+ try {
+ resp = clients.get(host.get()).submitRequest(payload);
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
+ resultCode = ResultCodes.TIMEOUT;
+ }
+ Exception exp = new Exception(e);
+ tryOtherHost = shouldRetry(unwrapException(exp));
+ if (!tryOtherHost) {
+ throw new OMException(resultCode);
+ }
}
- throw new OMException(e.getCause(), resultCode);
}
return resp;
}
+ private Exception unwrapException(Exception ex) {
+ Exception grpcException = null;
+ try {
+ StatusRuntimeException srexp =
+ (StatusRuntimeException)ex.getCause();
+ Status status = srexp.getStatus();
+ LOG.debug("GRPC exception wrapped: {}", status.getDescription());
+ if (status.getCode() == Status.Code.INTERNAL) {
+ // exception potentially generated by OzoneManagerServiceGrpc
+ Class<?> realClass = Class.forName(status.getDescription()
+ .substring(0, status.getDescription()
+ .indexOf(":")));
+ Class<? extends Exception> cls = realClass
+ .asSubclass(Exception.class);
+ Constructor<? extends Exception> cn = cls.getConstructor(String.class);
+ cn.setAccessible(true);
+ grpcException = cn.newInstance(status.getDescription());
+ IOException remote = null;
+ try {
+ String cause = status.getDescription();
+ cause = cause.substring(cause.indexOf(":") + 2);
+ remote = new RemoteException(cause.substring(0, cause.indexOf(":")),
+ cause.substring(cause.indexOf(":") + 1));
+ grpcException.initCause(remote);
+ } catch (Exception e) {
+ LOG.error("cannot get cause for remote exception");
+ }
+ } else {
+ // exception generated by connection failure, gRPC
+ grpcException = ex;
+ }
+ } catch (Exception e) {
+ grpcException = new IOException(e);
+ LOG.error("error unwrapping exception from OMResponse {}");
+ }
+ return grpcException;
+ }
+
+ private boolean shouldRetry(Exception ex) {
+ boolean retry = false;
+ RetryPolicy.RetryAction action = null;
+ try {
+ action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++,
true);
+ LOG.debug("grpc failover retry action {}", action.action);
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+ retry = false;
+ LOG.error("Retry request failed. " + action.reason, ex);
+ } else {
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY ||
+ (action.action == RetryPolicy.RetryAction.RetryDecision
+ .FAILOVER_AND_RETRY)) {
+ if (action.delayMillis > 0) {
+ try {
+ Thread.sleep(action.delayMillis);
+ } catch (Exception e) {
+ LOG.error("Error trying sleep thread for {}",
action.delayMillis);
+ }
+ }
+ // switch om host to current proxy OMNodeId
+ host.set(omFailoverProxyProvider
+ .getGrpcProxyAddress(
+ omFailoverProxyProvider.getCurrentProxyOMNodeId()));
+ retry = true;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Failed failover exception {}", e);
+ }
+ return retry;
+ }
+
// stub implementation for interface
@Override
public Text getDelegationTokenService() {
@@ -121,11 +242,15 @@ public class GrpcOmTransport implements OmTransport {
}
public void shutdown() {
- channel.shutdown();
- try {
- channel.awaitTermination(5, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error("failed to shutdown OzoneManagerServiceGrpc channel", e);
+ for (Map.Entry<String, ManagedChannel> entry : channels.entrySet()) {
+ ManagedChannel channel = entry.getValue();
+ channel.shutdown();
+ try {
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("failed to shutdown OzoneManagerServiceGrpc channel {} : {}",
+ entry.getKey(), e);
+ }
}
}
@@ -156,9 +281,16 @@ public class GrpcOmTransport implements OmTransport {
}
@VisibleForTesting
- public void startClient(ManagedChannel testChannel) {
- client = OzoneManagerServiceGrpc.newBlockingStub(testChannel);
+ public void startClient(ManagedChannel testChannel) throws IOException {
+ List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
+ for (String nodeId : nodes) {
+ String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
+ clients.put(hostaddr,
+ OzoneManagerServiceGrpc
+ .newBlockingStub(testChannel));
+ }
LOG.info("{}: started", CLIENT_NAME);
}
+
}
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
index 323bb0e..b427db5 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -25,25 +25,29 @@ import static org.mockito.Mockito.mock;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.ManagedChannel;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
import org.apache.hadoop.security.UserGroupInformation;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
-import io.grpc.ManagedChannel;
+import com.google.protobuf.ServiceException;
+import org.apache.ratis.protocol.RaftPeerId;
-import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+import static org.junit.Assert.fail;
/**
* Tests for GrpcOmTransport client.
@@ -59,11 +63,32 @@ public class TestS3GrpcOmTransport {
private final OMResponse omResponse = OMResponse.newBuilder()
.setSuccess(true)
- .setStatus(Status.OK)
+ .setStatus(org.apache.hadoop.ozone.protocol
+ .proto.OzoneManagerProtocolProtos.Status.OK)
.setLeaderOMNodeId(leaderOMNodeId)
.setCmdType(Type.AllocateBlock)
.build();
+ private boolean doFailover = false;
+
+ private OzoneConfiguration conf;
+
+ private String omServiceId;
+ private UserGroupInformation ugi;
+ private ManagedChannel channel;
+
+
+ private ServiceException createNotLeaderException() {
+ RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId("testNodeId");
+
+ // TODO: Set suggest leaderID. Right now, client is not using suggest
+ // leaderID. Need to fix this.
+ OMNotLeaderException notLeaderException =
+ new OMNotLeaderException(raftPeerId);
+ LOG.debug(notLeaderException.getMessage());
+ return new ServiceException(notLeaderException);
+ }
+
private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase
serviceImpl =
mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
@@ -78,10 +103,22 @@ public class TestS3GrpcOmTransport {
.OzoneManagerProtocolProtos
.OMResponse>
responseObserver) {
- responseObserver.onNext(omResponse);
- responseObserver.onCompleted();
+ try {
+ if (doFailover) {
+ doFailover = false;
+ throw createNotLeaderException();
+ } else {
+ responseObserver.onNext(omResponse);
+ responseObserver.onCompleted();
+ }
+ } catch (Throwable e) {
+ IOException ex = new IOException(e.getCause());
+ responseObserver.onError(io.grpc.Status
+ .INTERNAL
+ .withDescription(ex.getMessage())
+ .asRuntimeException());
+ }
}
-
}));
private GrpcOmTransport client;
@@ -101,18 +138,37 @@ public class TestS3GrpcOmTransport {
.start());
// Create a client channel and register for automatic graceful shutdown.
- ManagedChannel channel = grpcCleanup.register(
+ channel = grpcCleanup.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
- String omServiceId = "";
- OzoneConfiguration conf = new OzoneConfiguration();
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ omServiceId = "";
+ conf = new OzoneConfiguration();
+ ugi = UserGroupInformation.getCurrentUser();
+ doFailover = false;
+ }
+
+ @Test
+ public void testSubmitRequestToServer() throws Exception {
+ ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+ final OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(Type.ServiceList)
+ .setVersion(CURRENT_VERSION)
+ .setClientId("test")
+ .setServiceListRequest(req)
+ .build();
+
client = new GrpcOmTransport(conf, ugi, omServiceId);
client.startClient(channel);
+
+ final OMResponse resp = client.submitRequest(omRequest);
+ Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
+ .proto.OzoneManagerProtocolProtos.Status.OK);
+ Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
}
@Test
- public void testSubmitRequestToServer() throws Exception {
+ public void testGrpcFailoverProxy() throws Exception {
ServiceListRequest req = ServiceListRequest.newBuilder().build();
final OMRequest omRequest = OMRequest.newBuilder()
@@ -122,8 +178,45 @@ public class TestS3GrpcOmTransport {
.setServiceListRequest(req)
.build();
+ client = new GrpcOmTransport(conf, ugi, omServiceId);
+ client.startClient(channel);
+
+ doFailover = true;
+ // first invocation generates an OMNotLeaderException
+ // failover is performed and request is internally retried
+ // second invocation request to server succeeds
final OMResponse resp = client.submitRequest(omRequest);
- Assert.assertEquals(resp.getStatus(), OK);
+ Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
+ .proto.OzoneManagerProtocolProtos.Status.OK);
Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
}
+
+ @Test
+ public void testGrpcFailoverProxyExhaustRetry() throws Exception {
+ ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+ final OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(Type.ServiceList)
+ .setVersion(CURRENT_VERSION)
+ .setClientId("test")
+ .setServiceListRequest(req)
+ .build();
+
+ conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 0);
+ client = new GrpcOmTransport(conf, ugi, omServiceId);
+ client.startClient(channel);
+
+ doFailover = true;
+ // first invocation generates an OMNotLeaderException
+ // max failover attempts is configured as 0 for this test, so
+ // the OM failover retry policy returns FAIL instead of retrying
+ // and submitRequest is expected to throw
+
+ try {
+ final OMResponse resp = client.submitRequest(omRequest);
+ fail();
+ } catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+ }
}
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
index 69f4e52..4642680 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
@@ -36,6 +36,7 @@ OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true
OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
HDFS-SITE.XML_rpc.metrics.quantile.enable=true
HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
ASYNC_PROFILER_HOME=/opt/profiler
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index 498d02e..be93d0a 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -51,6 +51,7 @@ OZONE-SITE.XML_hdds.grpc.tls.enabled=true
OZONE-SITE.XML_ozone.replication=3
OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
OZONE-SITE.XML_ozone.recon.om.snapshot.task.interval.delay=1m
OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
index 7410822..252f953 100755
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
@@ -35,7 +35,7 @@ execute_robot_test ${SCM} freon
execute_robot_test ${SCM} basic/links.robot
-#execute_robot_test ${SCM} s3
+execute_robot_test ${SCM} s3
execute_robot_test ${SCM} admincli
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 3269c39..1c772cf 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -111,7 +111,8 @@ public class TestOzoneConfigurationFields extends
TestConfigurationFieldsBase {
ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM,
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
OMConfigKeys.OZONE_OM_HA_PREFIX,
- OMConfigKeys.OZONE_OM_TRANSPORT_CLASS
+ OMConfigKeys.OZONE_OM_TRANSPORT_CLASS,
+ OMConfigKeys.OZONE_OM_GRPC_PORT_KEY
// TODO HDDS-2856
));
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
index 60942f9..7fe338c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
@@ -18,13 +18,16 @@
package org.apache.hadoop.ozone.om;
import java.io.IOException;
+import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
import
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
import io.grpc.Server;
@@ -47,9 +50,20 @@ public class GrpcOzoneManagerServer {
omTranslator,
OzoneDelegationTokenSecretManager
delegationTokenMgr) {
- this.port = config.getObject(
- GrpcOzoneManagerServerConfig.class).
- getPort();
+ OptionalInt haPort = HddsUtils.getNumberFromConfigKeys(config,
+ ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
+ config.get(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY),
+ config.get(OMConfigKeys.OZONE_OM_NODE_ID_KEY)),
+ OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
+ if (haPort.isPresent()) {
+ this.port = haPort.getAsInt();
+ } else {
+ this.port = config.getObject(
+ GrpcOzoneManagerServerConfig.class).
+ getPort();
+ }
+
init(omTranslator,
delegationTokenMgr,
config);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
index de11608..a88e259 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.ozone.om;
+import io.grpc.Status;
import com.google.protobuf.RpcController;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc.OzoneManagerServiceImplBase;
import
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.protocol.proto
@@ -68,7 +68,6 @@ public class OzoneManagerServiceGrpc extends
OzoneManagerServiceImplBase {
"processing s3g client submit request - for command {}",
request.getCmdType().name());
AtomicInteger callCount = new AtomicInteger(0);
- OMResponse omResponse = null;
org.apache.hadoop.ipc.Server.getCurCall().set(new Server.Call(1,
callCount.incrementAndGet(),
@@ -84,42 +83,16 @@ public class OzoneManagerServiceGrpc extends
OzoneManagerServiceImplBase {
// for OMRequests. Test through successful ratis-enabled OMRequest
// handling without dependency on hadoop IPC based Server.
try {
- omResponse = this.omTranslator.
+ OMResponse omResponse = this.omTranslator.
submitRequest(NULL_RPC_CONTROLLER, request);
+ responseObserver.onNext(omResponse);
} catch (Throwable e) {
- IOException ioe = null;
- Throwable se = e.getCause();
- if (se == null) {
- ioe = new IOException(e);
- } else {
- ioe = se instanceof IOException ?
- (IOException) se : new IOException(e);
- }
- omResponse = createErrorResponse(
- request,
- ioe);
+ IOException ex = new IOException(e.getCause());
+ responseObserver.onError(Status
+ .INTERNAL
+ .withDescription(ex.getMessage())
+ .asRuntimeException());
}
- responseObserver.onNext(omResponse);
responseObserver.onCompleted();
}
-
- /**
- * Create OMResponse from the specified OMRequest and exception.
- *
- * @param omRequest
- * @param exception
- * @return OMResponse
- */
- private OMResponse createErrorResponse(
- OMRequest omRequest, IOException exception) {
- OMResponse.Builder omResponse = OMResponse.newBuilder()
- .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
- .setCmdType(omRequest.getCmdType())
- .setTraceID(omRequest.getTraceID())
- .setSuccess(false);
- if (exception.getMessage() != null) {
- omResponse.setMessage(exception.getMessage());
- }
- return omResponse.build();
- }
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index 0160166..fe7f6f4 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -143,7 +143,7 @@ public class TestOMFailovers {
omProxyInfos.put(nodeId, null);
omNodeIDList.add(nodeId);
}
- setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList);
+ setProxies(omProxies, omProxyInfos, omNodeIDList);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]