This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit c5eaa8ab795ac056340b1ac22ed6235f0d101120 Author: xyuanlu <[email protected]> AuthorDate: Thu Aug 22 23:14:10 2024 -0700 Create Gateway service channel factory (#2883) This PR introduces a new configuration system for the Helix Gateway service, allowing for more flexible and customizable channel configurations. The main changes include: -Introduction of GatewayServiceChannelConfig class to manage various channel configurations. -Implementation of a factory pattern (HelixGatewayServiceChannelFactory) for creating appropriate service channels based on the configuration. -The PR also includes various improvements in error handling, logging, and code organization. --- .../org/apache/helix/gateway/HelixGatewayMain.java | 27 ++- .../constant/GatewayServiceConfigConstant.java} | 11 +- .../constant/GatewayServiceEventType.java | 2 +- .../api/service/HelixGatewayServiceChannel.java | 14 ++ .../channel/GatewayServiceChannelConfig.java | 206 +++++++++++++++++++++ .../channel/HelixGatewayServiceChannelFactory.java | 43 +++++ .../channel/HelixGatewayServiceGrpcService.java | 48 ++++- .../HelixGatewayServicePollModeChannel.java} | 40 +++- .../HelixGatewayServicePullModeChannel.java | 25 --- .../constant/GatewayServiceGrpcDefaultConfig.java | 7 - .../helix/gateway/service/GatewayServiceEvent.java | 2 +- .../gateway/service/GatewayServiceManager.java | 27 ++- .../util/HelixGatewayGrpcServerBuilder.java | 99 ---------- .../util/StateTransitionMessageTranslateUtil.java | 2 +- .../apache/helix/gateway/base/util/TestHelper.java | 10 - .../chanel/GatewayServiceChannelConfigTest.java | 73 ++++++++ .../participant/TestHelixGatewayParticipant.java | 11 ++ .../service/TestGatewayServiceConnection.java | 18 +- .../gateway/service/TestGatewayServiceManager.java | 6 +- 19 files changed, 485 insertions(+), 186 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java index b4b8ca0cb..df6230ca6 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java @@ -19,35 +19,32 @@ package org.apache.helix.gateway; * under the License. */ -import io.grpc.Server; import java.io.IOException; -import java.util.concurrent.TimeUnit; -import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; import org.apache.helix.gateway.service.GatewayServiceManager; -import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder; +import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; + +import static java.lang.Integer.*; /** * Main class for Helix Gateway. * It starts the Helix Gateway grpc service. + * args0: zk address + * args1: helix gateway groc server port */ public final class HelixGatewayMain { private HelixGatewayMain() { } - public static void main(String[] args) throws InterruptedException, IOException { - // Create a new server to listen on port 50051 - GatewayServiceManager manager = new GatewayServiceManager(); - Server server = new HelixGatewayGrpcServerBuilder().setPort(50051) - .setGrpcService((HelixGatewayServiceGrpcService)manager.getHelixGatewayServiceProcessor()) - .build(); - - server.start(); - System.out.println("Server started, listening on " + server.getPort()); + public static void main(String[] args) throws IOException { + // Create a new server + GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder = + new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder(); + GatewayServiceManager manager = + new GatewayServiceManager(args[0], builder.setGrpcServerPort(parseInt(args[1])).build()); - // Wait for the server to shutdown - server.awaitTermination(365, TimeUnit.DAYS); + manager.startService(); } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java similarity index 69% copy from helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java copy to helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java index 36c469825..70e4d73e3 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java @@ -1,4 +1,4 @@ -package org.apache.helix.gateway.constant; +package org.apache.helix.gateway.api.constant; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -19,8 +19,9 @@ package org.apache.helix.gateway.constant; * under the License. */ -public enum GatewayServiceEventType { - CONNECT, // init connection to gateway service - UPDATE, // update state transition result - DISCONNECT // shutdown connection to gateway service. +public class GatewayServiceConfigConstant { + public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60; + public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60; + public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60; + public static final int DEFAULT_POLL_INTERVAL_SEC = 60; } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java similarity index 95% copy from helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java copy to helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java index 36c469825..b5cff9978 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceEventType.java @@ -1,4 +1,4 @@ -package org.apache.helix.gateway.constant; +package org.apache.helix.gateway.api.constant; /* * Licensed to the Apache Software Foundation (ASF) under one diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java index 1731e158f..02c0e8a8d 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java @@ -19,6 +19,7 @@ package org.apache.helix.gateway.api.service; * under the License. */ +import java.io.IOException; import org.apache.helix.gateway.service.GatewayServiceEvent; import org.apache.helix.gateway.service.GatewayServiceManager; import org.apache.helix.model.Message; @@ -54,6 +55,19 @@ public interface HelixGatewayServiceChannel { gatewayServiceManager.onGatewayServiceEvent(event); } + /** + * Start the gateway service channel. + * + * @throws IOException if the channel cannot be started + */ + public void start() throws IOException; + + /** + * Stop the gateway service channel forcefully. + */ + public void stop(); + + // TODO: remove the following 2 apis in future changes /** * Gateway service close connection with error. This function is called when manager wants to close client diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java new file mode 100644 index 000000000..31c7af05b --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java @@ -0,0 +1,206 @@ +package org.apache.helix.gateway.channel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import static org.apache.helix.gateway.api.constant.GatewayServiceConfigConstant.*; + + +public class GatewayServiceChannelConfig { + // Mode to get helix participant information (inbound information). This included health check and shard state transition response + // We do not support hybrid mode as of now, (i.e. have push mode for participant liveness detection and pull mode for shard state) + public enum ChannelMode { + PUSH_MODE, // The gateway service passively receives participant information + POLL_MODE // The gateway service actively polls participant information + } + + // NOTE: + // For outbound information - stateTransition request, Gateway service will always push the state transition message. + // We do not support participant poll mode for stateTransition request as of now. + + // channel type for the following 3 information - participant liveness detection, shard state transition request and response + // By default, they are all grpc server, user could define them separately. + public enum ChannelType { + GRPC_SERVER, + GRPC_CLIENT, + FILE + } + + // service configs + + // service mode for inbound information. + private ChannelMode _channelMode; + // channel type for participant liveness detection + private ChannelType _participantConnectionChannelType; + // channel for sending and receiving shard state transition request and shard state response + private ChannelType _shardStateChannelType; + + // grpc server configs + private final int _grpcServerPort; + private final int _serverHeartBeatInterval; + private final int _maxAllowedClientHeartBeatInterval; + private final int _clientTimeout; + private final boolean _enableReflectionService; + + // poll mode config + private final int _pollIntervalSec; + // TODO: configs for pull mode grpc client + + // TODO: configs for pull mode with file + + // getters + + public ChannelMode getChannelMode() { + return _channelMode; + } + public ChannelType getParticipantConnectionChannelType() { + return _participantConnectionChannelType; + } + + public ChannelType getShardStateChannelType() { + return _shardStateChannelType; + } + + public int getGrpcServerPort() { + return _grpcServerPort; + } + + public int getServerHeartBeatInterval() { + return _serverHeartBeatInterval; + } + + public int getMaxAllowedClientHeartBeatInterval() { + return _maxAllowedClientHeartBeatInterval; + } + + public int getClientTimeout() { + return _clientTimeout; + } + + public boolean getEnableReflectionService() { + return _enableReflectionService; + } + + public int getPollIntervalSec() { + return _pollIntervalSec; + } + + private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode channelMode, ChannelType participantConnectionChannelType, + ChannelType shardStateChannelType, int serverHeartBeatInterval, int maxAllowedClientHeartBeatInterval, + int clientTimeout, boolean enableReflectionService, int pollIntervalSec) { + _grpcServerPort = grpcServerPort; + _channelMode = channelMode; + _participantConnectionChannelType = participantConnectionChannelType; + _shardStateChannelType = shardStateChannelType; + _serverHeartBeatInterval = serverHeartBeatInterval; + _maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval; + _clientTimeout = clientTimeout; + _enableReflectionService = enableReflectionService; + _pollIntervalSec = pollIntervalSec; + } + + public static class GatewayServiceProcessorConfigBuilder { + + // service configs + private ChannelMode _channelMode = ChannelMode.PUSH_MODE; + private ChannelType _participantConnectionChannelType = ChannelType.GRPC_SERVER; + private ChannelType _shardStatenChannelType = ChannelType.GRPC_SERVER; + + // grpc server configs + private int _grpcServerPort; + private int _serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL; + private int _maxAllowedClientHeartBeatInterval = DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL; + private int _clientTimeout = DEFAULT_CLIENT_TIMEOUT; + private boolean _enableReflectionService = true; + + // poll mode config + private int _pollIntervalSec = DEFAULT_POLL_INTERVAL_SEC; + // poll mode grpc client configs + + // poll mode file configs + + + public GatewayServiceProcessorConfigBuilder setChannelMode(ChannelMode channelMode) { + _channelMode = channelMode; + return this; + } + + public GatewayServiceProcessorConfigBuilder setParticipantConnectionChannelType(ChannelType channelMode) { + _participantConnectionChannelType = channelMode; + return this; + } + + public GatewayServiceProcessorConfigBuilder setShardStateProcessorType(ChannelType channelMode) { + _shardStatenChannelType = channelMode; + return this; + } + + public GatewayServiceProcessorConfigBuilder setGrpcServerPort(int grpcServerPort) { + _grpcServerPort = grpcServerPort; + return this; + } + + public GatewayServiceProcessorConfigBuilder setServerHeartBeatInterval(int serverHeartBeatInterval) { + _serverHeartBeatInterval = serverHeartBeatInterval; + return this; + } + + public GatewayServiceProcessorConfigBuilder setMaxAllowedClientHeartBeatInterval( + int maxAllowedClientHeartBeatInterval) { + _maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval; + return this; + } + + public GatewayServiceProcessorConfigBuilder setClientTimeout(int clientTimeout) { + _clientTimeout = clientTimeout; + return this; + } + + public GatewayServiceProcessorConfigBuilder setEnableReflectionService(boolean enableReflectionService) { + _enableReflectionService = enableReflectionService; + return this; + } + + public GatewayServiceProcessorConfigBuilder setPollIntervalSec(int pollIntervalSec) { + _pollIntervalSec = pollIntervalSec; + return this; + } + + public void validate() { + if ((_participantConnectionChannelType == ChannelType.GRPC_SERVER + && _shardStatenChannelType != ChannelType.GRPC_SERVER) || ( + _participantConnectionChannelType != ChannelType.GRPC_SERVER + && _shardStatenChannelType == ChannelType.GRPC_SERVER)) { + throw new IllegalArgumentException( + "In caas of GRPC server, Participant connection channel type and shard state channel type must be the same"); + } + if (_participantConnectionChannelType == ChannelType.GRPC_SERVER && _grpcServerPort == 0) { + throw new IllegalArgumentException("Grpc server port must be set for grpc server channel type"); + } + } + + public GatewayServiceChannelConfig build() { + validate(); + return new GatewayServiceChannelConfig(_grpcServerPort, _channelMode, _participantConnectionChannelType, + _shardStatenChannelType, _serverHeartBeatInterval, _maxAllowedClientHeartBeatInterval, _clientTimeout, + _enableReflectionService, _pollIntervalSec); + } + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java new file mode 100644 index 000000000..4c20b1977 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java @@ -0,0 +1,43 @@ +package org.apache.helix.gateway.channel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; +import org.apache.helix.gateway.service.GatewayServiceManager; + + +public class HelixGatewayServiceChannelFactory { + + public static HelixGatewayServiceChannel createServiceChannel(GatewayServiceChannelConfig config, + GatewayServiceManager manager) { + + if (config.getChannelMode() == GatewayServiceChannelConfig.ChannelMode.PUSH_MODE) { + if (config.getParticipantConnectionChannelType() == GatewayServiceChannelConfig.ChannelType.GRPC_SERVER) { + return new HelixGatewayServiceGrpcService(manager, config); + } + } else { + return new HelixGatewayServicePollModeChannel(config); + } + throw new IllegalArgumentException( + "Unsupported channel mode and type combination: " + config.getChannelMode() + " , " + + config.getParticipantConnectionChannelType()); + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java index a176ca89c..ea867cacf 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java @@ -19,10 +19,15 @@ package org.apache.helix.gateway.channel; * under the License. */ +import com.google.common.annotations.VisibleForTesting; +import io.grpc.Server; +import io.grpc.ServerBuilder; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.gateway.service.GatewayServiceEvent; @@ -58,8 +63,13 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli // A fine grain lock register on instance level private final PerKeyLockRegistry _lockRegistry; - public HelixGatewayServiceGrpcService(GatewayServiceManager manager) { + private final GatewayServiceChannelConfig _config; + + private Server _server; + + public HelixGatewayServiceGrpcService(GatewayServiceManager manager, GatewayServiceChannelConfig config) { _manager = manager; + _config = config; _lockRegistry = new PerKeyLockRegistry(); } @@ -176,4 +186,40 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli _reversedObserverMap.put(streamObserver, new ImmutablePair<>(instanceName, clusterName)); }); } + + @Override + public void start() throws IOException { + ServerBuilder serverBuilder = ServerBuilder.forPort(_config.getGrpcServerPort()) + .addService(this) + .keepAliveTime(_config.getServerHeartBeatInterval(), + TimeUnit.SECONDS) // HeartBeat time + .keepAliveTimeout(_config.getClientTimeout(), + TimeUnit.SECONDS) // KeepAlive client timeout + .permitKeepAliveTime(_config.getMaxAllowedClientHeartBeatInterval(), + TimeUnit.SECONDS) // Permit min HeartBeat time + .permitKeepAliveWithoutCalls(true); // Allow KeepAlive forever without active RPC + if (_config.getEnableReflectionService()) { + serverBuilder = serverBuilder.addService(io.grpc.protobuf.services.ProtoReflectionService.newInstance()); + } + _server = serverBuilder.build(); + + logger.info("Starting grpc server on port " + _config.getGrpcServerPort() + " now.... Server heart beat interval: " + + _config.getServerHeartBeatInterval() + " seconds, Max allowed client heart beat interval: " + + _config.getMaxAllowedClientHeartBeatInterval() + " seconds, Client timeout: " + _config.getClientTimeout() + + " seconds, Enable reflection service: " + _config.getEnableReflectionService()); + _server.start(); + } + + @Override + public void stop() { + if (_server != null) { + logger.info("Shutting down grpc server now...."); + _server.shutdownNow(); + } + } + + @VisibleForTesting + Server getServer() { + return _server; + } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java similarity index 51% rename from helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java rename to helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java index 36c469825..30a9f1802 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java @@ -1,4 +1,4 @@ -package org.apache.helix.gateway.constant; +package org.apache.helix.gateway.channel; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -19,8 +19,38 @@ package org.apache.helix.gateway.constant; * under the License. */ -public enum GatewayServiceEventType { - CONNECT, // init connection to gateway service - UPDATE, // update state transition result - DISCONNECT // shutdown connection to gateway service. +import java.io.IOException; +import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; +import org.apache.helix.model.Message; + +// TODO: implement this class +public class HelixGatewayServicePollModeChannel implements HelixGatewayServiceChannel { + + public HelixGatewayServicePollModeChannel(GatewayServiceChannelConfig config) { + } + + @Override + public void sendStateTransitionMessage(String instanceName, String currentState, Message message) { + + } + + @Override + public void start() throws IOException { + + } + + @Override + public void stop() { + + } + + @Override + public void closeConnectionWithError(String instanceName, String reason) { + + } + + @Override + public void completeConnection(String instanceName) { + + } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java deleted file mode 100644 index 0d80a96d0..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.apache.helix.gateway.channel; - -import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; -import org.apache.helix.model.Message; - - -public class HelixGatewayServicePullModeChannel implements HelixGatewayServiceChannel { - - public HelixGatewayServicePullModeChannel() { - } - @Override - public void sendStateTransitionMessage(String instanceName, String currentState, Message message) { - - } - - @Override - public void closeConnectionWithError(String instanceName, String reason) { - - } - - @Override - public void completeConnection(String instanceName) { - - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java deleted file mode 100644 index 3f1bbafbc..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.helix.gateway.constant; - -public class GatewayServiceGrpcDefaultConfig { - public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60; - public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60; - public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60; -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java index b919429b9..f5c66dea0 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java @@ -21,7 +21,7 @@ package org.apache.helix.gateway.service; import java.util.List; import java.util.Map; -import org.apache.helix.gateway.constant.GatewayServiceEventType; +import org.apache.helix.gateway.api.constant.GatewayServiceEventType; /** diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 2b626d395..289c5513c 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -19,6 +19,7 @@ package org.apache.helix.gateway.service; * under the License. */ +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -26,9 +27,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.google.common.collect.ImmutableSet; +import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; +import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory; +import org.apache.helix.gateway.api.constant.GatewayServiceEventType; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; -import org.apache.helix.gateway.constant.GatewayServiceEventType; -import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.gateway.util.PerKeyBlockingExecutor; @@ -57,13 +59,16 @@ public class GatewayServiceManager { // It is used to ensure for each instance, the connect/disconnect event won't start until the previous one is done. private final PerKeyBlockingExecutor _connectionEventProcessor; - public GatewayServiceManager() { + private final GatewayServiceChannelConfig _gatewayServiceChannelConfig; + + public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) { _helixGatewayParticipantMap = new ConcurrentHashMap<>(); - _zkAddress = "foo"; + _zkAddress = zkAddress; _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); - _gatewayServiceChannel = new HelixGatewayServiceGrpcService(this); + _gatewayServiceChannel = HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig, this); _connectionEventProcessor = new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable + _gatewayServiceChannelConfig = gatewayServiceChannelConfig; } /** @@ -127,8 +132,16 @@ public class GatewayServiceManager { } } - public HelixGatewayServiceChannel getHelixGatewayServiceProcessor() { - return _gatewayServiceChannel; + + public void startService() throws IOException { + _gatewayServiceChannel.start(); + } + + public void stopService() { + _gatewayServiceChannel.stop(); + _connectionEventProcessor.shutdown(); + _participantStateTransitionResultUpdator.shutdown(); + _helixGatewayParticipantMap.clear(); } private void createHelixGatewayParticipant(String clusterName, String instanceName, diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java deleted file mode 100644 index afa1477ce..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java +++ /dev/null @@ -1,99 +0,0 @@ -package org.apache.helix.gateway.util; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import io.grpc.BindableService; -import io.grpc.Server; -import io.grpc.ServerBuilder; -import io.grpc.protobuf.services.ProtoReflectionService; -import java.util.concurrent.TimeUnit; - -import static org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*; - - -/** - * Builder class to create a Helix gateway service server with custom configurations. - */ - public class HelixGatewayGrpcServerBuilder { - private int port; - private BindableService service; - private int serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL; - private int maxAllowedClientHeartBeatInterval = DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL; - private int clientTimeout = DEFAULT_CLIENT_TIMEOUT; - private boolean enableReflectionService = true; - - public HelixGatewayGrpcServerBuilder setPort(int port) { - this.port = port; - return this; - } - - public HelixGatewayGrpcServerBuilder setServerHeartBeatInterval(int serverHeartBeatInterval) { - this.serverHeartBeatInterval = serverHeartBeatInterval; - return this; - } - - public HelixGatewayGrpcServerBuilder setMaxAllowedClientHeartBeatInterval(int maxAllowedClientHeartBeatInterval) { - this.maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval; - return this; - } - - public HelixGatewayGrpcServerBuilder setClientTimeout(int clientTimeout) { - this.clientTimeout = clientTimeout; - return this; - } - - public HelixGatewayGrpcServerBuilder setGrpcService(BindableService service) { - this.service = service; - return this; - } - - public HelixGatewayGrpcServerBuilder enableReflectionService(boolean enableReflectionService) { - this.enableReflectionService = enableReflectionService; - return this; - } - - public Server build() { - validate(); - - ServerBuilder serverBuilder = ServerBuilder.forPort(port) - .addService(service) - .keepAliveTime(serverHeartBeatInterval, TimeUnit.SECONDS) // HeartBeat time - .keepAliveTimeout(clientTimeout, TimeUnit.SECONDS) // KeepAlive client timeout - .permitKeepAliveTime(maxAllowedClientHeartBeatInterval, TimeUnit.SECONDS) // Permit min HeartBeat time - .permitKeepAliveWithoutCalls(true); // Allow KeepAlive forever without active RPCs - - if (enableReflectionService) { - serverBuilder.addService(ProtoReflectionService.newInstance()); - } - - return serverBuilder - .build(); - } - - private void validate() { - if (port == 0 || service == null) { - throw new IllegalArgumentException("Port and service must be set"); - } - if (clientTimeout < maxAllowedClientHeartBeatInterval) { - throw new IllegalArgumentException("Client timeout is less than max allowed client heartbeat interval"); - } - } - } - diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java index ecc6c9568..8f45407c0 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.helix.HelixDefinedState; -import org.apache.helix.gateway.constant.GatewayServiceEventType; +import org.apache.helix.gateway.api.constant.GatewayServiceEventType; import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.gateway.service.GatewayServiceEvent; import org.apache.helix.model.Message; diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java index 15b95448f..2fd482d7d 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java @@ -19,7 +19,6 @@ package org.apache.helix.gateway.base.util; * under the License. */ -import io.grpc.Server; import java.io.File; import java.io.IOException; import java.lang.reflect.Method; @@ -44,9 +43,6 @@ import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; -import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; -import org.apache.helix.gateway.service.GatewayServiceManager; -import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; @@ -798,10 +794,4 @@ public class TestHelper { Thread.sleep(50); } while (true); } - - public static Server createHelixGatewayServer(int port, GatewayServiceManager manager) { - return new HelixGatewayGrpcServerBuilder().setPort(port) - .setGrpcService((HelixGatewayServiceGrpcService) manager.getHelixGatewayServiceProcessor()) - .build(); - } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java b/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java new file mode 100644 index 000000000..30f6c9692 --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java @@ -0,0 +1,73 @@ +package org.apache.helix.gateway.chanel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class GatewayServiceChannelConfigTest { + + @Test + public void testGatewayServiceChannelConfigBuilder() { + GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder = + new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder(); + + builder.setChannelMode(GatewayServiceChannelConfig.ChannelMode.PUSH_MODE) + .setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER) + .setShardStateProcessorType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER) + .setGrpcServerPort(50051) + .setServerHeartBeatInterval(30) + .setMaxAllowedClientHeartBeatInterval(60) + .setClientTimeout(120) + .setEnableReflectionService(true) + .setPollIntervalSec(10); + + GatewayServiceChannelConfig config = builder.build(); + + Assert.assertEquals(config.getChannelMode(), GatewayServiceChannelConfig.ChannelMode.PUSH_MODE); + Assert.assertEquals(config.getParticipantConnectionChannelType(), + GatewayServiceChannelConfig.ChannelType.GRPC_SERVER); + Assert.assertEquals(config.getShardStateChannelType(), GatewayServiceChannelConfig.ChannelType.GRPC_SERVER); + Assert.assertEquals(config.getGrpcServerPort(), 50051); + Assert.assertEquals(config.getServerHeartBeatInterval(), 30); + Assert.assertEquals(config.getMaxAllowedClientHeartBeatInterval(), 60); + Assert.assertEquals(config.getClientTimeout(), 120); + Assert.assertTrue(config.getEnableReflectionService()); + Assert.assertEquals(config.getPollIntervalSec(), 10); + } + + @Test + public void testInvalidConfig() { + GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder = + new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder(); + + builder.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.GRPC_SERVER); + + // assert er get an exception + try { + builder.build(); + Assert.fail("Should have thrown an exception"); + } catch (IllegalArgumentException e) { + // expected + } + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java index d3180a16c..2c593ca12 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java @@ -19,6 +19,7 @@ package org.apache.helix.gateway.participant; * under the License. */ +import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -356,6 +357,16 @@ public class TestHelixGatewayParticipant extends ZkTestBase { _pendingMessageMap.put(instanceName, message); } + @Override + public void start() throws IOException { + + } + + @Override + public void stop() { + + } + @Override public void closeConnectionWithError(String instanceName, String reason) { _errorDisconnectCount.incrementAndGet(); diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java index 627c572e9..19c92f5e2 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java @@ -21,15 +21,13 @@ package org.apache.helix.gateway.service; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.Server; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.helix.gateway.base.HelixGatewayTestBase; -import org.apache.helix.gateway.base.util.TestHelper; -import org.apache.helix.gateway.constant.GatewayServiceEventType; -import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder; +import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; +import org.apache.helix.gateway.api.constant.GatewayServiceEventType; import org.testng.Assert; import org.testng.annotations.Test; import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc; @@ -44,8 +42,10 @@ public class TestGatewayServiceConnection extends HelixGatewayTestBase { @Test public void TestLivenessDetection() throws IOException, InterruptedException { // start the gateway service - Server server = TestHelper.createHelixGatewayServer(50051, new DummyGatewayServiceManager()); - server.start(); + GatewayServiceChannelConfig config = + new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051).build(); + GatewayServiceManager mng = new DummyGatewayServiceManager(config); + mng.startService(); // start the client HelixGatewayClient client = new HelixGatewayClient("localhost", 50051); @@ -69,6 +69,8 @@ public class TestGatewayServiceConnection extends HelixGatewayTestBase { // assert we have disconnect event when shutting down ungracefully client2.shutdownByClosingConnection(); Assert.assertTrue(disconnectLatch.await(5, TimeUnit.SECONDS)); + + mng.stopService(); } public class HelixGatewayClient { @@ -133,8 +135,8 @@ public class TestGatewayServiceConnection extends HelixGatewayTestBase { class DummyGatewayServiceManager extends GatewayServiceManager { - public DummyGatewayServiceManager() { - super(); + public DummyGatewayServiceManager(GatewayServiceChannelConfig gatewayServiceChannelConfig) { + super("dummyZkAddress", gatewayServiceChannelConfig); } @Override diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java index 873841cae..41604b427 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java @@ -1,5 +1,6 @@ package org.apache.helix.gateway.service; +import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; import org.testng.annotations.Test; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; @@ -16,7 +17,8 @@ public class TestGatewayServiceManager { public void testConnectionAndDisconnectionEvents() { manager = mock(GatewayServiceManager.class); - HelixGatewayServiceGrpcService grpcService = new HelixGatewayServiceGrpcService(manager); + GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder = new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051); + HelixGatewayServiceGrpcService grpcService = new HelixGatewayServiceGrpcService(manager,builder.build()); // Mock a connection event HelixGatewayServiceOuterClass.ShardStateMessage connectionEvent = HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder() @@ -42,5 +44,7 @@ public class TestGatewayServiceManager { grpcService.report(null).onNext(disconnectionEvent); // Verify the events were processed in sequence verify(manager, times(2)).onGatewayServiceEvent(any()); + + grpcService.stop(); } }
