This is an automated email from the ASF dual-hosted git repository. adoroszlai pushed a commit to branch HDDS-4440-s3-performance in repository https://gitbox.apache.org/repos/asf/ozone.git
commit d57616e165a6500e626d82159fefb95907db808d Author: Doroszlai, Attila <[email protected]> AuthorDate: Mon Apr 11 19:12:00 2022 +0200 Revert "HDDS-5544. Update GRPC OmTransport implementation for HA (#2901)" This reverts commit 413d4aade25e77b444fcbbea36a3302cd2a5dc66. --- .../java/org/apache/hadoop/hdds/HddsUtils.java | 20 --- .../java/org/apache/hadoop/hdds/TestHddsUtils.java | 39 +--- .../org/apache/hadoop/ozone/om/OMConfigKeys.java | 3 +- .../ozone/om/ha/GrpcOMFailoverProxyProvider.java | 143 --------------- .../ozone/om/ha/OMFailoverProxyProvider.java | 22 ++- .../ozone/om/protocolPB/GrpcOmTransport.java | 196 ++++----------------- .../ozone/om/protocolPB/TestS3GrpcOmTransport.java | 119 ++----------- .../src/main/compose/ozone-om-ha/docker-config | 1 - .../src/main/compose/ozonesecure-ha/docker-config | 1 - .../dist/src/main/compose/ozonesecure-ha/test.sh | 2 +- .../hadoop/ozone/TestOzoneConfigurationFields.java | 3 +- .../hadoop/ozone/om/GrpcOzoneManagerServer.java | 20 +-- .../hadoop/ozone/om/OzoneManagerServiceGrpc.java | 43 ++++- .../hadoop/ozone/om/failover/TestOMFailovers.java | 2 +- 14 files changed, 98 insertions(+), 516 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index 364377d396..ffbb3e3340 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -227,26 +227,6 @@ public final class HddsUtils { } } - /** - * Retrieve a number, trying the supplied config keys in order. - * Each config value may be absent - * - * @param conf Conf - * @param keys a list of configuration key names. - * - * @return first number found from the given keys, or absent. - */ - public static OptionalInt getNumberFromConfigKeys( - ConfigurationSource conf, String... keys) { - for (final String key : keys) { - final String value = conf.getTrimmed(key); - if (value != null) { - return OptionalInt.of(Integer.parseInt(value)); - } - } - return OptionalInt.empty(); - } - /** * Retrieve the port number, trying the supplied config keys in order. * Each config value may be absent, or if present in the format diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java index 67001010d5..fd8aa28e63 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java @@ -36,8 +36,6 @@ import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT; - import static org.hamcrest.core.Is.is; import org.junit.Assert; import static org.junit.Assert.assertThat; @@ -218,39 +216,4 @@ public class TestHddsUtils { } - @Test - public void testGetNumberFromConfigKeys() { - final String testnum1 = "8"; - final String testnum2 = "7"; - final String serviceId = "id1"; - final String nodeId = "scm1"; - - OzoneConfiguration conf = new OzoneConfiguration(); - conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, - testnum1); - Assert.assertTrue(Integer.parseInt(testnum1) == - HddsUtils.getNumberFromConfigKeys( - conf, - OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0)); - - /* Test to return first unempty key number from list */ - /* first key is absent */ - Assert.assertTrue(Integer.parseInt(testnum1) == - HddsUtils.getNumberFromConfigKeys( - conf, - ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY, - serviceId, nodeId), - OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0)); - - /* now set the empty key and ensure returned value from this key */ - conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY, - serviceId, nodeId), - testnum2); - Assert.assertTrue(Integer.parseInt(testnum2) == - HddsUtils.getNumberFromConfigKeys( - conf, - ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY, - serviceId, nodeId), - OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0)); - } -} +} \ No newline at end of file diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 6ebd7e11ad..cdd9e52667 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -57,8 +57,7 @@ public final class OMConfigKeys { public static final String OZONE_OM_BIND_HOST_DEFAULT = "0.0.0.0"; public static final int OZONE_OM_PORT_DEFAULT = 9862; - public static final String OZONE_OM_GRPC_PORT_KEY = - "ozone.om.grpc.port"; + public static final String OZONE_OM_HTTP_ENABLED_KEY = "ozone.om.http.enabled"; public static final String OZONE_OM_HTTP_BIND_HOST_KEY = diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java deleted file mode 100644 index 498f935974..0000000000 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.om.ha; - -import org.apache.hadoop.hdds.conf.ConfigurationException; -import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.HddsUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ozone.OmUtils; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.ha.ConfUtils; -import org.apache.hadoop.ozone.om.OMConfigKeys; -import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport; -import org.apache.hadoop.security.UserGroupInformation; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.OptionalInt; - -import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; - -/** - * The Grpc s3gateway om transport failover proxy provider implementation - * extending the ozone client OM failover proxy provider. This implmentation - * allows the Grpc OMTransport reuse OM failover retry policies and - * getRetryAction methods. In case of OM failover, client can try - * connecting to another OM node from the list of proxies. - */ -public class GrpcOMFailoverProxyProvider<T> extends - OMFailoverProxyProvider<T> { - - private Map<String, String> omAddresses; - - public GrpcOMFailoverProxyProvider(ConfigurationSource configuration, - UserGroupInformation ugi, - String omServiceId, - Class<T> protocol) throws IOException { - super(configuration, ugi, omServiceId, protocol); - } - - @Override - protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId) - throws IOException { - // to be used for base class omProxies, - // ProxyInfo not applicable for gRPC, just need key set - Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>(); - // to be used for base class omProxyInfos - // OMProxyInfo not applicable for gRPC, just need key set - Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>(); - List<String> omNodeIDList = new ArrayList<>(); - omAddresses = new HashMap<>(); - - Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId); - - for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) { - - String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, - omSvcId, nodeId); - - Optional<String> hostaddr = getHostNameFromConfigKeys(config, - rpcAddrKey); - - OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config, - ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY, - omSvcId, nodeId), - OMConfigKeys.OZONE_OM_GRPC_PORT_KEY); - if (nodeId == null) { - nodeId = OzoneConsts.OM_DEFAULT_NODE_ID; - } - omProxiesNodeIdKeyset.put(nodeId, null); - omProxyInfosNodeIdKeyset.put(nodeId, null); - if (hostaddr.isPresent()) { - omAddresses.put(nodeId, - hostaddr.get() + ":" - + hostport.orElse(config - .getObject(GrpcOmTransport - .GrpcOmTransportConfig.class) - .getPort())); - } else { - LOG.error("expected host address not defined for: {}", rpcAddrKey); - throw new ConfigurationException(rpcAddrKey + "is not defined"); - } - omNodeIDList.add(nodeId); - } - - if (omProxiesNodeIdKeyset.isEmpty()) { - throw new IllegalArgumentException("Could not find any configured " + - "addresses for OM. Please configure the system with " - + OZONE_OM_ADDRESS_KEY); - } - - // set base class omProxies, omProxyInfos, omNodeIDList - - // omProxies needed in base class - // omProxies.size == number of om nodes - // omProxies key needs to be valid nodeid - // omProxyInfos keyset needed in base class - setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList); - } - - @Override - protected Text computeDelegationTokenService() { - return new Text(); - } - - // need to throw if nodeID not in omAddresses - public String getGrpcProxyAddress(String nodeId) throws IOException { - if (omAddresses.containsKey(nodeId)) { - return omAddresses.get(nodeId); - } else { - LOG.error("expected nodeId not found in omAddresses for proxyhost {}", - nodeId); - throw new IOException( - "expected nodeId not found in omAddresses for proxyhost"); - } - - } - - public List<String> getGrpcOmNodeIDList() { - return getOmNodeIDList(); - } -} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java index 9fb690e760..5432468452 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java @@ -148,6 +148,8 @@ public class OMFailoverProxyProvider<T> implements rpcAddrStr); if (omProxyInfo.getAddress() != null) { + + // For a non-HA OM setup, nodeId might be null. If so, we assign it // the default value if (nodeId == null) { @@ -549,18 +551,14 @@ public class OMFailoverProxyProvider<T> implements return null; } - protected void setProxies( - Map<String, ProxyInfo<T>> setOMProxies, - Map<String, OMProxyInfo> setOMProxyInfos, - List<String> setOMNodeIDList) { - this.omProxies = setOMProxies; - this.omProxyInfos = setOMProxyInfos; - this.omNodeIDList = setOMNodeIDList; - } - - protected List<String> getOmNodeIDList() { - return omNodeIDList; + @VisibleForTesting + protected void setProxiesForTesting( + Map<String, ProxyInfo<T>> testOMProxies, + Map<String, OMProxyInfo> testOMProxyInfos, + List<String> testOMNodeIDList) { + this.omProxies = testOMProxies; + this.omProxyInfos = testOMProxyInfos; + this.omNodeIDList = testOMNodeIDList; } - } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java index 72c29f0cc6..3607429e52 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java @@ -18,34 +18,22 @@ package org.apache.hadoop.ozone.om.protocolPB; import java.io.IOException; -import java.lang.reflect.Constructor; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import com.google.common.net.HostAndPort; import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import org.apache.hadoop.ipc.RemoteException; - import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.ozone.om.ha.GrpcOMFailoverProxyProvider; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc; import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; @@ -54,10 +42,12 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH; import static org.apache.hadoop.ozone.om.OMConfigKeys .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT; +import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys; /** * Grpc transport for grpc between s3g and om. @@ -70,169 +60,58 @@ public class GrpcOmTransport implements OmTransport { private final AtomicBoolean isRunning = new AtomicBoolean(false); // gRPC specific + private ManagedChannel channel; + private OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub client; - private Map<String, - OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients; - private Map<String, ManagedChannel> channels; - private int lastVisited = -1; - private ConfigurationSource conf; - //private String host = "om"; - private AtomicReference<String> host; + private String host = "om"; + private int port = 8981; private int maxSize; - private List<String> oms; - private RetryPolicy retryPolicy; - private int failoverCount = 0; - private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB> - omFailoverProxyProvider; - public GrpcOmTransport(ConfigurationSource conf, UserGroupInformation ugi, String omServiceId) throws IOException { + Optional<String> omHost = getHostNameFromConfigKeys(conf, + OZONE_OM_ADDRESS_KEY); + this.host = omHost.orElse("0.0.0.0"); - this.channels = new HashMap<>(); - this.clients = new HashMap<>(); - this.conf = conf; - this.host = new AtomicReference(); + port = conf.getObject(GrpcOmTransportConfig.class).getPort(); maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH, OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT); - omFailoverProxyProvider = new GrpcOMFailoverProxyProvider( - conf, - ugi, - omServiceId, - OzoneManagerProtocolPB.class); - start(); } - public void start() throws IOException { - host.set(omFailoverProxyProvider - .getGrpcProxyAddress( - omFailoverProxyProvider.getCurrentProxyOMNodeId())); - + public void start() { if (!isRunning.compareAndSet(false, true)) { LOG.info("Ignore. already started."); return; } + NettyChannelBuilder channelBuilder = + NettyChannelBuilder.forAddress(host, port) + .usePlaintext() + .maxInboundMessageSize(maxSize); - List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList(); - for (String nodeId : nodes) { - String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId); - HostAndPort hp = HostAndPort.fromString(hostaddr); - - NettyChannelBuilder channelBuilder = - NettyChannelBuilder.forAddress(hp.getHost(), hp.getPort()) - .usePlaintext() - .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE); - channels.put(hostaddr, channelBuilder.build()); - clients.put(hostaddr, - OzoneManagerServiceGrpc - .newBlockingStub(channels.get(hostaddr))); - } - int maxFailovers = conf.getInt( - OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, - OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); + channel = channelBuilder.build(); + client = OzoneManagerServiceGrpc.newBlockingStub(channel); - - retryPolicy = omFailoverProxyProvider.getRetryPolicy(maxFailovers); LOG.info("{}: started", CLIENT_NAME); } @Override public OMResponse submitRequest(OMRequest payload) throws IOException { OMResponse resp = null; - boolean tryOtherHost = true; - ResultCodes resultCode = ResultCodes.INTERNAL_ERROR; - while (tryOtherHost) { - tryOtherHost = false; - try { - resp = clients.get(host.get()).submitRequest(payload); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - resultCode = ResultCodes.TIMEOUT; - } - Exception exp = new Exception(e); - tryOtherHost = shouldRetry(unwrapException(exp)); - if (!tryOtherHost) { - throw new OMException(resultCode); - } - } - } - return resp; - } - - private Exception unwrapException(Exception ex) { - Exception grpcException = null; try { - StatusRuntimeException srexp = - (StatusRuntimeException)ex.getCause(); - Status status = srexp.getStatus(); - LOG.debug("GRPC exception wrapped: {}", status.getDescription()); - if (status.getCode() == Status.Code.INTERNAL) { - // exception potentially generated by OzoneManagerServiceGrpc - Class<?> realClass = Class.forName(status.getDescription() - .substring(0, status.getDescription() - .indexOf(":"))); - Class<? extends Exception> cls = realClass - .asSubclass(Exception.class); - Constructor<? extends Exception> cn = cls.getConstructor(String.class); - cn.setAccessible(true); - grpcException = cn.newInstance(status.getDescription()); - IOException remote = null; - try { - String cause = status.getDescription(); - cause = cause.substring(cause.indexOf(":") + 2); - remote = new RemoteException(cause.substring(0, cause.indexOf(":")), - cause.substring(cause.indexOf(":") + 1)); - grpcException.initCause(remote); - } catch (Exception e) { - LOG.error("cannot get cause for remote exception"); - } - } else { - // exception generated by connection failure, gRPC - grpcException = ex; + resp = client.submitRequest(payload); + } catch (io.grpc.StatusRuntimeException e) { + ResultCodes resultCode = ResultCodes.INTERNAL_ERROR; + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + resultCode = ResultCodes.TIMEOUT; } - } catch (Exception e) { - grpcException = new IOException(e); - LOG.error("error unwrapping exception from OMResponse {}"); - } - return grpcException; - } - - private boolean shouldRetry(Exception ex) { - boolean retry = false; - RetryPolicy.RetryAction action = null; - try { - action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++, true); - LOG.debug("grpc failover retry action {}", action.action); - if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { - retry = false; - LOG.error("Retry request failed. " + action.reason, ex); - } else { - if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY || - (action.action == RetryPolicy.RetryAction.RetryDecision - .FAILOVER_AND_RETRY)) { - if (action.delayMillis > 0) { - try { - Thread.sleep(action.delayMillis); - } catch (Exception e) { - LOG.error("Error trying sleep thread for {}", action.delayMillis); - } - } - // switch om host to current proxy OMNodeId - host.set(omFailoverProxyProvider - .getGrpcProxyAddress( - omFailoverProxyProvider.getCurrentProxyOMNodeId())); - retry = true; - } - } - } catch (Exception e) { - LOG.error("Failed failover exception {}", e); + throw new OMException(e.getCause(), resultCode); } - return retry; + return resp; } // stub implementation for interface @@ -242,15 +121,11 @@ public class GrpcOmTransport implements OmTransport { } public void shutdown() { - for (Map.Entry<String, ManagedChannel> entry : channels.entrySet()) { - ManagedChannel channel = entry.getValue(); - channel.shutdown(); - try { - channel.awaitTermination(5, TimeUnit.SECONDS); - } catch (Exception e) { - LOG.error("failed to shutdown OzoneManagerServiceGrpc channel {} : {}", - entry.getKey(), e); - } + channel.shutdown(); + try { + channel.awaitTermination(5, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("failed to shutdown OzoneManagerServiceGrpc channel", e); } } @@ -281,16 +156,9 @@ public class GrpcOmTransport implements OmTransport { } @VisibleForTesting - public void startClient(ManagedChannel testChannel) throws IOException { - List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList(); - for (String nodeId : nodes) { - String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId); + public void startClient(ManagedChannel testChannel) { + client = OzoneManagerServiceGrpc.newBlockingStub(testChannel); - clients.put(hostaddr, - OzoneManagerServiceGrpc - .newBlockingStub(testChannel)); - } LOG.info("{}: started", CLIENT_NAME); } - } diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java index b427db5562..323bb0eeb3 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java @@ -25,29 +25,25 @@ import static org.mockito.Mockito.mock; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.testing.GrpcCleanupRule; -import io.grpc.ManagedChannel; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import com.google.protobuf.ServiceException; -import org.apache.ratis.protocol.RaftPeerId; +import io.grpc.ManagedChannel; -import static org.junit.Assert.fail; +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK; /** * Tests for GrpcOmTransport client. @@ -63,32 +59,11 @@ public class TestS3GrpcOmTransport { private final OMResponse omResponse = OMResponse.newBuilder() .setSuccess(true) - .setStatus(org.apache.hadoop.ozone.protocol - .proto.OzoneManagerProtocolProtos.Status.OK) + .setStatus(Status.OK) .setLeaderOMNodeId(leaderOMNodeId) .setCmdType(Type.AllocateBlock) .build(); - private boolean doFailover = false; - - private OzoneConfiguration conf; - - private String omServiceId; - private UserGroupInformation ugi; - private ManagedChannel channel; - - - private ServiceException createNotLeaderException() { - RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId("testNodeId"); - - // TODO: Set suggest leaderID. Right now, client is not using suggest - // leaderID. Need to fix this. - OMNotLeaderException notLeaderException = - new OMNotLeaderException(raftPeerId); - LOG.debug(notLeaderException.getMessage()); - return new ServiceException(notLeaderException); - } - private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase serviceImpl = mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class, @@ -103,22 +78,10 @@ public class TestS3GrpcOmTransport { .OzoneManagerProtocolProtos .OMResponse> responseObserver) { - try { - if (doFailover) { - doFailover = false; - throw createNotLeaderException(); - } else { - responseObserver.onNext(omResponse); - responseObserver.onCompleted(); - } - } catch (Throwable e) { - IOException ex = new IOException(e.getCause()); - responseObserver.onError(io.grpc.Status - .INTERNAL - .withDescription(ex.getMessage()) - .asRuntimeException()); - } + responseObserver.onNext(omResponse); + responseObserver.onCompleted(); } + })); private GrpcOmTransport client; @@ -138,37 +101,18 @@ public class TestS3GrpcOmTransport { .start()); // Create a client channel and register for automatic graceful shutdown. - channel = grpcCleanup.register( + ManagedChannel channel = grpcCleanup.register( InProcessChannelBuilder.forName(serverName).directExecutor().build()); - omServiceId = ""; - conf = new OzoneConfiguration(); - ugi = UserGroupInformation.getCurrentUser(); - doFailover = false; - } - - @Test - public void testSubmitRequestToServer() throws Exception { - ServiceListRequest req = ServiceListRequest.newBuilder().build(); - - final OMRequest omRequest = OMRequest.newBuilder() - .setCmdType(Type.ServiceList) - .setVersion(CURRENT_VERSION) - .setClientId("test") - .setServiceListRequest(req) - .build(); - + String omServiceId = ""; + OzoneConfiguration conf = new OzoneConfiguration(); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); client = new GrpcOmTransport(conf, ugi, omServiceId); client.startClient(channel); - - final OMResponse resp = client.submitRequest(omRequest); - Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol - .proto.OzoneManagerProtocolProtos.Status.OK); - Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId); } @Test - public void testGrpcFailoverProxy() throws Exception { + public void testSubmitRequestToServer() throws Exception { ServiceListRequest req = ServiceListRequest.newBuilder().build(); final OMRequest omRequest = OMRequest.newBuilder() @@ -178,45 +122,8 @@ public class TestS3GrpcOmTransport { .setServiceListRequest(req) .build(); - client = new GrpcOmTransport(conf, ugi, omServiceId); - client.startClient(channel); - - doFailover = true; - // first invocation generates a NotALeaderException - // failover is performed and request is internally retried - // second invocation request to server succeeds final OMResponse resp = client.submitRequest(omRequest); - Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol - .proto.OzoneManagerProtocolProtos.Status.OK); + Assert.assertEquals(resp.getStatus(), OK); Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId); } - - @Test - public void testGrpcFailoverProxyExhaustRetry() throws Exception { - ServiceListRequest req = ServiceListRequest.newBuilder().build(); - - final OMRequest omRequest = OMRequest.newBuilder() - .setCmdType(Type.ServiceList) - .setVersion(CURRENT_VERSION) - .setClientId("test") - .setServiceListRequest(req) - .build(); - - conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 0); - client = new GrpcOmTransport(conf, ugi, omServiceId); - client.startClient(channel); - - doFailover = true; - // first invocation generates a NotALeaderException - // failover is performed and request is internally retried - // OMFailoverProvider returns Fail retry due to #attempts > - // max failovers - - try { - final OMResponse resp = client.submitRequest(omRequest); - fail(); - } catch (Exception e) { - Assert.assertTrue(true); - } - } } diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config index 4642680394..69f4e52eae 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config @@ -36,7 +36,6 @@ OZONE-SITE.XML_hdds.datanode.dir=/data/hdds OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s OZONE-SITE.XML_hdds.container.report.interval=60s -OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true HDFS-SITE.XML_rpc.metrics.quantile.enable=true HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300 ASYNC_PROFILER_HOME=/opt/profiler diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config index be93d0a6ec..498d02efae 100644 --- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config @@ -51,7 +51,6 @@ OZONE-SITE.XML_hdds.grpc.tls.enabled=true OZONE-SITE.XML_ozone.replication=3 OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s OZONE-SITE.XML_hdds.container.report.interval=60s -OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true OZONE-SITE.XML_ozone.recon.om.snapshot.task.interval.delay=1m OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh index 252f953163..7410822cfa 100755 --- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh +++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh @@ -35,7 +35,7 @@ execute_robot_test ${SCM} freon execute_robot_test ${SCM} basic/links.robot -execute_robot_test ${SCM} s3 +#execute_robot_test ${SCM} s3 execute_robot_test ${SCM} admincli diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index 1c772cf46b..3269c394f7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@ -111,8 +111,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase { ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM, OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, OMConfigKeys.OZONE_OM_HA_PREFIX, - OMConfigKeys.OZONE_OM_TRANSPORT_CLASS, - OMConfigKeys.OZONE_OM_GRPC_PORT_KEY + OMConfigKeys.OZONE_OM_TRANSPORT_CLASS // TODO HDDS-2856 )); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java index 7fe338c83e..60942f971b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java @@ -18,16 +18,13 @@ package org.apache.hadoop.ozone.om; import java.io.IOException; -import java.util.OptionalInt; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.ha.ConfUtils; import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager; import io.grpc.Server; @@ -50,20 +47,9 @@ public class GrpcOzoneManagerServer { omTranslator, OzoneDelegationTokenSecretManager delegationTokenMgr) { - OptionalInt haPort = HddsUtils.getNumberFromConfigKeys(config, - ConfUtils.addKeySuffixes( - OMConfigKeys.OZONE_OM_GRPC_PORT_KEY, - config.get(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY), - config.get(OMConfigKeys.OZONE_OM_NODE_ID_KEY)), - OMConfigKeys.OZONE_OM_GRPC_PORT_KEY); - if (haPort.isPresent()) { - this.port = haPort.getAsInt(); - } else { - this.port = config.getObject( - GrpcOzoneManagerServerConfig.class). - getPort(); - } - + this.port = config.getObject( + GrpcOzoneManagerServerConfig.class). + getPort(); init(omTranslator, delegationTokenMgr, config); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java index a88e259a28..de11608703 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java @@ -17,13 +17,13 @@ */ package org.apache.hadoop.ozone.om; -import io.grpc.Status; import com.google.protobuf.RpcController; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.ipc.ClientId; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc.OzoneManagerServiceImplBase; import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.protocol.proto @@ -68,6 +68,7 @@ public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase { "processing s3g client submit request - for command {}", request.getCmdType().name()); AtomicInteger callCount = new AtomicInteger(0); + OMResponse omResponse = null; org.apache.hadoop.ipc.Server.getCurCall().set(new Server.Call(1, callCount.incrementAndGet(), @@ -83,16 +84,42 @@ public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase { // for OMRequests. Test through successful ratis-enabled OMRequest // handling without dependency on hadoop IPC based Server. try { - OMResponse omResponse = this.omTranslator. + omResponse = this.omTranslator. submitRequest(NULL_RPC_CONTROLLER, request); - responseObserver.onNext(omResponse); } catch (Throwable e) { - IOException ex = new IOException(e.getCause()); - responseObserver.onError(Status - .INTERNAL - .withDescription(ex.getMessage()) - .asRuntimeException()); + IOException ioe = null; + Throwable se = e.getCause(); + if (se == null) { + ioe = new IOException(e); + } else { + ioe = se instanceof IOException ? + (IOException) se : new IOException(e); + } + omResponse = createErrorResponse( + request, + ioe); } + responseObserver.onNext(omResponse); responseObserver.onCompleted(); } + + /** + * Create OMResponse from the specified OMRequest and exception. + * + * @param omRequest + * @param exception + * @return OMResponse + */ + private OMResponse createErrorResponse( + OMRequest omRequest, IOException exception) { + OMResponse.Builder omResponse = OMResponse.newBuilder() + .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception)) + .setCmdType(omRequest.getCmdType()) + .setTraceID(omRequest.getTraceID()) + .setSuccess(false); + if (exception.getMessage() != null) { + omResponse.setMessage(exception.getMessage()); + } + return omResponse.build(); + } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java index fe7f6f49ea..01601668b6 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java @@ -143,7 +143,7 @@ public class TestOMFailovers { omProxyInfos.put(nodeId, null); omNodeIDList.add(nodeId); } - setProxies(omProxies, omProxyInfos, omNodeIDList); + setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
