This is an automated email from the ASF dual-hosted git repository.
swagle pushed a commit to branch HDDS-4440-s3-performance
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-4440-s3-performance by
this push:
new a115ebf HDDS-5212. Create implementation of OmTransport which uses
Grpc (#2580)
a115ebf is described below
commit a115ebfadc2fc44cd1ae7ba621e1442da2449a4b
Author: Neil Joshi <[email protected]>
AuthorDate: Wed Sep 15 08:48:15 2021 -0600
HDDS-5212. Create implementation of OmTransport which uses Grpc (#2580)
* Initial commit with s3 gRPC client and associated unit tests.
* Added configurable OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH to config keys
for s3g om OmRequest OmResponses over gRPC.
* Added suggested changes to unit tests from review. Including updating
test timeout to appropriate test timeout value from 5min to 30s and using
custom values (to reflect server response) in OMResponse to validate gRPC
transport of Ozone Manager Protocol.
* Added configuration setting in ozone-default for
ozone.om.grpc.maximum.response.length.
---
.../common/src/main/resources/ozone-default.xml | 9 +-
hadoop-ozone/common/pom.xml | 26 ++++
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 7 +-
.../ozone/om/protocolPB/GrpcOmTransport.java | 149 +++++++++++++++++++++
.../om/protocolPB/GrpcOmTransportFactory.java | 35 +++++
.../ozone/om/protocolPB/TestS3GrpcOmTransport.java | 129 ++++++++++++++++++
.../common/src/test/resources/log4j.properties | 21 +++
.../ozone/protocolPB/TestGrpcOmTransport.java | 72 ++++++++++
8 files changed, 446 insertions(+), 2 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 29961a1..4c44136 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2947,7 +2947,6 @@
<description>The S3Gateway service principal.
Ex: s3g/[email protected]</description>
</property>
-
<property>
<name>hadoop.http.idle_timeout.ms</name>
<value>60000</value>
@@ -2956,4 +2955,12 @@
OM/SCM/DN/S3GATEWAY Server connection timeout in milliseconds.
</description>
</property>
+ <property>
+ <name>ozone.om.grpc.maximum.response.length</name>
+ <value>134217728</value>
+ <tag>OZONE, OM, S3GATEWAY</tag>
+ <description>
+ Maximum length, in bytes, of an OMRequest/OMResponse message exchanged
+ between the S3 Gateway and OM over gRPC.
+ </description>
+ </property>
</configuration>
diff --git a/hadoop-ozone/common/pom.xml b/hadoop-ozone/common/pom.xml
index 8dc07c2..e60737b 100644
--- a/hadoop-ozone/common/pom.xml
+++ b/hadoop-ozone/common/pom.xml
@@ -36,6 +36,12 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<version>${io.grpc.version}</version>
</dependency>
<dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ <version>${io.grpc.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</dependency>
@@ -44,6 +50,26 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>netty-handler-proxy</artifactId>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index fc4c76f..8276fe1 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -276,9 +276,14 @@ public final class OMConfigKeys {
"ozone.path.deleting.limit.per.task";
public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
+ public static final String OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH =
+ "ozone.om.grpc.maximum.response.length";
+ /** Default value for GRPC_MAXIMUM_RESPONSE_LENGTH. */
+ public static final int OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT =
+ 128 * 1024 * 1024;
+
public static final String OZONE_OM_S3_GPRC_SERVER_ENABLED =
"ozone.om.s3.grpc.server_enabled";
public static final boolean OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT =
false;
-
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
new file mode 100644
index 0000000..68c1d2c
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.io.Text;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
+import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
+
+/**
+ * {@link OmTransport} implementation that submits {@code OMRequest}s to the
+ * Ozone Manager through a blocking gRPC stub.
+ *
+ * <p>The constructor only reads configuration. {@link #start()} (or
+ * {@link #startClient(ManagedChannel)} in tests) must be called before
+ * {@link #submitRequest(OMRequest)}; {@link #shutdown()} / {@link #close()}
+ * release the channel created by {@link #start()}.
+ */
+public class GrpcOmTransport implements OmTransport {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcOmTransport.class);
+
+  private static final String CLIENT_NAME = "GrpcOmTransport";
+
+  // Guards start() so that the channel is only built once.
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+  // gRPC specific
+  // Stays null until start() has run; startClient() never assigns it.
+  private ManagedChannel channel;
+
+  // Stays null until start() or startClient() has run.
+  private OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub client;
+
+  private final String host;
+  private final int port;
+  private final int maxSize;
+
+  /**
+   * Reads the OM host, gRPC port and maximum inbound message size from the
+   * configuration. No connection is opened here.
+   *
+   * @param conf configuration source; the host comes from
+   *     {@code OZONE_OM_ADDRESS_KEY} (falling back to "0.0.0.0"), the port
+   *     from {@link GrpcOmTransportConfig}, and the message size limit from
+   *     {@code OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH}
+   * @param ugi not used by this constructor
+   * @param omServiceId not used by this constructor
+   * @throws IOException declared by the signature; no statement in this
+   *     constructor throws it directly
+   */
+  public GrpcOmTransport(ConfigurationSource conf,
+      UserGroupInformation ugi, String omServiceId)
+      throws IOException {
+    Optional<String> omHost = getHostNameFromConfigKeys(conf,
+        OZONE_OM_ADDRESS_KEY);
+    this.host = omHost.orElse("0.0.0.0");
+
+    this.port = conf.getObject(GrpcOmTransportConfig.class).getPort();
+
+    this.maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
+        OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
+  }
+
+  /**
+   * Builds the plaintext Netty channel to {@code host:port} and the blocking
+   * stub on top of it. A second call is ignored.
+   */
+  public void start() {
+    if (!isRunning.compareAndSet(false, true)) {
+      LOG.info("Ignore. already started.");
+      return;
+    }
+    NettyChannelBuilder channelBuilder =
+        NettyChannelBuilder.forAddress(host, port)
+            .usePlaintext()
+            .maxInboundMessageSize(maxSize);
+
+    channel = channelBuilder.build();
+    client = OzoneManagerServiceGrpc.newBlockingStub(channel);
+
+    LOG.info("{}: started", CLIENT_NAME);
+  }
+
+  /**
+   * Sends the request through the blocking stub and returns the response.
+   *
+   * @param payload request to submit
+   * @return the response produced by the server
+   * @throws IOException if the transport has not been started, or if the
+   *     gRPC call fails (the {@code StatusRuntimeException} is the cause)
+   */
+  @Override
+  public OMResponse submitRequest(OMRequest payload) throws IOException {
+    if (client == null) {
+      // Without this check an unstarted transport fails with a bare NPE.
+      throw new IOException(CLIENT_NAME
+          + " is not started; call start() before submitting requests");
+    }
+    try {
+      return client.submitRequest(payload);
+    } catch (io.grpc.StatusRuntimeException e) {
+      // The stub reports failures with an unchecked exception; surface them
+      // through the checked type this method declares.
+      throw new IOException("OM gRPC request failed: " + e.getStatus(), e);
+    }
+  }
+
+  // stub implementation for interface
+  @Override
+  public Text getDelegationTokenService() {
+    return new Text();
+  }
+
+  /**
+   * Shuts down the channel created by {@link #start()} and waits up to five
+   * seconds for it to terminate. Does nothing if {@link #start()} was never
+   * called.
+   */
+  public void shutdown() {
+    if (channel == null) {
+      // Nothing to release: the transport was constructed (or wired to a
+      // test channel through startClient) but start() never built a channel.
+      return;
+    }
+    channel.shutdown();
+    try {
+      channel.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+      LOG.error("failed to shutdown OzoneManagerServiceGrpc channel", e);
+    } catch (RuntimeException e) {
+      // Shutdown stays best effort, as before.
+      LOG.error("failed to shutdown OzoneManagerServiceGrpc channel", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutdown();
+  }
+
+  /**
+   * GrpcOmTransport configuration in Java style configuration class.
+   */
+  @ConfigGroup(prefix = "ozone.om.grpc")
+  public static final class GrpcOmTransportConfig {
+    @Config(key = "port", defaultValue = "8981",
+        description = "Port used for"
+            + " the GrpcOmTransport OzoneManagerServiceGrpc server",
+        tags = {ConfigTag.MANAGEMENT})
+    private int port;
+
+    public int getPort() {
+      return port;
+    }
+
+    public GrpcOmTransportConfig setPort(int portParam) {
+      this.port = portParam;
+      return this;
+    }
+  }
+
+  /**
+   * Test hook: binds the blocking stub to a caller-supplied channel instead
+   * of building one. The channel is not stored, so {@link #shutdown()} does
+   * not close it.
+   *
+   * @param testChannel channel the stub should use
+   */
+  @VisibleForTesting
+  public void startClient(ManagedChannel testChannel) {
+    client = OzoneManagerServiceGrpc.newBlockingStub(testChannel);
+
+    LOG.info("{}: started", CLIENT_NAME);
+  }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransportFactory.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransportFactory.java
new file mode 100644
index 0000000..5f34a3c
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransportFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * {@link OmTransportFactory} that hands out {@link GrpcOmTransport}
+ * instances.
+ */
+public class GrpcOmTransportFactory implements OmTransportFactory {
+
+  /**
+   * Creates a new {@link GrpcOmTransport}; all three arguments are passed to
+   * its constructor unchanged.
+   */
+  @Override
+  public OmTransport createOmTransport(ConfigurationSource source,
+      UserGroupInformation ugi, String omServiceId) throws IOException {
+    final OmTransport transport =
+        new GrpcOmTransport(source, ugi, omServiceId);
+    return transport;
+  }
+}
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
new file mode 100644
index 0000000..323bb0e
--- /dev/null
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
+import org.apache.hadoop.security.UserGroupInformation;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.ManagedChannel;
+
+import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+
+/**
+ * Tests for GrpcOmTransport client.
+ *
+ * <p>An in-process gRPC server answers every request with a fixed
+ * {@code OMResponse}; the test checks that the values of that response come
+ * back through {@link GrpcOmTransport#submitRequest}.
+ */
+public class TestS3GrpcOmTransport {
+  @Rule
+  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestS3GrpcOmTransport.class);
+
+  private final String leaderOMNodeId = "TestOM";
+
+  // Canned server reply; its values are what the test asserts on.
+  private final OMResponse omResponse = OMResponse.newBuilder()
+      .setSuccess(true)
+      .setStatus(Status.OK)
+      .setLeaderOMNodeId(leaderOMNodeId)
+      .setCmdType(Type.AllocateBlock)
+      .build();
+
+  // Service registered with the in-process server: a mock that delegates to
+  // an implementation replying with omResponse.
+  private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase
+      serviceImpl =
+      mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
+          delegatesTo(
+              new OzoneManagerServiceGrpc.OzoneManagerServiceImplBase() {
+                @Override
+                public void submitRequest(OMRequest request,
+                    io.grpc.stub.StreamObserver<OMResponse>
+                        responseObserver) {
+                  responseObserver.onNext(omResponse);
+                  responseObserver.onCompleted();
+                }
+              }));
+
+  private GrpcOmTransport client;
+
+  @Before
+  public void setUp() throws Exception {
+    // Generate a unique in-process server name.
+    String serverName = InProcessServerBuilder.generateName();
+
+    // Create a server, add service, start,
+    // and register for automatic graceful shutdown.
+    grpcCleanup.register(InProcessServerBuilder
+        .forName(serverName)
+        .directExecutor()
+        .addService(serviceImpl)
+        .build()
+        .start());
+
+    // Create a client channel and register for automatic graceful shutdown.
+    ManagedChannel channel = grpcCleanup.register(
+        InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+    String omServiceId = "";
+    OzoneConfiguration conf = new OzoneConfiguration();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    client = new GrpcOmTransport(conf, ugi, omServiceId);
+    client.startClient(channel);
+  }
+
+  @Test
+  public void testSubmitRequestToServer() throws Exception {
+    ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+    final OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.ServiceList)
+        .setVersion(CURRENT_VERSION)
+        .setClientId("test")
+        .setServiceListRequest(req)
+        .build();
+
+    final OMResponse resp = client.submitRequest(omRequest);
+    // JUnit expects (expected, actual); the other order produces misleading
+    // failure messages.
+    Assert.assertEquals(OK, resp.getStatus());
+    Assert.assertEquals(leaderOMNodeId, resp.getLeaderOMNodeId());
+  }
+}
diff --git a/hadoop-ozone/common/src/test/resources/log4j.properties
b/hadoop-ozone/common/src/test/resources/log4j.properties
new file mode 100644
index 0000000..b8ad21d
--- /dev/null
+++ b/hadoop-ozone/common/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
new file mode 100644
index 0000000..4c17ecc
--- /dev/null
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.protocolPB;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for GrpcOmTransport.
+ */
+public class TestGrpcOmTransport {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestGrpcOmTransport.class);
+  @Rule
+  public Timeout timeout = Timeout.seconds(30);
+
+  /**
+   * The transport returned by OmTransportFactory.create must be a
+   * GrpcOmTransport (compared by simple class name).
+   */
+  @Test
+  public void testGrpcOmTransportFactory() throws Exception {
+    String omServiceId = "";
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    OmTransport omTransport =
+        OmTransportFactory.create(conf, ugi, omServiceId);
+    Assert.assertEquals(GrpcOmTransport.class.getSimpleName(),
+        omTransport.getClass().getSimpleName());
+  }
+
+  /**
+   * Starting and then shutting down the transport must not throw.
+   */
+  @Test
+  public void testStartStop() throws Exception {
+    String omServiceId = "";
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    GrpcOmTransport client = new GrpcOmTransport(conf, ugi, omServiceId);
+
+    // Any exception propagates and fails the test; catching it and printing
+    // the stack trace would let a broken start() pass unnoticed.
+    client.start();
+    client.shutdown();
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]