This is an automated email from the ASF dual-hosted git repository.

swagle pushed a commit to branch HDDS-4440-s3-performance
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-4440-s3-performance by 
this push:
     new 46575ef  HDDS-5211. Start OM Grpc server as part of the OM bootstrap 
(#2601)
46575ef is described below

commit 46575efe21ef0e510c08c59473bf559d9b4630b3
Author: Neil Joshi <[email protected]>
AuthorDate: Wed Sep 8 22:40:18 2021 -0600

    HDDS-5211. Start OM Grpc server as part of the OM bootstrap (#2601)
    
    * Initial commit for s3g with om grpc server to process 
OzoneManagerProtocol s3 requests.
---
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   5 +
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   1 +
 .../hadoop/ozone/om/GrpcOzoneManagerServer.java    |  98 +++++++++++++++++++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  59 +++++++++++-
 .../hadoop/ozone/om/OzoneManagerServiceGrpc.java   | 107 +++++++++++++++++++++
 .../ozone/om/TestGrpcOzoneManagerServer.java       |  63 ++++++++++++
 6 files changed, 331 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 563f99c..fc4c76f 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -276,4 +276,9 @@ public final class OMConfigKeys {
       "ozone.path.deleting.limit.per.task";
   public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
 
+  /**
+   * Configuration key for the boolean switch that OzoneManager reads to
+   * decide whether the S3 gateway gRPC server is created and started.
+   * NOTE(review): the constant name spells "GPRC" while the companion
+   * default below spells "GRPC"; a rename must update every usage.
+   */
+  public static final String OZONE_OM_S3_GPRC_SERVER_ENABLED =
+      "ozone.om.s3.grpc.server_enabled";
+  /** Default for the switch above: the S3 gateway gRPC server is off. */
+  public static final boolean OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT =
+      false;
+
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 53b8708..c36b624 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -81,6 +81,7 @@ public class TestOzoneConfigurationFields extends 
TestConfigurationFieldsBase {
         ScmConfigKeys.OZONE_SCM_ADDRESS_KEY,
         OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
         OMConfigKeys.OZONE_FS_TRASH_CHECKPOINT_INTERVAL_KEY,
+        OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED,
         OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE,
         OzoneConfigKeys.OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY,
         ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
new file mode 100644
index 0000000..d82024a
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.ozone.OzoneConsts;
+import 
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Separated network server for gRPC transport OzoneManagerService s3g->OM.
+ */
+public class GrpcOzoneManagerServer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcOzoneManagerServer.class);
+
+  private Server server;
+  private int port = 8981;
+
+  /**
+   * Builds (but does not start) the gRPC server.
+   *
+   * @param omServerConfig configuration object supplying the listen port
+   * @param omTranslator   translator handed to the OzoneManagerServiceGrpc
+   *                       service registered with the server
+   */
+  public GrpcOzoneManagerServer(GrpcOzoneManagerServerConfig omServerConfig,
+                                OzoneManagerProtocolServerSideTranslatorPB
+                                    omTranslator) {
+    this.port = omServerConfig.getPort();
+    init(omTranslator);
+  }
+
+  /**
+   * Assembles the Netty based gRPC server on the configured port and
+   * registers the OzoneManagerServiceGrpc service with it.
+   *
+   * @param omTranslator translator passed to the registered service
+   */
+  public void init(OzoneManagerProtocolServerSideTranslatorPB omTranslator) {
+    NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
+        .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
+        .addService(new OzoneManagerServiceGrpc(omTranslator));
+
+    server = nettyServerBuilder.build();
+  }
+
+  /**
+   * Starts the server and records the port it reports.
+   *
+   * @throws IOException if the underlying server fails to start
+   */
+  public void start() throws IOException {
+    server.start();
+    LOG.info("{} is started using port {}", getClass().getSimpleName(),
+        server.getPort());
+    port = server.getPort();
+  }
+
+  /**
+   * Shuts the server down, waiting up to 10 seconds for termination.
+   * If the wait times out the server is shut down forcibly; if the wait
+   * is interrupted the thread's interrupt status is restored.
+   */
+  public void stop() {
+    try {
+      if (server.shutdown().awaitTermination(10L, TimeUnit.SECONDS)) {
+        LOG.info("Server {} is shutdown", getClass().getSimpleName());
+      } else {
+        // Graceful shutdown did not finish within the timeout.
+        LOG.warn("{} couldn't be stopped gracefully",
+            getClass().getSimpleName());
+        server.shutdownNow();
+      }
+    } catch (InterruptedException ex) {
+      LOG.warn("{} couldn't be stopped gracefully",
+          getClass().getSimpleName());
+      // Preserve the interrupt so callers up the stack can react to it.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * @return the configured port, or the port reported by the server once
+   *         {@link #start()} has completed
+   */
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * GrpcOzoneManagerServer configuration in Java style configuration class.
+   */
+  @ConfigGroup(prefix = "ozone.om.grpc")
+  public static final class GrpcOzoneManagerServerConfig {
+    @Config(key = "port", defaultValue = "8981",
+        description = "Port used for"
+            + " the GrpcOmTransport OzoneManagerServiceGrpc server",
+        tags = {ConfigTag.MANAGEMENT})
+    private int port;
+
+    /**
+     * @return the port value held by this configuration object
+     */
+    public int getPort() {
+      return port;
+    }
+
+    /**
+     * Overrides the port held by this configuration object.
+     *
+     * @param portParam port to use
+     * @return this configuration object, to allow chaining
+     */
+    public GrpcOzoneManagerServerConfig setPort(int portParam) {
+      this.port = portParam;
+      return this;
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b144371..1dd402d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import 
org.apache.hadoop.ozone.om.GrpcOzoneManagerServer.GrpcOzoneManagerServerConfig;
 import org.apache.hadoop.ozone.om.ha.OMHANodeDetails;
 import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
 import org.apache.hadoop.ozone.om.helpers.DBUpdates;
@@ -240,6 +241,8 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METADATA_LAYOUT_D
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METADATA_LAYOUT_PREFIX;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
@@ -289,6 +292,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private final Text omRpcAddressTxt;
   private OzoneConfiguration configuration;
   private RPC.Server omRpcServer;
+  private GrpcOzoneManagerServer omS3gGrpcServer;    
   private InetSocketAddress omRpcAddress;
   private String omId;
 
@@ -327,7 +331,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private JvmPauseMonitor jvmPauseMonitor;
   private final SecurityConfig secConfig;
   private S3SecretManager s3SecretManager;
+  private final boolean isOmGrpcServerEnabled;
   private volatile boolean isOmRpcServerRunning = false;
+  private volatile boolean isOmGrpcServerRunning = false;    
   private String omComponent;
   private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;
 
@@ -441,6 +447,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
         OZONE_ACL_ENABLED_DEFAULT);
     this.isSpnegoEnabled = conf.get(OZONE_OM_HTTP_AUTH_TYPE, "simple")
         .equals("kerberos");
+    this.isOmGrpcServerEnabled = conf.getBoolean(
+        OZONE_OM_S3_GPRC_SERVER_ENABLED,
+        OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT);
     this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
         OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
     this.preallocateBlocksMax = conf.getInt(
@@ -526,6 +535,10 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     omRpcAddress = updateRPCListenAddress(configuration,
         OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
 
+    // Create (but do not yet start) the S3g OM gRPC server.
+    if (isOmGrpcServerEnabled) {
+      omS3gGrpcServer = getOmS3gGrpcServer(configuration);
+    }
     shutdownHook = () -> {
       saveOmMetrics();
     };
@@ -1001,6 +1014,20 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     return rpcServer;
   }
 
+  /**
+   * Builds the s3g OM gRPC server from the gRPC server configuration
+   * object found in the given configuration.  The returned server is
+   * constructed only; calling start() on it is left to the caller.
+   *
+   * @param conf         configuration
+   * @return gRPC server
+   * @throws IOException declared for callers of this method
+   */
+  private GrpcOzoneManagerServer startGrpcServer(OzoneConfiguration conf)
+          throws IOException {
+    GrpcOzoneManagerServerConfig grpcServerConfig =
+        conf.getObject(GrpcOzoneManagerServerConfig.class);
+    return new GrpcOzoneManagerServer(grpcServerConfig,
+        this.omServerProtocol);
+  }
+
   private static boolean isOzoneSecurityEnabled() {
     return securityEnabled;
   }
@@ -1330,7 +1357,10 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     isOmRpcServerRunning = true;
 
     startTrashEmptier(configuration);
-
+    if (isOmGrpcServerEnabled) {
+      omS3gGrpcServer.start();
+      isOmGrpcServerRunning = true;
+    }
     registerMXBean();
 
     startJVMPauseMonitor();
@@ -1385,7 +1415,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     }
 
     omRpcServer = getRpcServer(configuration);
-
+    if (isOmGrpcServerEnabled) {
+      omS3gGrpcServer = getOmS3gGrpcServer(configuration);
+    }
     try {
       httpServer = new OzoneManagerHttpServer(configuration, this);
       httpServer.start();
@@ -1399,6 +1431,10 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     startTrashEmptier(configuration);
     registerMXBean();
 
+    if (isOmGrpcServerEnabled) {
+      omS3gGrpcServer.start();
+      isOmGrpcServerRunning = true;
+    }
     startJVMPauseMonitor();
     setStartTime();
     omState = State.RUNNING;
@@ -1550,6 +1586,19 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   /**
+   * Returns the gRPC OzoneManagerServiceGrpc transport server used for
+   * serving s3g OmRequests.  If an earlier instance is already running
+   * then returns the same; otherwise a new instance is created from the
+   * supplied configuration.
+   *
+   * @param conf configuration used when a new server has to be created
+   * @return the running server, or a newly created one
+   * @throws IOException declared by the server factory method
+   */
+  private GrpcOzoneManagerServer getOmS3gGrpcServer(OzoneConfiguration conf)
+      throws IOException {
+    if (isOmGrpcServerRunning) {
+      return omS3gGrpcServer;
+    }
+    // Use the caller's configuration rather than silently ignoring it.
+    return startGrpcServer(conf);
+  }
+
+  /**
    * Creates an instance of ratis server.
    */
   /**
@@ -1639,6 +1688,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
         scheduleOMMetricsWriteTask = null;
       }
       omRpcServer.stop();
+      if (isOmGrpcServerEnabled) {
+        omS3gGrpcServer.stop();
+      }
       // When ratis is not enabled, we need to call stop() to stop
       // OzoneManageDoubleBuffer in OM server protocol.
       if (!isRatisEnabled) {
@@ -1649,6 +1701,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
         omRatisServer = null;
       }
       isOmRpcServerRunning = false;
+      if (isOmGrpcServerEnabled) {
+        isOmGrpcServerRunning = false;
+      }
       keyManager.stop();
       stopSecretManager();
       if (httpServer != null) {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
new file mode 100644
index 0000000..95b9285
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc.OzoneManagerServiceImplBase;
+import 
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.OMResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Grpc Service for handling S3 gateway OzoneManagerProtocol client requests.
+ */
+public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerServiceGrpc.class);
+  /**
+   * No RpcController is supplied to the translator, so null is passed.
+   */
+  private static final RpcController NULL_RPC_CONTROLLER = null;
+  private OzoneManagerProtocolServerSideTranslatorPB omTranslator;
+
+  /**
+   * @param omTranslator translator that every submitted request is
+   *                     forwarded to
+   */
+  OzoneManagerServiceGrpc(
+      OzoneManagerProtocolServerSideTranslatorPB omTranslator) {
+    this.omTranslator = omTranslator;
+  }
+
+  /**
+   * Forwards one OMRequest to the translator and sends the resulting
+   * OMResponse to the observer.  Anything thrown while handling the
+   * request is converted into an error OMResponse, so the observer always
+   * receives exactly one response followed by completion.
+   *
+   * @param request          request received over gRPC
+   * @param responseObserver observer that receives the response
+   */
+  @Override
+  public void submitRequest(OMRequest request,
+                            io.grpc.stub.StreamObserver<OMResponse>
+                                responseObserver) {
+    LOG.debug("OzoneManagerServiceGrpc: OzoneManagerServiceImplBase " +
+        "processing s3g client submit request - for command {}",
+        request.getCmdType().name());
+    AtomicInteger callCount = new AtomicInteger(0);
+    OMResponse reply;
+    try {
+      // TODO: a Server.Call (thread context carrying the retry count and,
+      // importantly, a random ClientId) currently has to be installed for
+      // every request, because the Om Ratis Server needs it to create
+      // createWriteRaftClientRequest.  Look to remove this dependency on
+      // the hadoop IPC Server class for issuing ratis transactions for
+      // OMRequests, and test ratis-enabled OMRequest handling without it.
+      Server.Call s3gCall = new Server.Call(1,
+          callCount.incrementAndGet(),
+          null,
+          null,
+          RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+          ClientId.getClientId());
+      Server.getCurCall().set(s3gCall);
+      reply = omTranslator.submitRequest(NULL_RPC_CONTROLLER, request);
+    } catch (Throwable failure) {
+      reply = createErrorResponse(request, new IOException(failure));
+    }
+    responseObserver.onNext(reply);
+    responseObserver.onCompleted();
+  }
+
+  /**
+   * Builds a failed OMResponse that mirrors the command type and trace id
+   * of the request and carries the status derived from the exception.
+   *
+   * @param omRequest request that could not be processed
+   * @param exception failure to report; its message is copied when present
+   * @return OMResponse with success set to false
+   */
+  private OMResponse createErrorResponse(
+      OMRequest omRequest, IOException exception) {
+    OMResponse.Builder errorBuilder = OMResponse.newBuilder();
+    errorBuilder.setStatus(
+        OzoneManagerRatisUtils.exceptionToResponseStatus(exception));
+    errorBuilder.setCmdType(omRequest.getCmdType());
+    errorBuilder.setTraceID(omRequest.getTraceID());
+    errorBuilder.setSuccess(false);
+    String detail = exception.getMessage();
+    if (detail != null) {
+      errorBuilder.setMessage(detail);
+    }
+    return errorBuilder.build();
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
new file mode 100644
index 0000000..46155ae
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.mockito.Mockito;
+import 
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+
+/**
+ * Tests for GrpcOzoneManagerServer.
+ */
+public class TestGrpcOzoneManagerServer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestGrpcOzoneManagerServer.class);
+  private OzoneManager ozoneManager;
+  private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;
+  private GrpcOzoneManagerServer server;
+
+  @Rule
+  public Timeout timeout = Timeout.seconds(30);
+
+  /**
+   * Verifies that the server can be started and then stopped.  A failure
+   * in start() propagates out of the test so that it is reported, while
+   * stop() still runs in the finally block.
+   */
+  @Test
+  public void testStartStop() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ozoneManager = Mockito.mock(OzoneManager.class);
+    // Unstubbed call on a mock: Mockito answers with null by default.
+    omServerProtocol = ozoneManager.getOmServerProtocol();
+
+    server = new GrpcOzoneManagerServer(conf.getObject(
+            GrpcOzoneManagerServer.GrpcOzoneManagerServerConfig.class),
+        omServerProtocol);
+
+    try {
+      server.start();
+    } finally {
+      server.stop();
+    }
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to