This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this
push:
new dad8c8cd2 Create Gateway service channel factory (#2883)
dad8c8cd2 is described below
commit dad8c8cd24a0129081581a370dc6f6ece78a4ba2
Author: xyuanlu <[email protected]>
AuthorDate: Thu Aug 22 23:14:10 2024 -0700
Create Gateway service channel factory (#2883)
This PR introduces a new configuration system for the Helix Gateway
service, allowing for more flexible and customizable channel configurations.
The main changes include:
- Introduction of GatewayServiceChannelConfig class to manage various
channel configurations.
- Implementation of a factory pattern (HelixGatewayServiceChannelFactory)
for creating appropriate service channels based on the configuration.
- The PR also includes various improvements in error handling, logging, and
code organization.
---
.../org/apache/helix/gateway/HelixGatewayMain.java | 27 ++-
.../constant/GatewayServiceConfigConstant.java} | 11 +-
.../constant/GatewayServiceEventType.java | 2 +-
.../api/service/HelixGatewayServiceChannel.java | 14 ++
.../channel/GatewayServiceChannelConfig.java | 206 +++++++++++++++++++++
.../channel/HelixGatewayServiceChannelFactory.java | 43 +++++
.../channel/HelixGatewayServiceGrpcService.java | 48 ++++-
.../HelixGatewayServicePollModeChannel.java} | 40 +++-
.../HelixGatewayServicePullModeChannel.java | 25 ---
.../constant/GatewayServiceGrpcDefaultConfig.java | 7 -
.../helix/gateway/service/GatewayServiceEvent.java | 2 +-
.../gateway/service/GatewayServiceManager.java | 27 ++-
.../util/HelixGatewayGrpcServerBuilder.java | 99 ----------
.../util/StateTransitionMessageTranslateUtil.java | 2 +-
.../apache/helix/gateway/base/util/TestHelper.java | 10 -
.../chanel/GatewayServiceChannelConfigTest.java | 73 ++++++++
.../participant/TestHelixGatewayParticipant.java | 11 ++
.../service/TestGatewayServiceConnection.java | 18 +-
.../gateway/service/TestGatewayServiceManager.java | 6 +-
19 files changed, 485 insertions(+), 186 deletions(-)
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
index b4b8ca0cb..df6230ca6 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
@@ -19,35 +19,32 @@ package org.apache.helix.gateway;
* under the License.
*/
-import io.grpc.Server;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+
+import static java.lang.Integer.*;
/**
* Main class for Helix Gateway.
* It starts the Helix Gateway grpc service.
+ * args0: zk address
+ * args1: helix gateway grpc server port
*/
public final class HelixGatewayMain {
private HelixGatewayMain() {
}
- public static void main(String[] args) throws InterruptedException,
IOException {
- // Create a new server to listen on port 50051
- GatewayServiceManager manager = new GatewayServiceManager();
- Server server = new HelixGatewayGrpcServerBuilder().setPort(50051)
-
.setGrpcService((HelixGatewayServiceGrpcService)manager.getHelixGatewayServiceProcessor())
- .build();
-
- server.start();
- System.out.println("Server started, listening on " + server.getPort());
+ public static void main(String[] args) throws IOException {
+ // Create a new server
+ GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
+ new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder();
+ GatewayServiceManager manager =
+ new GatewayServiceManager(args[0],
builder.setGrpcServerPort(parseInt(args[1])).build());
- // Wait for the server to shutdown
- server.awaitTermination(365, TimeUnit.DAYS);
+ manager.startService();
}
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
similarity index 69%
copy from
helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
copy to
helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
index 36c469825..70e4d73e3 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.constant;
+package org.apache.helix.gateway.api.constant;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,8 +19,9 @@ package org.apache.helix.gateway.constant;
* under the License.
*/
-public enum GatewayServiceEventType {
- CONNECT, // init connection to gateway service
- UPDATE, // update state transition result
- DISCONNECT // shutdown connection to gateway service.
+public class GatewayServiceConfigConstant {
+ public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60;
+ public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60;
+ public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60;
+ public static final int DEFAULT_POLL_INTERVAL_SEC = 60;
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java
similarity index 95%
copy from
helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
copy to
helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java
index 36c469825..b5cff9978 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.constant;
+package org.apache.helix.gateway.api.constant;
/*
* Licensed to the Apache Software Foundation (ASF) under one
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
index 1731e158f..02c0e8a8d 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.api.service;
* under the License.
*/
+import java.io.IOException;
import org.apache.helix.gateway.service.GatewayServiceEvent;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.model.Message;
@@ -54,6 +55,19 @@ public interface HelixGatewayServiceChannel {
gatewayServiceManager.onGatewayServiceEvent(event);
}
+ /**
+ * Start the gateway service channel.
+ *
+ * @throws IOException if the channel cannot be started
+ */
+ public void start() throws IOException;
+
+ /**
+ * Stop the gateway service channel forcefully.
+ */
+ public void stop();
+
+
// TODO: remove the following 2 apis in future changes
/**
* Gateway service close connection with error. This function is called when
manager wants to close client
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
new file mode 100644
index 000000000..31c7af05b
--- /dev/null
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
@@ -0,0 +1,206 @@
+package org.apache.helix.gateway.channel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import static
org.apache.helix.gateway.api.constant.GatewayServiceConfigConstant.*;
+
+
+public class GatewayServiceChannelConfig {
+ // Mode to get helix participant information (inbound information). This
includes health check and shard state transition response
+ // We do not support hybrid mode as of now, (i.e. have push mode for
participant liveness detection and pull mode for shard state)
+ public enum ChannelMode {
+ PUSH_MODE, // The gateway service passively receives participant
information
+ POLL_MODE // The gateway service actively polls participant information
+ }
+
+ // NOTE:
+ // For outbound information - stateTransition request, Gateway service will
always push the state transition message.
+ // We do not support participant poll mode for stateTransition request as of
now.
+
+ // channel type for the following 3 information - participant liveness
detection, shard state transition request and response
+ // By default, they are all grpc server, user could define them separately.
+ public enum ChannelType {
+ GRPC_SERVER,
+ GRPC_CLIENT,
+ FILE
+ }
+
+ // service configs
+
+ // service mode for inbound information.
+ private ChannelMode _channelMode;
+ // channel type for participant liveness detection
+ private ChannelType _participantConnectionChannelType;
+ // channel for sending and receiving shard state transition request and
shard state response
+ private ChannelType _shardStateChannelType;
+
+ // grpc server configs
+ private final int _grpcServerPort;
+ private final int _serverHeartBeatInterval;
+ private final int _maxAllowedClientHeartBeatInterval;
+ private final int _clientTimeout;
+ private final boolean _enableReflectionService;
+
+ // poll mode config
+ private final int _pollIntervalSec;
+ // TODO: configs for pull mode grpc client
+
+ // TODO: configs for pull mode with file
+
+ // getters
+
+ public ChannelMode getChannelMode() {
+ return _channelMode;
+ }
+ public ChannelType getParticipantConnectionChannelType() {
+ return _participantConnectionChannelType;
+ }
+
+ public ChannelType getShardStateChannelType() {
+ return _shardStateChannelType;
+ }
+
+ public int getGrpcServerPort() {
+ return _grpcServerPort;
+ }
+
+ public int getServerHeartBeatInterval() {
+ return _serverHeartBeatInterval;
+ }
+
+ public int getMaxAllowedClientHeartBeatInterval() {
+ return _maxAllowedClientHeartBeatInterval;
+ }
+
+ public int getClientTimeout() {
+ return _clientTimeout;
+ }
+
+ public boolean getEnableReflectionService() {
+ return _enableReflectionService;
+ }
+
+ public int getPollIntervalSec() {
+ return _pollIntervalSec;
+ }
+
+ private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode
channelMode, ChannelType participantConnectionChannelType,
+ ChannelType shardStateChannelType, int serverHeartBeatInterval, int
maxAllowedClientHeartBeatInterval,
+ int clientTimeout, boolean enableReflectionService, int pollIntervalSec)
{
+ _grpcServerPort = grpcServerPort;
+ _channelMode = channelMode;
+ _participantConnectionChannelType = participantConnectionChannelType;
+ _shardStateChannelType = shardStateChannelType;
+ _serverHeartBeatInterval = serverHeartBeatInterval;
+ _maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval;
+ _clientTimeout = clientTimeout;
+ _enableReflectionService = enableReflectionService;
+ _pollIntervalSec = pollIntervalSec;
+ }
+
+ public static class GatewayServiceProcessorConfigBuilder {
+
+ // service configs
+ private ChannelMode _channelMode = ChannelMode.PUSH_MODE;
+ private ChannelType _participantConnectionChannelType =
ChannelType.GRPC_SERVER;
+ private ChannelType _shardStatenChannelType = ChannelType.GRPC_SERVER;
+
+ // grpc server configs
+ private int _grpcServerPort;
+ private int _serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL;
+ private int _maxAllowedClientHeartBeatInterval =
DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL;
+ private int _clientTimeout = DEFAULT_CLIENT_TIMEOUT;
+ private boolean _enableReflectionService = true;
+
+ // poll mode config
+ private int _pollIntervalSec = DEFAULT_POLL_INTERVAL_SEC;
+ // poll mode grpc client configs
+
+ // poll mode file configs
+
+
+ public GatewayServiceProcessorConfigBuilder setChannelMode(ChannelMode
channelMode) {
+ _channelMode = channelMode;
+ return this;
+ }
+
+ public GatewayServiceProcessorConfigBuilder
setParticipantConnectionChannelType(ChannelType channelMode) {
+ _participantConnectionChannelType = channelMode;
+ return this;
+ }
+
+ public GatewayServiceProcessorConfigBuilder
setShardStateProcessorType(ChannelType channelMode) {
+ _shardStatenChannelType = channelMode;
+ return this;
+ }
+
+ public GatewayServiceProcessorConfigBuilder setGrpcServerPort(int
grpcServerPort) {
+ _grpcServerPort = grpcServerPort;
+ return this;
+ }
+
+ public GatewayServiceProcessorConfigBuilder setServerHeartBeatInterval(int
serverHeartBeatInterval) {
+ _serverHeartBeatInterval = serverHeartBeatInterval;
+ return this;
+ }
+
+ public GatewayServiceProcessorConfigBuilder
setMaxAllowedClientHeartBeatInterval(
+ int maxAllowedClientHeartBeatInterval) {
+ _maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval;
+ return this;
+ }
+
+ public GatewayServiceProcessorConfigBuilder setClientTimeout(int
clientTimeout) {
+ _clientTimeout = clientTimeout;
+ return this;
+ }
+
+ public GatewayServiceProcessorConfigBuilder
setEnableReflectionService(boolean enableReflectionService) {
+ _enableReflectionService = enableReflectionService;
+ return this;
+ }
+
+ public GatewayServiceProcessorConfigBuilder setPollIntervalSec(int
pollIntervalSec) {
+ _pollIntervalSec = pollIntervalSec;
+ return this;
+ }
+
+ public void validate() {
+ if ((_participantConnectionChannelType == ChannelType.GRPC_SERVER
+ && _shardStatenChannelType != ChannelType.GRPC_SERVER) || (
+ _participantConnectionChannelType != ChannelType.GRPC_SERVER
+ && _shardStatenChannelType == ChannelType.GRPC_SERVER)) {
+ throw new IllegalArgumentException(
+ "In caas of GRPC server, Participant connection channel type and
shard state channel type must be the same");
+ }
+ if (_participantConnectionChannelType == ChannelType.GRPC_SERVER &&
_grpcServerPort == 0) {
+ throw new IllegalArgumentException("Grpc server port must be set for
grpc server channel type");
+ }
+ }
+
+ public GatewayServiceChannelConfig build() {
+ validate();
+ return new GatewayServiceChannelConfig(_grpcServerPort, _channelMode,
_participantConnectionChannelType,
+ _shardStatenChannelType, _serverHeartBeatInterval,
_maxAllowedClientHeartBeatInterval, _clientTimeout,
+ _enableReflectionService, _pollIntervalSec);
+ }
+ }
+}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
new file mode 100644
index 000000000..4c20b1977
--- /dev/null
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
@@ -0,0 +1,43 @@
+package org.apache.helix.gateway.channel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+
+
+public class HelixGatewayServiceChannelFactory {
+
+ public static HelixGatewayServiceChannel
createServiceChannel(GatewayServiceChannelConfig config,
+ GatewayServiceManager manager) {
+
+ if (config.getChannelMode() ==
GatewayServiceChannelConfig.ChannelMode.PUSH_MODE) {
+ if (config.getParticipantConnectionChannelType() ==
GatewayServiceChannelConfig.ChannelType.GRPC_SERVER) {
+ return new HelixGatewayServiceGrpcService(manager, config);
+ }
+ } else {
+ return new HelixGatewayServicePollModeChannel(config);
+ }
+ throw new IllegalArgumentException(
+ "Unsupported channel mode and type combination: " +
config.getChannelMode() + " , "
+ + config.getParticipantConnectionChannelType());
+ }
+}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
index a176ca89c..ea867cacf 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
@@ -19,10 +19,15 @@ package org.apache.helix.gateway.channel;
* under the License.
*/
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.gateway.service.GatewayServiceEvent;
@@ -58,8 +63,13 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
// A fine grain lock register on instance level
private final PerKeyLockRegistry _lockRegistry;
- public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+ private final GatewayServiceChannelConfig _config;
+
+ private Server _server;
+
+ public HelixGatewayServiceGrpcService(GatewayServiceManager manager,
GatewayServiceChannelConfig config) {
_manager = manager;
+ _config = config;
_lockRegistry = new PerKeyLockRegistry();
}
@@ -176,4 +186,40 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
_reversedObserverMap.put(streamObserver, new
ImmutablePair<>(instanceName, clusterName));
});
}
+
+ @Override
+ public void start() throws IOException {
+ ServerBuilder serverBuilder =
ServerBuilder.forPort(_config.getGrpcServerPort())
+ .addService(this)
+ .keepAliveTime(_config.getServerHeartBeatInterval(),
+ TimeUnit.SECONDS) // HeartBeat time
+ .keepAliveTimeout(_config.getClientTimeout(),
+ TimeUnit.SECONDS) // KeepAlive client timeout
+ .permitKeepAliveTime(_config.getMaxAllowedClientHeartBeatInterval(),
+ TimeUnit.SECONDS) // Permit min HeartBeat time
+ .permitKeepAliveWithoutCalls(true); // Allow KeepAlive forever
without active RPC
+ if (_config.getEnableReflectionService()) {
+ serverBuilder =
serverBuilder.addService(io.grpc.protobuf.services.ProtoReflectionService.newInstance());
+ }
+ _server = serverBuilder.build();
+
+ logger.info("Starting grpc server on port " + _config.getGrpcServerPort()
+ " now.... Server heart beat interval: "
+ + _config.getServerHeartBeatInterval() + " seconds, Max allowed client
heart beat interval: "
+ + _config.getMaxAllowedClientHeartBeatInterval() + " seconds, Client
timeout: " + _config.getClientTimeout()
+ + " seconds, Enable reflection service: " +
_config.getEnableReflectionService());
+ _server.start();
+ }
+
+ @Override
+ public void stop() {
+ if (_server != null) {
+ logger.info("Shutting down grpc server now....");
+ _server.shutdownNow();
+ }
+ }
+
+ @VisibleForTesting
+ Server getServer() {
+ return _server;
+ }
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
similarity index 51%
rename from
helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
rename to
helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
index 36c469825..30a9f1802 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.constant;
+package org.apache.helix.gateway.channel;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,8 +19,38 @@ package org.apache.helix.gateway.constant;
* under the License.
*/
-public enum GatewayServiceEventType {
- CONNECT, // init connection to gateway service
- UPDATE, // update state transition result
- DISCONNECT // shutdown connection to gateway service.
+import java.io.IOException;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
+import org.apache.helix.model.Message;
+
+// TODO: implement this class
+public class HelixGatewayServicePollModeChannel implements
HelixGatewayServiceChannel {
+
+ public HelixGatewayServicePollModeChannel(GatewayServiceChannelConfig
config) {
+ }
+
+ @Override
+ public void sendStateTransitionMessage(String instanceName, String
currentState, Message message) {
+
+ }
+
+ @Override
+ public void start() throws IOException {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public void closeConnectionWithError(String instanceName, String reason) {
+
+ }
+
+ @Override
+ public void completeConnection(String instanceName) {
+
+ }
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java
deleted file mode 100644
index 0d80a96d0..000000000
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.helix.gateway.channel;
-
-import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
-import org.apache.helix.model.Message;
-
-
-public class HelixGatewayServicePullModeChannel implements
HelixGatewayServiceChannel {
-
- public HelixGatewayServicePullModeChannel() {
- }
- @Override
- public void sendStateTransitionMessage(String instanceName, String
currentState, Message message) {
-
- }
-
- @Override
- public void closeConnectionWithError(String instanceName, String reason) {
-
- }
-
- @Override
- public void completeConnection(String instanceName) {
-
- }
-}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java
deleted file mode 100644
index 3f1bbafbc..000000000
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.helix.gateway.constant;
-
-public class GatewayServiceGrpcDefaultConfig {
- public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60;
- public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60;
- public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60;
-}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
index b919429b9..f5c66dea0 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
@@ -21,7 +21,7 @@ package org.apache.helix.gateway.service;
import java.util.List;
import java.util.Map;
-import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
/**
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 2b626d395..289c5513c 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.service;
* under the License.
*/
+import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,9 +27,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.ImmutableSet;
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
+import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
-import org.apache.helix.gateway.constant.GatewayServiceEventType;
-import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
@@ -57,13 +59,16 @@ public class GatewayServiceManager {
// It is used to ensure for each instance, the connect/disconnect event
won't start until the previous one is done.
private final PerKeyBlockingExecutor _connectionEventProcessor;
- public GatewayServiceManager() {
+ private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;
+
+ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig
gatewayServiceChannelConfig) {
_helixGatewayParticipantMap = new ConcurrentHashMap<>();
- _zkAddress = "foo";
+ _zkAddress = zkAddress;
_participantStateTransitionResultUpdator =
Executors.newSingleThreadExecutor();
- _gatewayServiceChannel = new HelixGatewayServiceGrpcService(this);
+ _gatewayServiceChannel =
HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig,
this);
_connectionEventProcessor =
new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); //
todo: make it configurable
+ _gatewayServiceChannelConfig = gatewayServiceChannelConfig;
}
/**
@@ -127,8 +132,16 @@ public class GatewayServiceManager {
}
}
- public HelixGatewayServiceChannel getHelixGatewayServiceProcessor() {
- return _gatewayServiceChannel;
+
+ public void startService() throws IOException {
+ _gatewayServiceChannel.start();
+ }
+
+ public void stopService() {
+ _gatewayServiceChannel.stop();
+ _connectionEventProcessor.shutdown();
+ _participantStateTransitionResultUpdator.shutdown();
+ _helixGatewayParticipantMap.clear();
}
private void createHelixGatewayParticipant(String clusterName, String
instanceName,
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java
deleted file mode 100644
index afa1477ce..000000000
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.helix.gateway.util;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import io.grpc.BindableService;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import io.grpc.protobuf.services.ProtoReflectionService;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*;
-
-
-/**
- * Builder class to create a Helix gateway service server with custom
configurations.
- */
- public class HelixGatewayGrpcServerBuilder {
- private int port;
- private BindableService service;
- private int serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL;
- private int maxAllowedClientHeartBeatInterval =
DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL;
- private int clientTimeout = DEFAULT_CLIENT_TIMEOUT;
- private boolean enableReflectionService = true;
-
- public HelixGatewayGrpcServerBuilder setPort(int port) {
- this.port = port;
- return this;
- }
-
- public HelixGatewayGrpcServerBuilder setServerHeartBeatInterval(int
serverHeartBeatInterval) {
- this.serverHeartBeatInterval = serverHeartBeatInterval;
- return this;
- }
-
- public HelixGatewayGrpcServerBuilder
setMaxAllowedClientHeartBeatInterval(int maxAllowedClientHeartBeatInterval) {
- this.maxAllowedClientHeartBeatInterval =
maxAllowedClientHeartBeatInterval;
- return this;
- }
-
- public HelixGatewayGrpcServerBuilder setClientTimeout(int clientTimeout) {
- this.clientTimeout = clientTimeout;
- return this;
- }
-
- public HelixGatewayGrpcServerBuilder setGrpcService(BindableService
service) {
- this.service = service;
- return this;
- }
-
- public HelixGatewayGrpcServerBuilder enableReflectionService(boolean
enableReflectionService) {
- this.enableReflectionService = enableReflectionService;
- return this;
- }
-
- public Server build() {
- validate();
-
- ServerBuilder serverBuilder = ServerBuilder.forPort(port)
- .addService(service)
- .keepAliveTime(serverHeartBeatInterval, TimeUnit.SECONDS) //
HeartBeat time
- .keepAliveTimeout(clientTimeout, TimeUnit.SECONDS) // KeepAlive
client timeout
- .permitKeepAliveTime(maxAllowedClientHeartBeatInterval,
TimeUnit.SECONDS) // Permit min HeartBeat time
- .permitKeepAliveWithoutCalls(true); // Allow KeepAlive forever
without active RPCs
-
- if (enableReflectionService) {
- serverBuilder.addService(ProtoReflectionService.newInstance());
- }
-
- return serverBuilder
- .build();
- }
-
- private void validate() {
- if (port == 0 || service == null) {
- throw new IllegalArgumentException("Port and service must be set");
- }
- if (clientTimeout < maxAllowedClientHeartBeatInterval) {
- throw new IllegalArgumentException("Client timeout is less than max
allowed client heartbeat interval");
- }
- }
- }
-
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index ecc6c9568..8f45407c0 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDefinedState;
-import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.service.GatewayServiceEvent;
import org.apache.helix.model.Message;
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
index 15b95448f..2fd482d7d 100644
---
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
@@ -19,7 +19,6 @@ package org.apache.helix.gateway.base.util;
* under the License.
*/
-import io.grpc.Server;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
@@ -44,9 +43,6 @@ import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey.Builder;
import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
-import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -798,10 +794,4 @@ public class TestHelper {
Thread.sleep(50);
} while (true);
}
-
- public static Server createHelixGatewayServer(int port,
GatewayServiceManager manager) {
- return new HelixGatewayGrpcServerBuilder().setPort(port)
- .setGrpcService((HelixGatewayServiceGrpcService)
manager.getHelixGatewayServiceProcessor())
- .build();
- }
}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java
new file mode 100644
index 000000000..30f6c9692
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java
@@ -0,0 +1,73 @@
+package org.apache.helix.gateway.chanel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class GatewayServiceChannelConfigTest {
+
+ @Test
+ public void testGatewayServiceChannelConfigBuilder() {
+ GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
+ new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder();
+
+ builder.setChannelMode(GatewayServiceChannelConfig.ChannelMode.PUSH_MODE)
+
.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER)
+
.setShardStateProcessorType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER)
+ .setGrpcServerPort(50051)
+ .setServerHeartBeatInterval(30)
+ .setMaxAllowedClientHeartBeatInterval(60)
+ .setClientTimeout(120)
+ .setEnableReflectionService(true)
+ .setPollIntervalSec(10);
+
+ GatewayServiceChannelConfig config = builder.build();
+
+ Assert.assertEquals(config.getChannelMode(),
GatewayServiceChannelConfig.ChannelMode.PUSH_MODE);
+ Assert.assertEquals(config.getParticipantConnectionChannelType(),
+ GatewayServiceChannelConfig.ChannelType.GRPC_SERVER);
+ Assert.assertEquals(config.getShardStateChannelType(),
GatewayServiceChannelConfig.ChannelType.GRPC_SERVER);
+ Assert.assertEquals(config.getGrpcServerPort(), 50051);
+ Assert.assertEquals(config.getServerHeartBeatInterval(), 30);
+ Assert.assertEquals(config.getMaxAllowedClientHeartBeatInterval(), 60);
+ Assert.assertEquals(config.getClientTimeout(), 120);
+ Assert.assertTrue(config.getEnableReflectionService());
+ Assert.assertEquals(config.getPollIntervalSec(), 10);
+ }
+
+ @Test
+ public void testInvalidConfig() {
+ GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
+ new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder();
+
+
builder.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER);
+
+ // assert we get an exception
+ try {
+ builder.build();
+ Assert.fail("Should have thrown an exception");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
index d3180a16c..2c593ca12 100644
---
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.participant;
* under the License.
*/
+import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -356,6 +357,16 @@ public class TestHelixGatewayParticipant extends
ZkTestBase {
_pendingMessageMap.put(instanceName, message);
}
+ @Override
+ public void start() throws IOException {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
@Override
public void closeConnectionWithError(String instanceName, String reason) {
_errorDisconnectCount.incrementAndGet();
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
index 627c572e9..19c92f5e2 100644
---
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
@@ -21,15 +21,13 @@ package org.apache.helix.gateway.service;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import io.grpc.Server;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.helix.gateway.base.HelixGatewayTestBase;
-import org.apache.helix.gateway.base.util.TestHelper;
-import org.apache.helix.gateway.constant.GatewayServiceEventType;
-import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
import org.testng.Assert;
import org.testng.annotations.Test;
import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
@@ -44,8 +42,10 @@ public class TestGatewayServiceConnection extends
HelixGatewayTestBase {
@Test
public void TestLivenessDetection() throws IOException, InterruptedException
{
// start the gateway service
- Server server = TestHelper.createHelixGatewayServer(50051, new
DummyGatewayServiceManager());
- server.start();
+ GatewayServiceChannelConfig config =
+ new
GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051).build();
+ GatewayServiceManager mng = new DummyGatewayServiceManager(config);
+ mng.startService();
// start the client
HelixGatewayClient client = new HelixGatewayClient("localhost", 50051);
@@ -69,6 +69,8 @@ public class TestGatewayServiceConnection extends
HelixGatewayTestBase {
// assert we have disconnect event when shutting down ungracefully
client2.shutdownByClosingConnection();
Assert.assertTrue(disconnectLatch.await(5, TimeUnit.SECONDS));
+
+ mng.stopService();
}
public class HelixGatewayClient {
@@ -133,8 +135,8 @@ public class TestGatewayServiceConnection extends
HelixGatewayTestBase {
class DummyGatewayServiceManager extends GatewayServiceManager {
- public DummyGatewayServiceManager() {
- super();
+ public DummyGatewayServiceManager(GatewayServiceChannelConfig
gatewayServiceChannelConfig) {
+ super("dummyZkAddress", gatewayServiceChannelConfig);
}
@Override
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
index 873841cae..41604b427 100644
---
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -1,5 +1,6 @@
package org.apache.helix.gateway.service;
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
import org.testng.annotations.Test;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
@@ -16,7 +17,8 @@ public class TestGatewayServiceManager {
public void testConnectionAndDisconnectionEvents() {
manager = mock(GatewayServiceManager.class);
- HelixGatewayServiceGrpcService grpcService = new
HelixGatewayServiceGrpcService(manager);
+ GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
new
GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051);
+ HelixGatewayServiceGrpcService grpcService = new
HelixGatewayServiceGrpcService(manager,builder.build());
// Mock a connection event
HelixGatewayServiceOuterClass.ShardStateMessage connectionEvent =
HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder()
@@ -42,5 +44,7 @@ public class TestGatewayServiceManager {
grpcService.report(null).onNext(disconnectionEvent);
// Verify the events were processed in sequence
verify(manager, times(2)).onGatewayServiceEvent(any());
+
+ grpcService.stop();
}
}