This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c5eaa8ab795ac056340b1ac22ed6235f0d101120
Author: xyuanlu <[email protected]>
AuthorDate: Thu Aug 22 23:14:10 2024 -0700

    Create Gateway service channel factory (#2883)
    
    This PR introduces a new configuration system for the Helix Gateway 
service, allowing for more flexible and customizable channel configurations. 
The main changes include:
    
    - Introduction of the GatewayServiceChannelConfig class to manage various 
channel configurations.
    - Implementation of a factory pattern (HelixGatewayServiceChannelFactory) 
for creating appropriate service channels based on the configuration.
    - Various improvements in error handling, logging, and code 
organization.
---
 .../org/apache/helix/gateway/HelixGatewayMain.java |  27 ++-
 .../constant/GatewayServiceConfigConstant.java}    |  11 +-
 .../constant/GatewayServiceEventType.java          |   2 +-
 .../api/service/HelixGatewayServiceChannel.java    |  14 ++
 .../channel/GatewayServiceChannelConfig.java       | 206 +++++++++++++++++++++
 .../channel/HelixGatewayServiceChannelFactory.java |  43 +++++
 .../channel/HelixGatewayServiceGrpcService.java    |  48 ++++-
 .../HelixGatewayServicePollModeChannel.java}       |  40 +++-
 .../HelixGatewayServicePullModeChannel.java        |  25 ---
 .../constant/GatewayServiceGrpcDefaultConfig.java  |   7 -
 .../helix/gateway/service/GatewayServiceEvent.java |   2 +-
 .../gateway/service/GatewayServiceManager.java     |  27 ++-
 .../util/HelixGatewayGrpcServerBuilder.java        |  99 ----------
 .../util/StateTransitionMessageTranslateUtil.java  |   2 +-
 .../apache/helix/gateway/base/util/TestHelper.java |  10 -
 .../chanel/GatewayServiceChannelConfigTest.java    |  73 ++++++++
 .../participant/TestHelixGatewayParticipant.java   |  11 ++
 .../service/TestGatewayServiceConnection.java      |  18 +-
 .../gateway/service/TestGatewayServiceManager.java |   6 +-
 19 files changed, 485 insertions(+), 186 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java 
b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
index b4b8ca0cb..df6230ca6 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
@@ -19,35 +19,32 @@ package org.apache.helix.gateway;
  * under the License.
  */
 
-import io.grpc.Server;
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
 import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+
