This is an automated email from the ASF dual-hosted git repository.
swagle pushed a commit to branch HDDS-4440-s3-performance
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-4440-s3-performance by
this push:
new 46575ef HDDS-5211. Start OM Grpc server as part of the OM bootstrap
(#2601)
46575ef is described below
commit 46575efe21ef0e510c08c59473bf559d9b4630b3
Author: Neil Joshi <[email protected]>
AuthorDate: Wed Sep 8 22:40:18 2021 -0600
HDDS-5211. Start OM Grpc server as part of the OM bootstrap (#2601)
* Initial commit for s3g with om grpc server to process
OzoneManagerProtocol s3 requests.
---
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 5 +
.../hadoop/ozone/TestOzoneConfigurationFields.java | 1 +
.../hadoop/ozone/om/GrpcOzoneManagerServer.java | 98 +++++++++++++++++++
.../org/apache/hadoop/ozone/om/OzoneManager.java | 59 +++++++++++-
.../hadoop/ozone/om/OzoneManagerServiceGrpc.java | 107 +++++++++++++++++++++
.../ozone/om/TestGrpcOzoneManagerServer.java | 63 ++++++++++++
6 files changed, 331 insertions(+), 2 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 563f99c..fc4c76f 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -276,4 +276,9 @@ public final class OMConfigKeys {
"ozone.path.deleting.limit.per.task";
public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
+ public static final String OZONE_OM_S3_GPRC_SERVER_ENABLED =
+ "ozone.om.s3.grpc.server_enabled";
+ public static final boolean OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT =
+ false;
+
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 53b8708..c36b624 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -81,6 +81,7 @@ public class TestOzoneConfigurationFields extends
TestConfigurationFieldsBase {
ScmConfigKeys.OZONE_SCM_ADDRESS_KEY,
OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
OMConfigKeys.OZONE_FS_TRASH_CHECKPOINT_INTERVAL_KEY,
+ OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED,
OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE,
OzoneConfigKeys.OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY,
ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR,
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
new file mode 100644
index 0000000..d82024a
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.ozone.OzoneConsts;
+import
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Separated network server for gRPC transport OzoneManagerService s3g->OM.
+ */
+public class GrpcOzoneManagerServer {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GrpcOzoneManagerServer.class);
+
+ private Server server;
+ private int port = 8981;
+
+ public GrpcOzoneManagerServer(GrpcOzoneManagerServerConfig omServerConfig,
+ OzoneManagerProtocolServerSideTranslatorPB
+ omTranslator) {
+ this.port = omServerConfig.getPort();
+ init(omTranslator);
+ }
+
+ public void init(OzoneManagerProtocolServerSideTranslatorPB omTranslator) {
+ NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
+ .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
+ .addService(new OzoneManagerServiceGrpc(omTranslator));
+
+ server = nettyServerBuilder.build();
+ }
+
+ public void start() throws IOException {
+ server.start();
+ LOG.info("{} is started using port {}", getClass().getSimpleName(),
+ server.getPort());
+ port = server.getPort();
+ }
+
+ public void stop() {
+ try {
+ server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
+ LOG.info("Server {} is shutdown", getClass().getSimpleName());
+ } catch (InterruptedException ex) {
+ LOG.warn("{} couldn't be stopped gracefully",
getClass().getSimpleName());
+ }
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * GrpcOzoneManagerServer configuration in Java style configuration class.
+ */
+ @ConfigGroup(prefix = "ozone.om.grpc")
+ public static final class GrpcOzoneManagerServerConfig {
+ @Config(key = "port", defaultValue = "8981",
+ description = "Port used for"
+ + " the GrpcOmTransport OzoneManagerServiceGrpc server",
+ tags = {ConfigTag.MANAGEMENT})
+ private int port;
+
+ public int getPort() {
+ return port;
+ }
+
+ public GrpcOzoneManagerServerConfig setPort(int portParam) {
+ this.port = portParam;
+ return this;
+ }
+ }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b144371..1dd402d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import
org.apache.hadoop.ozone.om.GrpcOzoneManagerServer.GrpcOzoneManagerServerConfig;
import org.apache.hadoop.ozone.om.ha.OMHANodeDetails;
import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import org.apache.hadoop.ozone.om.helpers.DBUpdates;
@@ -240,6 +241,8 @@ import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METADATA_LAYOUT_D
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METADATA_LAYOUT_PREFIX;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
@@ -289,6 +292,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
private final Text omRpcAddressTxt;
private OzoneConfiguration configuration;
private RPC.Server omRpcServer;
+ private GrpcOzoneManagerServer omS3gGrpcServer;
private InetSocketAddress omRpcAddress;
private String omId;
@@ -327,7 +331,9 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
private JvmPauseMonitor jvmPauseMonitor;
private final SecurityConfig secConfig;
private S3SecretManager s3SecretManager;
+ private final boolean isOmGrpcServerEnabled;
private volatile boolean isOmRpcServerRunning = false;
+ private volatile boolean isOmGrpcServerRunning = false;
private String omComponent;
private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;
@@ -441,6 +447,9 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
OZONE_ACL_ENABLED_DEFAULT);
this.isSpnegoEnabled = conf.get(OZONE_OM_HTTP_AUTH_TYPE, "simple")
.equals("kerberos");
+ this.isOmGrpcServerEnabled = conf.getBoolean(
+ OZONE_OM_S3_GPRC_SERVER_ENABLED,
+ OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT);
this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
this.preallocateBlocksMax = conf.getInt(
@@ -526,6 +535,10 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
omRpcAddress = updateRPCListenAddress(configuration,
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
+ // Start S3g Om gRPC Server.
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer = getOmS3gGrpcServer(configuration);
+ }
shutdownHook = () -> {
saveOmMetrics();
};
@@ -1001,6 +1014,20 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
return rpcServer;
}
+ /**
+ * Starts an s3g OmGrpc server.
+ *
+ * @param conf configuration
+ * @return gRPC server
+ * @throws IOException if there is an I/O error while creating RPC server
+ */
+ private GrpcOzoneManagerServer startGrpcServer(OzoneConfiguration conf)
+ throws IOException {
+ return new GrpcOzoneManagerServer(conf.getObject(
+ GrpcOzoneManagerServerConfig.class),
+ this.omServerProtocol);
+ }
+
private static boolean isOzoneSecurityEnabled() {
return securityEnabled;
}
@@ -1330,7 +1357,10 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
isOmRpcServerRunning = true;
startTrashEmptier(configuration);
-
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer.start();
+ isOmGrpcServerRunning = true;
+ }
registerMXBean();
startJVMPauseMonitor();
@@ -1385,7 +1415,9 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
omRpcServer = getRpcServer(configuration);
-
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer = getOmS3gGrpcServer(configuration);
+ }
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
@@ -1399,6 +1431,10 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
startTrashEmptier(configuration);
registerMXBean();
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer.start();
+ isOmGrpcServerRunning = true;
+ }
startJVMPauseMonitor();
setStartTime();
omState = State.RUNNING;
@@ -1550,6 +1586,19 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
/**
+ * Creates a new instance of gRPC OzoneManagerServiceGrpc transport server
+ * for serving s3g OmRequests. If an earlier instance is already running
+ * then returns the same.
+ */
+ private GrpcOzoneManagerServer getOmS3gGrpcServer(OzoneConfiguration conf)
+ throws IOException {
+ if (isOmGrpcServerRunning) {
+ return omS3gGrpcServer;
+ }
+ return startGrpcServer(configuration);
+ }
+
+ /**
* Creates an instance of ratis server.
*/
/**
@@ -1639,6 +1688,9 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
scheduleOMMetricsWriteTask = null;
}
omRpcServer.stop();
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer.stop();
+ }
// When ratis is not enabled, we need to call stop() to stop
// OzoneManageDoubleBuffer in OM server protocol.
if (!isRatisEnabled) {
@@ -1649,6 +1701,9 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
omRatisServer = null;
}
isOmRpcServerRunning = false;
+ if (isOmGrpcServerEnabled) {
+ isOmGrpcServerRunning = false;
+ }
keyManager.stop();
stopSecretManager();
if (httpServer != null) {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
new file mode 100644
index 0000000..95b9285
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc.OzoneManagerServiceImplBase;
+import
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.OMResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Grpc Service for handling S3 gateway OzoneManagerProtocol client requests.
+ */
+public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OzoneManagerServiceGrpc.class);
+ /**
+ * RpcController is not used and hence is set to null.
+ */
+ private static final RpcController NULL_RPC_CONTROLLER = null;
+ private OzoneManagerProtocolServerSideTranslatorPB omTranslator;
+
+ OzoneManagerServiceGrpc(
+ OzoneManagerProtocolServerSideTranslatorPB omTranslator) {
+ this.omTranslator = omTranslator;
+ }
+
+ @Override
+ public void submitRequest(OMRequest request,
+ io.grpc.stub.StreamObserver<OMResponse>
+ responseObserver) {
+ LOG.debug("OzoneManagerServiceGrpc: OzoneManagerServiceImplBase " +
+ "processing s3g client submit request - for command {}",
+ request.getCmdType().name());
+ AtomicInteger callCount = new AtomicInteger(0);
+ OMResponse omResponse;
+ try {
+ org.apache.hadoop.ipc.Server.getCurCall().set(new Server.Call(1,
+ callCount.incrementAndGet(),
+ null,
+ null,
+ RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ ClientId.getClientId()));
+ // TODO: currently require setting the Server class for each request
+ // with thread context (Server.Call()) that includes retries
+ // and importantly random ClientId. This is currently necessary for
+ // Om Ratis Server to create createWriteRaftClientRequest.
+ // Look to remove Server class requirement for issuing ratis transactions
+ // for OMRequests. Test through successful ratis-enabled OMRequest
+ // handling without dependency on hadoop IPC based Server.
+ omResponse = this.omTranslator.
+ submitRequest(NULL_RPC_CONTROLLER, request);
+ } catch (Throwable e) {
+ omResponse = createErrorResponse(
+ request,
+ new IOException(e));
+ }
+ responseObserver.onNext(omResponse);
+ responseObserver.onCompleted();
+ }
+
+ /**
+ * Create OMResponse from the specified OMRequest and exception.
+ *
+ * @param omRequest
+ * @param exception
+ * @return OMResponse
+ */
+ private OMResponse createErrorResponse(
+ OMRequest omRequest, IOException exception) {
+ OMResponse.Builder omResponse = OMResponse.newBuilder()
+ .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
+ .setCmdType(omRequest.getCmdType())
+ .setTraceID(omRequest.getTraceID())
+ .setSuccess(false);
+ if (exception.getMessage() != null) {
+ omResponse.setMessage(exception.getMessage());
+ }
+ return omResponse.build();
+ }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
new file mode 100644
index 0000000..46155ae
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.mockito.Mockito;
+import
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+
+/**
+ * Tests for GrpcOzoneManagerServer.
+ */
+public class TestGrpcOzoneManagerServer {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestGrpcOzoneManagerServer.class);
+ private OzoneManager ozoneManager;
+ private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;
+ private GrpcOzoneManagerServer server;
+
+ @Rule
+ public Timeout timeout = Timeout.seconds(30);
+
+ @Test
+ public void testStartStop() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ ozoneManager = Mockito.mock(OzoneManager.class);
+ omServerProtocol = ozoneManager.getOmServerProtocol();
+
+ server = new GrpcOzoneManagerServer(conf.getObject(
+ GrpcOzoneManagerServer.GrpcOzoneManagerServerConfig.class),
+ omServerProtocol);
+
+ try {
+ server.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ server.stop();
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]