This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit 00db56642fe735874856b3a15263e2c919a884d2 Author: xyuanlu <[email protected]> AuthorDate: Thu Jul 25 15:00:12 2024 -0700 Implement GatewayServiceManager (#2844) Implement GatewayServiceManager --- helix-gateway/pom.xml | 5 + .../gateway/constant/GatewayServiceEventType.java | 7 ++ .../HelixGatewayServiceGrpcService.java | 118 +++++++++++++++++++++ .../grpcservice/HelixGatewayServiceService.java | 67 ------------ .../helix/gateway/service/GatewayServiceEvent.java | 106 ++++++++++++++++++ .../gateway/service/GatewayServiceManager.java | 114 +++++++++++++++----- .../helix/gateway/service/HelixGatewayService.java | 32 +++--- .../service/HelixGatewayServiceProcessor.java | 3 +- .../helix/gateway/service/ReplicaStateTracker.java | 12 --- ...HelixGatewayOnlineOfflineStateModelFactory.java | 1 - .../helix/gateway/util/PerKeyBlockingExecutor.java | 4 +- .../util/StateTransitionMessageTranslateUtil.java | 61 ++++++++++- .../src/main/proto/HelixGatewayService.proto | 36 ++++--- .../gateway/service/TestGatewayServiceManager.java | 47 ++++++++ 14 files changed, 468 insertions(+), 145 deletions(-) diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml index f788f5bcf..8a7c50ed7 100644 --- a/helix-gateway/pom.xml +++ b/helix-gateway/pom.xml @@ -129,6 +129,11 @@ <artifactId>javax.annotation-api</artifactId> <version>1.3.2</version> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <extensions> diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java new file mode 100644 index 000000000..5ae32e40f --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java @@ -0,0 +1,7 @@ +package org.apache.helix.gateway.constant; + +public enum GatewayServiceEventType { + CONNECT, // init connection to gateway service + UPDATE, // update state transition result + DISCONNECT // shutdown connection to gateway service. +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java new file mode 100644 index 000000000..c2f9b57e3 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java @@ -0,0 +1,118 @@ +package org.apache.helix.gateway.grpcservice; + +import io.grpc.stub.StreamObserver; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.gateway.service.GatewayServiceEvent; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.util.PerKeyLockRegistry; +import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; +import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage; + + +/** + * Helix Gateway Service GRPC UI implementation. + */ +public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase + implements HelixGatewayServiceProcessor { + + // Map to store the observer for each instance + private final Map<String, StreamObserver<TransitionMessage>> _observerMap = new HashMap<>(); + // A reverse map to store the instance name for each observer. It is used to find the instance when connection is closed. + private final Map<StreamObserver<TransitionMessage>, Pair<String, String>> _reversedObserverMap = new HashMap<>(); + + private final GatewayServiceManager _manager; + + // A fine grain lock register on instance level + private final PerKeyLockRegistry _lockRegistry; + + public HelixGatewayServiceGrpcService(GatewayServiceManager manager) { + _manager = manager; + _lockRegistry = new PerKeyLockRegistry(); + } + + /** + * Grpc service end pint. + * Application instances Report the state of the shard or result of transition request to the gateway service. + * @param responseObserver + * @return + */ + @Override + public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report( + StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) { + + return new StreamObserver<ShardStateMessage>() { + + @Override + public void onNext(ShardStateMessage request) { + if (request.hasShardState()) { + ShardState shardState = request.getShardState(); + updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver); + } + _manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request)); + } + + @Override + public void onError(Throwable t) { + onClientClose(responseObserver); + } + + @Override + public void onCompleted() { + onClientClose(responseObserver); + } + }; + } + + /** + * Send state transition message to the instance. + * The instance must already have established a connection to the gateway service. + * @param instanceName + * @return + */ + @Override + public boolean sendStateTransitionMessage(String instanceName) { + StreamObserver<TransitionMessage> observer; + observer = _observerMap.get(instanceName); + if (observer != null) { + observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage()); + } + return true; + } + + private void updateObserver(String instanceName, String clusterName, + StreamObserver<TransitionMessage> streamObserver) { + _lockRegistry.withLock(instanceName, () -> { + _observerMap.put(instanceName, streamObserver); + _reversedObserverMap.put(streamObserver, new ImmutablePair<>(instanceName, clusterName)); + }); + } + + private void onClientClose( + StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) { + String instanceName; + String clusterName; + Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver); + clusterName = instanceInfo.getRight(); + instanceName = instanceInfo.getLeft(); + + if (instanceName == null || clusterName == null) { + // TODO: log error; + return; + } + GatewayServiceEvent event = + StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, instanceName); + _manager.newGatewayServiceEvent(event); + _lockRegistry.withLock(instanceName, () -> { + _reversedObserverMap.remove(responseObserver); + _observerMap.remove(instanceName); + _lockRegistry.removeLock(instanceName); + }); + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java deleted file mode 100644 index 237f1f272..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.apache.helix.gateway.grpcservice; - -import io.grpc.stub.StreamObserver; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.helix.gateway.service.GatewayServiceManager; -import org.apache.helix.gateway.service.HelixGatewayServiceProcessor; -import proto.org.apache.helix.gateway.*; -import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*; - -import java.util.Map; - - -/** - * Helix Gateway Service GRPC UI implementation. - */ -public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase - implements HelixGatewayServiceProcessor { - - Map<String, StreamObserver<TransitionMessage>> _observerMap = - new ConcurrentHashMap<String, StreamObserver<TransitionMessage>>(); - - @Override - public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report( - StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) { - - return new StreamObserver<ShardStateMessage>() { - - @Override - public void onNext(ShardStateMessage request) { - // called when a client sends a message - //.... - String instanceName = request.getInstanceName(); - if (!_observerMap.containsValue(instanceName)) { - // update state map - updateObserver(instanceName, responseObserver); - } - // process the message - } - - @Override - public void onError(Throwable t) { - // called when a client sends an error - //.... - } - - @Override - public void onCompleted() { - // called when the client completes - //.... - } - }; - } - - @Override - public boolean sendStateTransitionMessage(String instanceName) { - return false; - } - - @Override - public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent event) { - - } - - public void updateObserver(String instanceName, StreamObserver<TransitionMessage> streamObserver) { - _observerMap.put(instanceName, streamObserver); - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java new file mode 100644 index 000000000..68c21bda1 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java @@ -0,0 +1,106 @@ +package org.apache.helix.gateway.service; + +import java.util.List; +import java.util.Map; +import org.apache.helix.gateway.constant.GatewayServiceEventType; + + +/** + * Event representing message reported by clients to Helix Gateway Service. + */ +public class GatewayServiceEvent { + // event type + private GatewayServiceEventType _eventType; + // event data + private String _clusterName; + private String _instanceName; + // A map where client reports the state of each shard upon connection + private Map<String, Map<String, String>> _shardStateMap; + // result for state transition request + private List<StateTransitionResult> _stateTransitionResult; + + public static class StateTransitionResult { + private String stateTransitionId; + private String stateTransitionStatus; + private String shardState; + + public StateTransitionResult(String stateTransitionId, String stateTransitionStatus, String shardState) { + this.stateTransitionId = stateTransitionId; + this.stateTransitionStatus = stateTransitionStatus; + this.shardState = shardState; + } + + public String getStateTransitionId() { + return stateTransitionId; + } + public String getStateTransitionStatus() { + return stateTransitionStatus; + } + public String getShardState() { + return shardState; + } + } + + private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName, + Map<String, Map<String, String>> shardStateMap, List<StateTransitionResult> stateTransitionStatusMap) { + _eventType = eventType; + _clusterName = clusterName; + _instanceName = instanceName; + _shardStateMap = shardStateMap; + _stateTransitionResult = stateTransitionStatusMap; + } + + public GatewayServiceEventType getEventType() { + return _eventType; + } + public String getClusterName() { + return _clusterName; + } + public String getInstanceName() { + return _instanceName; + } + public Map<String, Map<String, String>> getShardStateMap() { + return _shardStateMap; + } + public List<StateTransitionResult> getStateTransitionResult() { + return _stateTransitionResult; + } + + + public static class GateWayServiceEventBuilder { + private GatewayServiceEventType _eventType; + private String _clusterName; + private String _instanceName; + private Map<String, Map<String, String>> _shardStateMap; + private List<StateTransitionResult> _stateTransitionResult; + + public GateWayServiceEventBuilder(GatewayServiceEventType eventType) { + this._eventType = eventType; + } + + public GateWayServiceEventBuilder setClusterName(String clusterName) { + this._clusterName = clusterName; + return this; + } + + public GateWayServiceEventBuilder setParticipantName(String instanceName) { + this._instanceName = instanceName; + return this; + } + + public GateWayServiceEventBuilder setShardStateMap(Map<String, Map<String, String>> shardStateMap) { + this._shardStateMap = shardStateMap; + return this; + } + + public GateWayServiceEventBuilder setStateTransitionStatusMap( + List<StateTransitionResult> stateTransitionStatusMap) { + this._stateTransitionResult = stateTransitionStatusMap; + return this; + } + + public GatewayServiceEvent build() { + return new GatewayServiceEvent(_eventType, _clusterName, _instanceName, _shardStateMap, _stateTransitionResult); + } + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index ee803b9cc..889c76c10 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -1,10 +1,14 @@ package org.apache.helix.gateway.service; +import com.google.common.annotations.VisibleForTesting; import java.util.Map; - import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.helix.gateway.grpcservice.HelixGatewayServiceService; +import org.apache.helix.gateway.constant.GatewayServiceEventType; +import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; +import org.apache.helix.gateway.util.PerKeyBlockingExecutor; /** @@ -16,50 +20,102 @@ import org.apache.helix.gateway.grpcservice.HelixGatewayServiceService; */ public class GatewayServiceManager { + public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10; + private final Map<String, HelixGatewayService> _helixGatewayServiceMap; - HelixGatewayServiceService _helixGatewayServiceService; - - HelixGatewayServiceProcessor _helixGatewayServiceProcessor; - - Map<String, HelixGatewayService> _helixGatewayServiceMap; + // a single thread tp for event processing + private final ExecutorService _participantStateTransitionResultUpdator; - // TODO: add thread pool for init - // single thread tp for update + // link to grpc service + private final HelixGatewayServiceGrpcService _grpcService; - public enum EventType { - CONNECT, // init connection to gateway service - UPDATE, // update state transition result - DISCONNECT // shutdown connection to gateway service. - } - - public class GateWayServiceEvent { - // event type - EventType eventType; - // event data - String clusterName; - String participantName; - - // todo: add more fields - } + // a per key executor for connection event. All event for the same instance will be executed in sequence. + // It is used to ensure for each instance, the connect/disconnect event won't start until the previous one is done. + private final PerKeyBlockingExecutor _connectionEventProcessor; public GatewayServiceManager() { _helixGatewayServiceMap = new ConcurrentHashMap<>(); + _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); + _grpcService = new HelixGatewayServiceGrpcService(this); + _connectionEventProcessor = + new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable } + /** + * send state transition message to application instance + * @return + */ public AtomicBoolean sendTransitionRequestToApplicationInstance() { - + // TODO: add param return null; } - public void updateShardState() { - + /** + * Process the event from Grpc service + * @param event + */ + public void newGatewayServiceEvent(GatewayServiceEvent event) { + if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) { + _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event)); + } else { + _connectionEventProcessor.offerEvent(event.getInstanceName(), new participantConnectionProcessor(event)); + } } - public void newParticipantConnecting() { + /** + * Update in memory shard state + */ + class shardStateUpdator implements Runnable { + + GatewayServiceEvent _event; + + public shardStateUpdator(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService = _helixGatewayServiceMap.get(_event.getClusterName()); + if (helixGatewayService == null) { + // TODO: return error code and throw exception. + return; + } + helixGatewayService.receiveSTResponse(); + } + } + /** + * Create HelixGatewayService instance and register it to the manager. + * It includes waiting for ZK connection, and also wait for previous LiveInstance to expire. + */ + class participantConnectionProcessor implements Runnable { + GatewayServiceEvent _event; + + public participantConnectionProcessor(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService; + _helixGatewayServiceMap.computeIfAbsent(_event.getClusterName(), + k -> new HelixGatewayService(GatewayServiceManager.this, _event.getClusterName())); + helixGatewayService = _helixGatewayServiceMap.get(_event.getClusterName()); + if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) { + helixGatewayService.registerParticipant(); + } else { + helixGatewayService.deregisterParticipant(_event.getClusterName(), _event.getInstanceName()); + } + } } - public void participantDisconnected() { + @VisibleForTesting + HelixGatewayServiceGrpcService getGrpcService() { + return _grpcService; + } + @VisibleForTesting + HelixGatewayService getHelixGatewayService(String clusterName) { + return _helixGatewayServiceMap.get(clusterName); } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java index 49f1bbf2c..c27dcc708 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java @@ -1,9 +1,7 @@ package org.apache.helix.gateway.service; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import java.util.concurrent.atomic.AtomicBoolean; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; @@ -32,10 +30,10 @@ public class HelixGatewayService { return _gatewayServiceManager; } - public void start() { - System.out.println("Starting Helix Gateway Service"); - } - + /** + * Register a participant to the Helix cluster. + * It creates a HelixParticipantManager and connects to the Helix controller. + */ public void registerParticipant() { // TODO: create participant manager and add to _participantsMap HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress); @@ -48,6 +46,11 @@ public class HelixGatewayService { } } + /** + * Deregister a participant from the Helix cluster when app instance is gracefully stopped or connection lost. + * @param clusterName + * @param participantName + */ public void deregisterParticipant(String clusterName, String participantName) { HelixManager manager = _participantsMap.get(clusterName).remove(participantName); if (manager != null) { @@ -69,19 +72,20 @@ public class HelixGatewayService { return flag; } + /** + * Entry point for receive the state transition response from the participant. + * It will update in memory state accordingly. + */ public void receiveSTResponse() { // AtomicBoolean flag = _flagMap.get(instanceName).remove(response.getMessageId()); } - public void newParticipantConnecting(){ - - } - - public void participantDisconnected(){ - - } - + /** + * Stop the HelixGatewayService. + * It stops all participants in the cluster. + */ public void stop() { + // TODO: stop all participants System.out.println("Stopping Helix Gateway Service"); } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java index d419a71b5..e206c0edc 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java @@ -5,7 +5,6 @@ package org.apache.helix.gateway.service; */ public interface HelixGatewayServiceProcessor { - public boolean sendStateTransitionMessage(String instanceName); + public boolean sendStateTransitionMessage( String instanceName); - public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent event); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java deleted file mode 100644 index 3df3b00db..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.apache.helix.gateway.service; - -public class ReplicaStateTracker { - - boolean compareTargetState() { - return true; - } - - boolean updateReplicaState() { - return true; - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java index b7e06051e..8da4692dd 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java @@ -1,7 +1,6 @@ package org.apache.helix.gateway.statemodel; import org.apache.helix.gateway.service.GatewayServiceManager; -import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModel; import org.apache.helix.participant.statemachine.StateModelFactory; public class HelixGatewayOnlineOfflineStateModelFactory extends StateModelFactory<HelixGatewayOnlineOfflineStateModel> { diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java index 7953996e9..1d2e07b9e 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java @@ -1,10 +1,8 @@ package org.apache.helix.gateway.util; import java.util.HashMap; -import java.util.HashSet; -import java.util.Queue; -import java.util.Set; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java index 7f0c592c2..530f06a66 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java @@ -1,17 +1,68 @@ package org.apache.helix.gateway.util; -import org.apache.helix.gateway.service.GatewayServiceManager; -import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.gateway.constant.GatewayServiceEventType; +import org.apache.helix.gateway.service.GatewayServiceEvent; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage; public final class StateTransitionMessageTranslateUtil { - public static TransitionMessage translateSTMsgToProto() { + public static TransitionMessage translateSTMsgToTransitionMessage() { return null; } - public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) { - return null; + /** + * Translate from user sent ShardStateMessage message to Helix Gateway Service event. + */ + public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMessage request) { + + GatewayServiceEvent.GateWayServiceEventBuilder builder; + if (request.hasShardState()) { // init connection to gateway service + ShardState shardState = request.getShardState(); + Map<String, String> shardStateMap = new HashMap<>(); + for (HelixGatewayServiceOuterClass.SingleResourceState resourceState : shardState.getResourceStateList()) { + for (HelixGatewayServiceOuterClass.SingleShardState state : resourceState.getShardStatesList()) { + shardStateMap.put(resourceState.getResource() + "_" + state.getShardName(), state.getCurrentState()); + } + } + builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName( + shardState.getClusterName()).setParticipantName(shardState.getInstanceName()); + } else { + ShardTransitionStatus shardTransitionStatus = request.getShardTransitionStatus(); + // this is status update for established connection + List<HelixGatewayServiceOuterClass.SingleShardTransitionStatus> status = + shardTransitionStatus.getShardTransitionStatusList(); + List<GatewayServiceEvent.StateTransitionResult> stResult = new ArrayList<>(); + for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus shardTransition : status) { + GatewayServiceEvent.StateTransitionResult result = + new GatewayServiceEvent.StateTransitionResult(shardTransition.getTransitionID(), + shardTransition.getCurrentState(), shardTransition.getCurrentState()); + stResult.add(result); + } + builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE).setClusterName( + shardTransitionStatus.getClusterName()) + .setParticipantName(shardTransitionStatus.getInstanceName()) + .setStateTransitionStatusMap(stResult); + } + return builder.build(); + } + + /** + * Translate termination event to GatewayServiceEvent. + */ + + public static GatewayServiceEvent translateClientCloseToEvent(String instanceName, String clusterName) { + GatewayServiceEvent.GateWayServiceEventBuilder builder = + new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.DISCONNECT).setClusterName( + clusterName).setParticipantName(instanceName); + return builder.build(); } } diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto b/helix-gateway/src/main/proto/HelixGatewayService.proto index dbf488b5a..996d2239c 100644 --- a/helix-gateway/src/main/proto/HelixGatewayService.proto +++ b/helix-gateway/src/main/proto/HelixGatewayService.proto @@ -20,16 +20,9 @@ message TransitionMessage{ repeated SingleTransitionMessage request = 1; } -message SingleShardTransitionStatus { - string transitionID = 1; // ID of transition message - bool isSuccess = 2; // Was transition successfully performed - optional string currentState = 3; // If it failed, what is the current state it should reported as. -} - message SingleResourceState { string resource = 1; // name of the resource - repeated SingleShardState SingleShardState = 2; // State of each shard - + repeated SingleShardState shardStates = 2; // State of each shard } message SingleShardState { @@ -37,12 +30,31 @@ message SingleShardState { string currentState = 2; // Current state of the shard } -// -message ShardStateMessage{ +message SingleShardTransitionStatus { + string transitionID = 1; // ID of transition message + bool isSuccess = 2; // Was transition successfully performed + optional string currentState = 3; // If it failed, what is the current state it should reported as. +} + +message ShardTransitionStatus{ string instanceName = 1; // Name of the application instance string clusterName = 2; // Name of the cluster to connect to - repeated SingleShardTransitionStatus shardStatus = 3; // state transition result for a shard - repeated SingleShardState shardState = 4; // State of each shard, only reported upon init connection + repeated SingleShardTransitionStatus shardTransitionStatus = 3; // state transition result for a shard +} + +// Application report its state to Helix Gateway upon initial connection +message ShardState{ + string instanceName = 1; // Name of the application instance + string clusterName = 2; // Name of the cluster to connect to + repeated SingleResourceState resourceState = 3; // State of each resource +} + +// Application instance sends message upon initial connection or reply to state transition message +message ShardStateMessage{ + oneof instanceUpdate { + ShardState shardState = 1; + ShardTransitionStatus shardTransitionStatus = 2; + } } service HelixGatewayService { diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java new file mode 100644 index 000000000..01b78593c --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java @@ -0,0 +1,47 @@ +package org.apache.helix.gateway.service; + +import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; +import org.testng.annotations.Test; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + + +public class TestGatewayServiceManager { + + private GatewayServiceManager manager; + + @Test + public void testConnectionAndDisconnectionEvents() { + + manager = mock(GatewayServiceManager.class); + HelixGatewayServiceGrpcService grpcService = new HelixGatewayServiceGrpcService(manager); + // Mock a connection event + HelixGatewayServiceOuterClass.ShardStateMessage connectionEvent = + HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder() + .setShardState(HelixGatewayServiceOuterClass.ShardState.newBuilder() + .setInstanceName("instance1") + .setClusterName("cluster1") + .build()) + .build(); + + // Mock a disconnection event + HelixGatewayServiceOuterClass.ShardStateMessage disconnectionEvent = + HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder() + .setShardState(HelixGatewayServiceOuterClass.ShardState.newBuilder() + .setInstanceName("instance1") + .setClusterName("cluster1") + .build()) + .build(); + + // Process connection event + grpcService.report(null).onNext(connectionEvent); + + // Process disconnection event + grpcService.report(null).onNext(disconnectionEvent); + HelixGatewayService gatewayService = manager.getHelixGatewayService("cluster1"); + // Verify the events were processed in sequence + verify(manager, times(2)).newGatewayServiceEvent(any()); + } +}