+import static java.lang.Integer.*;
 
 
 /**
  * Main class for Helix Gateway.
  * It starts the Helix Gateway grpc service.
+ * args0: zk address
+ * args1: helix gateway grpc server port
  */
 public final class HelixGatewayMain {
 
   private HelixGatewayMain() {
   }
 
-  public static void main(String[] args) throws InterruptedException, 
IOException {
-    // Create a new server to listen on port 50051
-    GatewayServiceManager manager = new GatewayServiceManager();
-    Server server = new HelixGatewayGrpcServerBuilder().setPort(50051)
-        
.setGrpcService((HelixGatewayServiceGrpcService)manager.getHelixGatewayServiceProcessor())
-        .build();
-
-    server.start();
-    System.out.println("Server started, listening on " + server.getPort());
+  public static void main(String[] args) throws IOException {
+    // Create a new server
+    GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
+        new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder();
+    GatewayServiceManager manager =
+        new GatewayServiceManager(args[0], 
builder.setGrpcServerPort(parseInt(args[1])).build());
 
-    // Wait for the server to shutdown
-    server.awaitTermination(365, TimeUnit.DAYS);
+    manager.startService();
   }
 }
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
similarity index 69%
copy from 
helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
copy to 
helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
index 36c469825..70e4d73e3 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.constant;
+package org.apache.helix.gateway.api.constant;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,8 +19,9 @@ package org.apache.helix.gateway.constant;
  * under the License.
  */
 
-public enum GatewayServiceEventType {
-  CONNECT,    // init connection to gateway service
-  UPDATE,  // update state transition result
-  DISCONNECT // shutdown connection to gateway service.
+public class GatewayServiceConfigConstant {
+  public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60;
+  public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60;
+  public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60;
+  public static final int DEFAULT_POLL_INTERVAL_SEC = 60;
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java
similarity index 95%
copy from 
helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
copy to 
helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java
index 36c469825..b5cff9978 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.constant;
+package org.apache.helix.gateway.api.constant;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
index 1731e158f..02c0e8a8d 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.api.service;
  * under the License.
  */
 
+import java.io.IOException;
 import org.apache.helix.gateway.service.GatewayServiceEvent;
 import org.apache.helix.gateway.service.GatewayServiceManager;
 import org.apache.helix.model.Message;
@@ -54,6 +55,19 @@ public interface HelixGatewayServiceChannel {
     gatewayServiceManager.onGatewayServiceEvent(event);
   }
 
+  /**
+   * Start the gateway service channel.
+   *
+   * @throws IOException if the channel cannot be started
+   */
+  public void start() throws IOException;
+
+  /**
+   * Stop the gateway service channel forcefully.
+   */
+  public void stop();
+
+
   // TODO: remove the following 2 apis in future changes
   /**
    * Gateway service close connection with error. This function is called when 
manager wants to close client
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
new file mode 100644
index 000000000..31c7af05b
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
@@ -0,0 +1,206 @@
+package org.apache.helix.gateway.channel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import static 
org.apache.helix.gateway.api.constant.GatewayServiceConfigConstant.*;
+
+
+public class GatewayServiceChannelConfig {
+  // Mode to get helix participant information (inbound information). This 
includes health checks and shard state transition responses.
+  // We do not support hybrid mode as of now (i.e. push mode for 
participant liveness detection and poll mode for shard state).
+  public enum ChannelMode {
+    PUSH_MODE, // The gateway service passively receives participant 
information
+    POLL_MODE  // The gateway service actively polls participant information
+  }
+
+  // NOTE:
+  // For outbound information - stateTransition request, Gateway service will 
always push the state transition message.
+  // We do not support participant poll mode for stateTransition request as of 
now.
+
+  // Channel type for the following three kinds of information: participant liveness 
detection, shard state transition requests, and shard state transition responses.
+  // By default, they all use the grpc server; users can define them separately.
+  public enum ChannelType {
+    GRPC_SERVER,
+    GRPC_CLIENT,
+    FILE
+  }
+
+  // service configs
+
+  // service mode for inbound information.
+  private ChannelMode _channelMode;
+  // channel type for participant liveness detection
+  private ChannelType _participantConnectionChannelType;
+  // channel for sending and receiving shard state transition request and 
shard state response
+  private ChannelType _shardStateChannelType;
+
+  // grpc server configs
+  private final int _grpcServerPort;
+  private final int _serverHeartBeatInterval;
+  private final int _maxAllowedClientHeartBeatInterval;
+  private final int _clientTimeout;
+  private final boolean _enableReflectionService;
+
+  // poll mode config
+  private final int _pollIntervalSec;
+  // TODO: configs for poll mode grpc client
+
+  // TODO: configs for poll mode with file
+
+  // getters
+
+  public ChannelMode getChannelMode() {
+    return _channelMode;
+  }
+  public ChannelType getParticipantConnectionChannelType() {
+    return _participantConnectionChannelType;
+  }
+
+  public ChannelType getShardStateChannelType() {
+    return _shardStateChannelType;
+  }
+
+  public int getGrpcServerPort() {
+    return _grpcServerPort;
+  }
+
+  public int getServerHeartBeatInterval() {
+    return _serverHeartBeatInterval;
+  }
+
+  public int getMaxAllowedClientHeartBeatInterval() {
+    return _maxAllowedClientHeartBeatInterval;
+  }
+
+  public int getClientTimeout() {
+    return _clientTimeout;
+  }
+
+  public boolean getEnableReflectionService() {
+    return _enableReflectionService;
+  }
+
+  public int getPollIntervalSec() {
+    return _pollIntervalSec;
+  }
+
+  private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode 
channelMode,  ChannelType participantConnectionChannelType,
+      ChannelType shardStateChannelType, int serverHeartBeatInterval, int 
maxAllowedClientHeartBeatInterval,
+      int clientTimeout, boolean enableReflectionService, int pollIntervalSec) 
{
+    _grpcServerPort = grpcServerPort;
+    _channelMode = channelMode;
+    _participantConnectionChannelType = participantConnectionChannelType;
+    _shardStateChannelType = shardStateChannelType;
+    _serverHeartBeatInterval = serverHeartBeatInterval;
+    _maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval;
+    _clientTimeout = clientTimeout;
+    _enableReflectionService = enableReflectionService;
+    _pollIntervalSec = pollIntervalSec;
+  }
+
+  public static class GatewayServiceProcessorConfigBuilder {
+
+    // service configs
+    private ChannelMode _channelMode = ChannelMode.PUSH_MODE;
+    private ChannelType _participantConnectionChannelType = 
ChannelType.GRPC_SERVER;
+    private ChannelType _shardStatenChannelType = ChannelType.GRPC_SERVER;
+
+    // grpc server configs
+    private int _grpcServerPort;
+    private int _serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL;
+    private int _maxAllowedClientHeartBeatInterval = 
DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL;
+    private int _clientTimeout = DEFAULT_CLIENT_TIMEOUT;
+    private boolean _enableReflectionService = true;
+
+    // poll mode config
+    private int _pollIntervalSec = DEFAULT_POLL_INTERVAL_SEC;
+    // poll mode grpc client configs
+
+    // poll mode file configs
+
+
+    public GatewayServiceProcessorConfigBuilder setChannelMode(ChannelMode 
channelMode) {
+      _channelMode = channelMode;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder 
setParticipantConnectionChannelType(ChannelType channelMode) {
+      _participantConnectionChannelType = channelMode;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder 
setShardStateProcessorType(ChannelType channelMode) {
+      _shardStatenChannelType = channelMode;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder setGrpcServerPort(int 
grpcServerPort) {
+      _grpcServerPort = grpcServerPort;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder setServerHeartBeatInterval(int 
serverHeartBeatInterval) {
+      _serverHeartBeatInterval = serverHeartBeatInterval;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder 
setMaxAllowedClientHeartBeatInterval(
+        int maxAllowedClientHeartBeatInterval) {
+      _maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder setClientTimeout(int 
clientTimeout) {
+      _clientTimeout = clientTimeout;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder 
setEnableReflectionService(boolean enableReflectionService) {
+      _enableReflectionService = enableReflectionService;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder setPollIntervalSec(int 
pollIntervalSec) {
+      _pollIntervalSec = pollIntervalSec;
+      return this;
+    }
+
+    public void validate() {
+      if ((_participantConnectionChannelType == ChannelType.GRPC_SERVER
+          && _shardStatenChannelType != ChannelType.GRPC_SERVER) || (
+          _participantConnectionChannelType != ChannelType.GRPC_SERVER
+              && _shardStatenChannelType == ChannelType.GRPC_SERVER)) {
+        throw new IllegalArgumentException(
+            "In caas of GRPC server, Participant connection channel type and 
shard state channel type must be the same");
+      }
+      if (_participantConnectionChannelType == ChannelType.GRPC_SERVER && 
_grpcServerPort == 0) {
+        throw new IllegalArgumentException("Grpc server port must be set for 
grpc server channel type");
+      }
+    }
+
+    public GatewayServiceChannelConfig build() {
+      validate();
+      return new GatewayServiceChannelConfig(_grpcServerPort, _channelMode, 
_participantConnectionChannelType,
+          _shardStatenChannelType, _serverHeartBeatInterval, 
_maxAllowedClientHeartBeatInterval, _clientTimeout,
+          _enableReflectionService, _pollIntervalSec);
+    }
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
new file mode 100644
index 000000000..4c20b1977
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
@@ -0,0 +1,43 @@
+package org.apache.helix.gateway.channel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+
+
+public class HelixGatewayServiceChannelFactory {
+
+  public static HelixGatewayServiceChannel 
createServiceChannel(GatewayServiceChannelConfig config,
+      GatewayServiceManager manager) {
+
+    if (config.getChannelMode() == 
GatewayServiceChannelConfig.ChannelMode.PUSH_MODE) {
+      if (config.getParticipantConnectionChannelType() == 
GatewayServiceChannelConfig.ChannelType.GRPC_SERVER) {
+        return new HelixGatewayServiceGrpcService(manager, config);
+      }
+    } else {
+      return new HelixGatewayServicePollModeChannel(config);
+    }
+    throw new IllegalArgumentException(
+        "Unsupported channel mode and type combination: " + 
config.getChannelMode() + " , "
+            + config.getParticipantConnectionChannelType());
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
index a176ca89c..ea867cacf 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
@@ -19,10 +19,15 @@ package org.apache.helix.gateway.channel;
  * under the License.
  */
 
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
 import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.gateway.service.GatewayServiceEvent;
@@ -58,8 +63,13 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   // A fine grain lock register on instance level
   private final PerKeyLockRegistry _lockRegistry;
 
-  public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+  private final GatewayServiceChannelConfig _config;
+
+  private Server _server;
+
+  public HelixGatewayServiceGrpcService(GatewayServiceManager manager, 
GatewayServiceChannelConfig config) {
     _manager = manager;
+    _config = config;
     _lockRegistry = new PerKeyLockRegistry();
   }
 
@@ -176,4 +186,40 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
       _reversedObserverMap.put(streamObserver, new 
ImmutablePair<>(instanceName, clusterName));
     });
   }
+
+  @Override
+  public void start() throws IOException {
+    ServerBuilder serverBuilder = 
ServerBuilder.forPort(_config.getGrpcServerPort())
+        .addService(this)
+        .keepAliveTime(_config.getServerHeartBeatInterval(),
+            TimeUnit.SECONDS)  // HeartBeat time
+        .keepAliveTimeout(_config.getClientTimeout(),
+            TimeUnit.SECONDS)  // KeepAlive client timeout
+        .permitKeepAliveTime(_config.getMaxAllowedClientHeartBeatInterval(),
+            TimeUnit.SECONDS)  // Permit min HeartBeat time
+        .permitKeepAliveWithoutCalls(true);  // Allow KeepAlive forever 
without active RPC
+    if (_config.getEnableReflectionService()) {
+      serverBuilder = 
serverBuilder.addService(io.grpc.protobuf.services.ProtoReflectionService.newInstance());
+    }
+    _server = serverBuilder.build();
+
+    logger.info("Starting grpc server on port " + _config.getGrpcServerPort() 
+ " now.... Server heart beat interval: "
+        + _config.getServerHeartBeatInterval() + " seconds, Max allowed client 
heart beat interval: "
+        + _config.getMaxAllowedClientHeartBeatInterval() + " seconds, Client 
timeout: " + _config.getClientTimeout()
+        + " seconds, Enable reflection service: " + 
_config.getEnableReflectionService());
+    _server.start();
+  }
+
+  @Override
+  public void stop() {
+    if (_server != null) {
+      logger.info("Shutting down grpc server now....");
+      _server.shutdownNow();
+    }
+  }
+
+  @VisibleForTesting
+  Server getServer() {
+    return _server;
+  }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
similarity index 51%
rename from 
helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
rename to 
helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
index 36c469825..30a9f1802 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.constant;
+package org.apache.helix.gateway.channel;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,8 +19,38 @@ package org.apache.helix.gateway.constant;
  * under the License.
  */
 
-public enum GatewayServiceEventType {
-  CONNECT,    // init connection to gateway service
-  UPDATE,  // update state transition result
-  DISCONNECT // shutdown connection to gateway service.
+import java.io.IOException;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
+import org.apache.helix.model.Message;
+
+// TODO: implement this class
+public class HelixGatewayServicePollModeChannel implements 
HelixGatewayServiceChannel {
+
+  public HelixGatewayServicePollModeChannel(GatewayServiceChannelConfig 
config) {
+  }
+
+  @Override
+  public void sendStateTransitionMessage(String instanceName, String 
currentState, Message message) {
+
+  }
+
+  @Override
+  public void start() throws IOException {
+
+  }
+
+  @Override
+  public void stop() {
+
+  }
+
+  @Override
+  public void closeConnectionWithError(String instanceName, String reason) {
+
+  }
+
+  @Override
+  public void completeConnection(String instanceName) {
+
+  }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java
deleted file mode 100644
index 0d80a96d0..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.helix.gateway.channel;
-
-import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
-import org.apache.helix.model.Message;
-
-
-public class HelixGatewayServicePullModeChannel implements 
HelixGatewayServiceChannel {
-
-    public HelixGatewayServicePullModeChannel() {
-    }
-  @Override
-  public void sendStateTransitionMessage(String instanceName, String 
currentState, Message message) {
-
-  }
-
-  @Override
-  public void closeConnectionWithError(String instanceName, String reason) {
-
-  }
-
-  @Override
-  public void completeConnection(String instanceName) {
-
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java
deleted file mode 100644
index 3f1bbafbc..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.helix.gateway.constant;
-
-public class GatewayServiceGrpcDefaultConfig {
-  public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60;
-  public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60;
-  public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60;
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
index b919429b9..f5c66dea0 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
@@ -21,7 +21,7 @@ package org.apache.helix.gateway.service;
 
 import java.util.List;
 import java.util.Map;
-import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
 
 
 /**
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 2b626d395..289c5513c 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.service;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,9 +27,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
+import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
-import org.apache.helix.gateway.constant.GatewayServiceEventType;
-import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
 import org.apache.helix.gateway.participant.HelixGatewayParticipant;
 import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
 
@@ -57,13 +59,16 @@ public class GatewayServiceManager {
   // It is used to ensure for each instance, the connect/disconnect event 
won't start until the previous one is done.
   private final PerKeyBlockingExecutor _connectionEventProcessor;
 
-  public GatewayServiceManager() {
+  private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;
+
+  public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig 
gatewayServiceChannelConfig) {
     _helixGatewayParticipantMap = new ConcurrentHashMap<>();
-    _zkAddress = "foo";
+    _zkAddress = zkAddress;
     _participantStateTransitionResultUpdator = 
Executors.newSingleThreadExecutor();
-    _gatewayServiceChannel = new HelixGatewayServiceGrpcService(this);
+    _gatewayServiceChannel = 
HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig,
 this);
     _connectionEventProcessor =
         new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // 
todo: make it configurable
+    _gatewayServiceChannelConfig = gatewayServiceChannelConfig;
   }
 
   /**
@@ -127,8 +132,16 @@ public class GatewayServiceManager {
     }
   }
 
-  public HelixGatewayServiceChannel getHelixGatewayServiceProcessor() {
-    return _gatewayServiceChannel;
+
+  public void startService() throws IOException {
+    _gatewayServiceChannel.start();
+  }
+
+  public void stopService() {
+    _gatewayServiceChannel.stop();
+    _connectionEventProcessor.shutdown();
+    _participantStateTransitionResultUpdator.shutdown();
+    _helixGatewayParticipantMap.clear();
   }
 
   private void createHelixGatewayParticipant(String clusterName, String 
instanceName,
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java
deleted file mode 100644
index afa1477ce..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.helix.gateway.util;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import io.grpc.BindableService;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import io.grpc.protobuf.services.ProtoReflectionService;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*;
-
-
-/**
-   * Builder class to create a Helix gateway service server with custom 
configurations.
-   */
-  public  class HelixGatewayGrpcServerBuilder {
-    private int port;
-    private BindableService service;
-    private int serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL;
-    private int maxAllowedClientHeartBeatInterval = 
DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL;
-    private int clientTimeout = DEFAULT_CLIENT_TIMEOUT;
-    private boolean enableReflectionService = true;
-
-    public HelixGatewayGrpcServerBuilder setPort(int port) {
-      this.port = port;
-      return this;
-    }
-
-    public HelixGatewayGrpcServerBuilder setServerHeartBeatInterval(int 
serverHeartBeatInterval) {
-      this.serverHeartBeatInterval = serverHeartBeatInterval;
-      return this;
-    }
-
-    public HelixGatewayGrpcServerBuilder 
setMaxAllowedClientHeartBeatInterval(int maxAllowedClientHeartBeatInterval) {
-      this.maxAllowedClientHeartBeatInterval = 
maxAllowedClientHeartBeatInterval;
-      return this;
-    }
-
-    public HelixGatewayGrpcServerBuilder setClientTimeout(int clientTimeout) {
-      this.clientTimeout = clientTimeout;
-      return this;
-    }
-
-    public HelixGatewayGrpcServerBuilder setGrpcService(BindableService 
service) {
-      this.service = service;
-      return this;
-    }
-
-    public HelixGatewayGrpcServerBuilder enableReflectionService(boolean 
enableReflectionService) {
-      this.enableReflectionService = enableReflectionService;
-      return this;
-    }
-
-    public Server build() {
-      validate();
-
-      ServerBuilder serverBuilder = ServerBuilder.forPort(port)
-          .addService(service)
-          .keepAliveTime(serverHeartBeatInterval, TimeUnit.SECONDS)  // 
HeartBeat time
-          .keepAliveTimeout(clientTimeout, TimeUnit.SECONDS)  // KeepAlive 
client timeout
-          .permitKeepAliveTime(maxAllowedClientHeartBeatInterval, 
TimeUnit.SECONDS)  // Permit min HeartBeat time
-          .permitKeepAliveWithoutCalls(true);  // Allow KeepAlive forever 
without active RPCs
-
-      if (enableReflectionService) {
-        serverBuilder.addService(ProtoReflectionService.newInstance());
-      }
-
-      return serverBuilder
-          .build();
-    }
-
-    private void validate() {
-      if (port == 0 || service == null) {
-        throw new IllegalArgumentException("Port and service must be set");
-      }
-      if (clientTimeout < maxAllowedClientHeartBeatInterval) {
-        throw new IllegalArgumentException("Client timeout is less than max 
allowed client heartbeat interval");
-      }
-    }
-  }
-
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index ecc6c9568..8f45407c0 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixDefinedState;
-import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
 import org.apache.helix.gateway.participant.HelixGatewayParticipant;
 import org.apache.helix.gateway.service.GatewayServiceEvent;
 import org.apache.helix.model.Message;
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
index 15b95448f..2fd482d7d 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
@@ -19,7 +19,6 @@ package org.apache.helix.gateway.base.util;
  * under the License.
  */
 
-import io.grpc.Server;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -44,9 +43,6 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
 import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
-import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -798,10 +794,4 @@ public class TestHelper {
       Thread.sleep(50);
     } while (true);
   }
-
-  public static Server createHelixGatewayServer(int port, 
GatewayServiceManager manager) {
-    return new HelixGatewayGrpcServerBuilder().setPort(port)
-        .setGrpcService((HelixGatewayServiceGrpcService) 
manager.getHelixGatewayServiceProcessor())
-        .build();
-  }
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java
new file mode 100644
index 000000000..30f6c9692
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java
@@ -0,0 +1,73 @@
+package org.apache.helix.gateway.chanel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class GatewayServiceChannelConfigTest {
+
+  @Test
+  public void testGatewayServiceChannelConfigBuilder() {
+    GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
+        new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder();
+
+    builder.setChannelMode(GatewayServiceChannelConfig.ChannelMode.PUSH_MODE)
+        
.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER)
+        
.setShardStateProcessorType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER)
+        .setGrpcServerPort(50051)
+        .setServerHeartBeatInterval(30)
+        .setMaxAllowedClientHeartBeatInterval(60)
+        .setClientTimeout(120)
+        .setEnableReflectionService(true)
+        .setPollIntervalSec(10);
+
+    GatewayServiceChannelConfig config = builder.build();
+
+    Assert.assertEquals(config.getChannelMode(), 
GatewayServiceChannelConfig.ChannelMode.PUSH_MODE);
+    Assert.assertEquals(config.getParticipantConnectionChannelType(),
+        GatewayServiceChannelConfig.ChannelType.GRPC_SERVER);
+    Assert.assertEquals(config.getShardStateChannelType(), 
GatewayServiceChannelConfig.ChannelType.GRPC_SERVER);
+    Assert.assertEquals(config.getGrpcServerPort(), 50051);
+    Assert.assertEquals(config.getServerHeartBeatInterval(), 30);
+    Assert.assertEquals(config.getMaxAllowedClientHeartBeatInterval(), 60);
+    Assert.assertEquals(config.getClientTimeout(), 120);
+    Assert.assertTrue(config.getEnableReflectionService());
+    Assert.assertEquals(config.getPollIntervalSec(), 10);
+  }
+
+  @Test
+  public void testInvalidConfig() {
+    GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
+        new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder();
+
+    
builder.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER);
+
+    // assert we get an exception
+    try {
+      builder.build();
+      Assert.fail("Should have thrown an exception");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
index d3180a16c..2c593ca12 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.participant;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -356,6 +357,16 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
       _pendingMessageMap.put(instanceName, message);
     }
 
+    @Override
+    public void start() throws IOException {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
     @Override
     public void closeConnectionWithError(String instanceName, String reason) {
       _errorDisconnectCount.incrementAndGet();
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
index 627c572e9..19c92f5e2 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
@@ -21,15 +21,13 @@ package org.apache.helix.gateway.service;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import io.grpc.Server;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.gateway.base.HelixGatewayTestBase;
-import org.apache.helix.gateway.base.util.TestHelper;
-import org.apache.helix.gateway.constant.GatewayServiceEventType;
-import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
@@ -44,8 +42,10 @@ public class TestGatewayServiceConnection extends 
HelixGatewayTestBase {
   @Test
   public void TestLivenessDetection() throws IOException, InterruptedException 
{
     // start the gateway service
-    Server server = TestHelper.createHelixGatewayServer(50051, new 
DummyGatewayServiceManager());
-    server.start();
+    GatewayServiceChannelConfig config =
+        new 
GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051).build();
+    GatewayServiceManager mng = new DummyGatewayServiceManager(config);
+    mng.startService();
 
     // start the client
     HelixGatewayClient client = new HelixGatewayClient("localhost", 50051);
@@ -69,6 +69,8 @@ public class TestGatewayServiceConnection extends 
HelixGatewayTestBase {
     // assert we have disconnect event when shutting down ungracefully
     client2.shutdownByClosingConnection();
     Assert.assertTrue(disconnectLatch.await(5, TimeUnit.SECONDS));
+
+    mng.stopService();
   }
 
   public class HelixGatewayClient {
@@ -133,8 +135,8 @@ public class TestGatewayServiceConnection extends 
HelixGatewayTestBase {
 
   class DummyGatewayServiceManager extends GatewayServiceManager {
 
-    public DummyGatewayServiceManager() {
-      super();
+    public DummyGatewayServiceManager(GatewayServiceChannelConfig 
gatewayServiceChannelConfig) {
+      super("dummyZkAddress", gatewayServiceChannelConfig);
     }
 
     @Override
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
index 873841cae..41604b427 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -1,5 +1,6 @@
 package org.apache.helix.gateway.service;
 
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
 import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
 import org.testng.annotations.Test;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
@@ -16,7 +17,8 @@ public class TestGatewayServiceManager {
   public void testConnectionAndDisconnectionEvents() {
 
     manager = mock(GatewayServiceManager.class);
-    HelixGatewayServiceGrpcService grpcService = new 
HelixGatewayServiceGrpcService(manager);
+    GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder = 
new 
GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051);
+    HelixGatewayServiceGrpcService grpcService = new 
HelixGatewayServiceGrpcService(manager,builder.build());
     // Mock a connection event
     HelixGatewayServiceOuterClass.ShardStateMessage connectionEvent =
         HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder()
@@ -42,5 +44,7 @@ public class TestGatewayServiceManager {
     grpcService.report(null).onNext(disconnectionEvent);
     // Verify the events were processed in sequence
     verify(manager, times(2)).onGatewayServiceEvent(any());
+
+    grpcService.stop();
   }
 }


Reply via email to