This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this
push:
new 3795062e7 Implement GatewayServiceManager (#2844)
3795062e7 is described below
commit 3795062e7a4a6a795d0f1c8610ad9837560564e9
Author: xyuanlu <[email protected]>
AuthorDate: Thu Jul 25 15:00:12 2024 -0700
Implement GatewayServiceManager (#2844)
Implement GatewayServiceManager
---
helix-gateway/pom.xml | 5 +
.../gateway/constant/GatewayServiceEventType.java | 7 ++
.../HelixGatewayServiceGrpcService.java | 118 +++++++++++++++++++++
.../grpcservice/HelixGatewayServiceService.java | 67 ------------
.../helix/gateway/service/GatewayServiceEvent.java | 106 ++++++++++++++++++
.../gateway/service/GatewayServiceManager.java | 114 +++++++++++++++-----
.../helix/gateway/service/HelixGatewayService.java | 32 +++---
.../service/HelixGatewayServiceProcessor.java | 3 +-
.../helix/gateway/service/ReplicaStateTracker.java | 12 ---
...HelixGatewayOnlineOfflineStateModelFactory.java | 1 -
.../helix/gateway/util/PerKeyBlockingExecutor.java | 4 +-
.../util/StateTransitionMessageTranslateUtil.java | 61 ++++++++++-
.../src/main/proto/HelixGatewayService.proto | 36 ++++---
.../gateway/service/TestGatewayServiceManager.java | 47 ++++++++
14 files changed, 468 insertions(+), 145 deletions(-)
diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml
index f788f5bcf..8a7c50ed7 100644
--- a/helix-gateway/pom.xml
+++ b/helix-gateway/pom.xml
@@ -129,6 +129,11 @@
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<extensions>
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
new file mode 100644
index 000000000..5ae32e40f
--- /dev/null
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
@@ -0,0 +1,7 @@
+package org.apache.helix.gateway.constant;
+
+public enum GatewayServiceEventType {
+ CONNECT, // init connection to gateway service
+ UPDATE, // update state transition result
+ DISCONNECT // shutdown connection to gateway service.
+}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
new file mode 100644
index 000000000..c2f9b57e3
--- /dev/null
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
@@ -0,0 +1,118 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.PerKeyLockRegistry;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
+import
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+import
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+
+
+/**
+ * Helix Gateway Service GRPC UI implementation.
+ */
+public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
+ implements HelixGatewayServiceProcessor {
+
+ // Map to store the observer for each instance
+ private final Map<String, StreamObserver<TransitionMessage>> _observerMap =
new HashMap<>();
+ // A reverse map to store the instance name for each observer. It is used to
find the instance when connection is closed.
+ private final Map<StreamObserver<TransitionMessage>, Pair<String, String>>
_reversedObserverMap = new HashMap<>();
+
+ private final GatewayServiceManager _manager;
+
+ // A fine-grained lock registry at the instance level
+ private final PerKeyLockRegistry _lockRegistry;
+
+ public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+ _manager = manager;
+ _lockRegistry = new PerKeyLockRegistry();
+ }
+
+ /**
+ * Grpc service endpoint.
+ * Application instances report the state of the shard or the result of a
transition request to the gateway service.
+ * @param responseObserver
+ * @return
+ */
+ @Override
+ public
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage>
report(
+
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
responseObserver) {
+
+ return new StreamObserver<ShardStateMessage>() {
+
+ @Override
+ public void onNext(ShardStateMessage request) {
+ if (request.hasShardState()) {
+ ShardState shardState = request.getShardState();
+ updateObserver(shardState.getInstanceName(),
shardState.getClusterName(), responseObserver);
+ }
+
_manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ onClientClose(responseObserver);
+ }
+
+ @Override
+ public void onCompleted() {
+ onClientClose(responseObserver);
+ }
+ };
+ }
+
+  /**
+   * Send state transition message to the instance.
+   * The instance must already have established a connection to the gateway service;
+   * the observer registered for that connection is looked up by instance name.
+   * @param instanceName name of the application instance to send the message to
+   * @return true if a message was handed to the instance's stream observer,
+   *         false if no observer is registered for the instance
+   */
+  @Override
+  public boolean sendStateTransitionMessage(String instanceName) {
+    StreamObserver<TransitionMessage> observer = _observerMap.get(instanceName);
+    if (observer == null) {
+      // No open stream for this instance: report that nothing was sent instead of claiming success.
+      return false;
+    }
+    observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage());
+    return true;
+  }
+
+ private void updateObserver(String instanceName, String clusterName,
+ StreamObserver<TransitionMessage> streamObserver) {
+ _lockRegistry.withLock(instanceName, () -> {
+ _observerMap.put(instanceName, streamObserver);
+ _reversedObserverMap.put(streamObserver, new
ImmutablePair<>(instanceName, clusterName));
+ });
+ }
+
+  /**
+   * Handle the end of a client stream (error or normal completion).
+   * Looks up which instance the observer belongs to, hands a DISCONNECT event to the
+   * manager, and removes the observer bookkeeping for that instance.
+   * @param responseObserver the response observer of the stream that ended
+   */
+  private void onClientClose(
+      StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
+          responseObserver) {
+    Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver);
+    // The observer is only registered once a ShardState message arrives, so a stream that
+    // closes earlier has no entry here.
+    if (instanceInfo == null) {
+      // TODO: log error;
+      return;
+    }
+    // The pair is stored as (instanceName, clusterName) by updateObserver.
+    String instanceName = instanceInfo.getLeft();
+    String clusterName = instanceInfo.getRight();
+
+    if (instanceName == null || clusterName == null) {
+      // TODO: log error;
+      return;
+    }
+    // translateClientCloseToEvent declares its parameters as (instanceName, clusterName).
+    GatewayServiceEvent event =
+        StateTransitionMessageTranslateUtil.translateClientCloseToEvent(instanceName, clusterName);
+    _manager.newGatewayServiceEvent(event);
+    _lockRegistry.withLock(instanceName, () -> {
+      _reversedObserverMap.remove(responseObserver);
+      _observerMap.remove(instanceName);
+      _lockRegistry.removeLock(instanceName);
+    });
+  }
+}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
deleted file mode 100644
index 237f1f272..000000000
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.helix.gateway.grpcservice;
-
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
-import proto.org.apache.helix.gateway.*;
-import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
-
-import java.util.Map;
-
-
-/**
- * Helix Gateway Service GRPC UI implementation.
- */
-public class HelixGatewayServiceService extends
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
- implements HelixGatewayServiceProcessor {
-
- Map<String, StreamObserver<TransitionMessage>> _observerMap =
- new ConcurrentHashMap<String, StreamObserver<TransitionMessage>>();
-
- @Override
- public
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage>
report(
-
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
responseObserver) {
-
- return new StreamObserver<ShardStateMessage>() {
-
- @Override
- public void onNext(ShardStateMessage request) {
- // called when a client sends a message
- //....
- String instanceName = request.getInstanceName();
- if (!_observerMap.containsValue(instanceName)) {
- // update state map
- updateObserver(instanceName, responseObserver);
- }
- // process the message
- }
-
- @Override
- public void onError(Throwable t) {
- // called when a client sends an error
- //....
- }
-
- @Override
- public void onCompleted() {
- // called when the client completes
- //....
- }
- };
- }
-
- @Override
- public boolean sendStateTransitionMessage(String instanceName) {
- return false;
- }
-
- @Override
- public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent
event) {
-
- }
-
- public void updateObserver(String instanceName,
StreamObserver<TransitionMessage> streamObserver) {
- _observerMap.put(instanceName, streamObserver);
- }
-}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
new file mode 100644
index 000000000..68c21bda1
--- /dev/null
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
@@ -0,0 +1,106 @@
+package org.apache.helix.gateway.service;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+
+
+/**
+ * Event representing message reported by clients to Helix Gateway Service.
+ */
+public class GatewayServiceEvent {
+ // event type
+ private GatewayServiceEventType _eventType;
+ // event data
+ private String _clusterName;
+ private String _instanceName;
+ // A map where client reports the state of each shard upon connection
+ private Map<String, Map<String, String>> _shardStateMap;
+ // result for state transition request
+ private List<StateTransitionResult> _stateTransitionResult;
+
+ public static class StateTransitionResult {
+ private String stateTransitionId;
+ private String stateTransitionStatus;
+ private String shardState;
+
+ public StateTransitionResult(String stateTransitionId, String
stateTransitionStatus, String shardState) {
+ this.stateTransitionId = stateTransitionId;
+ this.stateTransitionStatus = stateTransitionStatus;
+ this.shardState = shardState;
+ }
+
+ public String getStateTransitionId() {
+ return stateTransitionId;
+ }
+ public String getStateTransitionStatus() {
+ return stateTransitionStatus;
+ }
+ public String getShardState() {
+ return shardState;
+ }
+ }
+
+ private GatewayServiceEvent(GatewayServiceEventType eventType, String
clusterName, String instanceName,
+ Map<String, Map<String, String>> shardStateMap,
List<StateTransitionResult> stateTransitionStatusMap) {
+ _eventType = eventType;
+ _clusterName = clusterName;
+ _instanceName = instanceName;
+ _shardStateMap = shardStateMap;
+ _stateTransitionResult = stateTransitionStatusMap;
+ }
+
+ public GatewayServiceEventType getEventType() {
+ return _eventType;
+ }
+ public String getClusterName() {
+ return _clusterName;
+ }
+ public String getInstanceName() {
+ return _instanceName;
+ }
+ public Map<String, Map<String, String>> getShardStateMap() {
+ return _shardStateMap;
+ }
+ public List<StateTransitionResult> getStateTransitionResult() {
+ return _stateTransitionResult;
+ }
+
+
+ public static class GateWayServiceEventBuilder {
+ private GatewayServiceEventType _eventType;
+ private String _clusterName;
+ private String _instanceName;
+ private Map<String, Map<String, String>> _shardStateMap;
+ private List<StateTransitionResult> _stateTransitionResult;
+
+ public GateWayServiceEventBuilder(GatewayServiceEventType eventType) {
+ this._eventType = eventType;
+ }
+
+ public GateWayServiceEventBuilder setClusterName(String clusterName) {
+ this._clusterName = clusterName;
+ return this;
+ }
+
+ public GateWayServiceEventBuilder setParticipantName(String instanceName) {
+ this._instanceName = instanceName;
+ return this;
+ }
+
+ public GateWayServiceEventBuilder setShardStateMap(Map<String, Map<String,
String>> shardStateMap) {
+ this._shardStateMap = shardStateMap;
+ return this;
+ }
+
+ public GateWayServiceEventBuilder setStateTransitionStatusMap(
+ List<StateTransitionResult> stateTransitionStatusMap) {
+ this._stateTransitionResult = stateTransitionStatusMap;
+ return this;
+ }
+
+ public GatewayServiceEvent build() {
+ return new GatewayServiceEvent(_eventType, _clusterName, _instanceName,
_shardStateMap, _stateTransitionResult);
+ }
+ }
+}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index ee803b9cc..889c76c10 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -1,10 +1,14 @@
package org.apache.helix.gateway.service;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
-
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.gateway.grpcservice.HelixGatewayServiceService;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
/**
@@ -16,50 +20,102 @@ import
org.apache.helix.gateway.grpcservice.HelixGatewayServiceService;
*/
public class GatewayServiceManager {
+ public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10;
+ private final Map<String, HelixGatewayService> _helixGatewayServiceMap;
- HelixGatewayServiceService _helixGatewayServiceService;
-
- HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
-
- Map<String, HelixGatewayService> _helixGatewayServiceMap;
+ // a single-threaded thread pool for event processing
+ private final ExecutorService _participantStateTransitionResultUpdator;
- // TODO: add thread pool for init
- // single thread tp for update
+ // link to grpc service
+ private final HelixGatewayServiceGrpcService _grpcService;
- public enum EventType {
- CONNECT, // init connection to gateway service
- UPDATE, // update state transition result
- DISCONNECT // shutdown connection to gateway service.
- }
-
- public class GateWayServiceEvent {
- // event type
- EventType eventType;
- // event data
- String clusterName;
- String participantName;
-
- // todo: add more fields
- }
+ // a per-key executor for connection events. All events for the same instance
will be executed in sequence.
+ // It is used to ensure that, for each instance, a connect/disconnect event
won't start until the previous one is done.
+ private final PerKeyBlockingExecutor _connectionEventProcessor;
public GatewayServiceManager() {
_helixGatewayServiceMap = new ConcurrentHashMap<>();
+ _participantStateTransitionResultUpdator =
Executors.newSingleThreadExecutor();
+ _grpcService = new HelixGatewayServiceGrpcService(this);
+ _connectionEventProcessor =
+ new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); //
todo: make it configurable
}
+ /**
+ * send state transition message to application instance
+ * @return
+ */
public AtomicBoolean sendTransitionRequestToApplicationInstance() {
-
+ // TODO: add param
return null;
}
- public void updateShardState() {
-
+ /**
+ * Process the event from Grpc service
+ * @param event
+ */
+ public void newGatewayServiceEvent(GatewayServiceEvent event) {
+ if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
+ _participantStateTransitionResultUpdator.submit(new
shardStateUpdator(event));
+ } else {
+ _connectionEventProcessor.offerEvent(event.getInstanceName(), new
participantConnectionProcessor(event));
+ }
}
- public void newParticipantConnecting() {
+ /**
+ * Update in memory shard state
+ */
+ class shardStateUpdator implements Runnable {
+
+ GatewayServiceEvent _event;
+
+ public shardStateUpdator(GatewayServiceEvent event) {
+ _event = event;
+ }
+
+ @Override
+ public void run() {
+ HelixGatewayService helixGatewayService =
_helixGatewayServiceMap.get(_event.getClusterName());
+ if (helixGatewayService == null) {
+ // TODO: return error code and throw exception.
+ return;
+ }
+ helixGatewayService.receiveSTResponse();
+ }
+ }
+ /**
+ * Create HelixGatewayService instance and register it to the manager.
+ * It includes waiting for the ZK connection, and also waiting for the previous
LiveInstance to expire.
+ */
+ class participantConnectionProcessor implements Runnable {
+ GatewayServiceEvent _event;
+
+ public participantConnectionProcessor(GatewayServiceEvent event) {
+ _event = event;
+ }
+
+ @Override
+ public void run() {
+ HelixGatewayService helixGatewayService;
+ _helixGatewayServiceMap.computeIfAbsent(_event.getClusterName(),
+ k -> new HelixGatewayService(GatewayServiceManager.this,
_event.getClusterName()));
+ helixGatewayService =
_helixGatewayServiceMap.get(_event.getClusterName());
+ if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
+ helixGatewayService.registerParticipant();
+ } else {
+ helixGatewayService.deregisterParticipant(_event.getClusterName(),
_event.getInstanceName());
+ }
+ }
}
- public void participantDisconnected() {
+ @VisibleForTesting
+ HelixGatewayServiceGrpcService getGrpcService() {
+ return _grpcService;
+ }
+ @VisibleForTesting
+ HelixGatewayService getHelixGatewayService(String clusterName) {
+ return _helixGatewayServiceMap.get(clusterName);
}
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
index 49f1bbf2c..c27dcc708 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
@@ -1,9 +1,7 @@
package org.apache.helix.gateway.service;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
@@ -32,10 +30,10 @@ public class HelixGatewayService {
return _gatewayServiceManager;
}
- public void start() {
- System.out.println("Starting Helix Gateway Service");
- }
-
+ /**
+ * Register a participant to the Helix cluster.
+ * It creates a HelixParticipantManager and connects to the Helix controller.
+ */
public void registerParticipant() {
// TODO: create participant manager and add to _participantsMap
HelixManager manager = new ZKHelixManager("clusterName", "instanceName",
InstanceType.PARTICIPANT, _zkAddress);
@@ -48,6 +46,11 @@ public class HelixGatewayService {
}
}
+ /**
+ * Deregister a participant from the Helix cluster when the app instance is
gracefully stopped or the connection is lost.
+ * @param clusterName
+ * @param participantName
+ */
public void deregisterParticipant(String clusterName, String
participantName) {
HelixManager manager =
_participantsMap.get(clusterName).remove(participantName);
if (manager != null) {
@@ -69,19 +72,20 @@ public class HelixGatewayService {
return flag;
}
+ /**
+ * Entry point for receiving the state transition response from the
participant.
+ * It will update in memory state accordingly.
+ */
public void receiveSTResponse() {
// AtomicBoolean flag =
_flagMap.get(instanceName).remove(response.getMessageId());
}
- public void newParticipantConnecting(){
-
- }
-
- public void participantDisconnected(){
-
- }
-
+ /**
+ * Stop the HelixGatewayService.
+ * It stops all participants in the cluster.
+ */
public void stop() {
+ // TODO: stop all participants
System.out.println("Stopping Helix Gateway Service");
}
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
index d419a71b5..e206c0edc 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
@@ -5,7 +5,6 @@ package org.apache.helix.gateway.service;
*/
public interface HelixGatewayServiceProcessor {
- public boolean sendStateTransitionMessage(String instanceName);
+ public boolean sendStateTransitionMessage( String instanceName);
- public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent
event);
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
deleted file mode 100644
index 3df3b00db..000000000
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.helix.gateway.service;
-
-public class ReplicaStateTracker {
-
- boolean compareTargetState() {
- return true;
- }
-
- boolean updateReplicaState() {
- return true;
- }
-}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
index b7e06051e..8da4692dd 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
@@ -1,7 +1,6 @@
package org.apache.helix.gateway.statemodel;
import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
public class HelixGatewayOnlineOfflineStateModelFactory extends
StateModelFactory<HelixGatewayOnlineOfflineStateModel> {
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
index 7953996e9..1d2e07b9e 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
@@ -1,10 +1,8 @@
package org.apache.helix.gateway.util;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Queue;
-import java.util.Set;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index 7f0c592c2..530f06a66 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -1,17 +1,68 @@
package org.apache.helix.gateway.util;
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
import
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+import
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus;
+import
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
public final class StateTransitionMessageTranslateUtil {
- public static TransitionMessage translateSTMsgToProto() {
+ public static TransitionMessage translateSTMsgToTransitionMessage() {
return null;
}
- public static GatewayServiceManager.GateWayServiceEvent
translateProtoToSTMsg(ShardStateMessage message) {
- return null;
+ /**
+ * Translate from user sent ShardStateMessage message to Helix Gateway
Service event.
+ */
+ public static GatewayServiceEvent
translateShardStateMessageToEvent(ShardStateMessage request) {
+
+ GatewayServiceEvent.GateWayServiceEventBuilder builder;
+ if (request.hasShardState()) { // init connection to gateway service
+ ShardState shardState = request.getShardState();
+ Map<String, String> shardStateMap = new HashMap<>();
+ for (HelixGatewayServiceOuterClass.SingleResourceState resourceState :
shardState.getResourceStateList()) {
+ for (HelixGatewayServiceOuterClass.SingleShardState state :
resourceState.getShardStatesList()) {
+ shardStateMap.put(resourceState.getResource() + "_" +
state.getShardName(), state.getCurrentState());
+ }
+ }
+ builder = new
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(
+
shardState.getClusterName()).setParticipantName(shardState.getInstanceName());
+ } else {
+ ShardTransitionStatus shardTransitionStatus =
request.getShardTransitionStatus();
+ // this is status update for established connection
+ List<HelixGatewayServiceOuterClass.SingleShardTransitionStatus> status =
+ shardTransitionStatus.getShardTransitionStatusList();
+ List<GatewayServiceEvent.StateTransitionResult> stResult = new
ArrayList<>();
+ for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus
shardTransition : status) {
+ GatewayServiceEvent.StateTransitionResult result =
+ new
GatewayServiceEvent.StateTransitionResult(shardTransition.getTransitionID(),
+ shardTransition.getCurrentState(),
shardTransition.getCurrentState());
+ stResult.add(result);
+ }
+ builder = new
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE).setClusterName(
+ shardTransitionStatus.getClusterName())
+ .setParticipantName(shardTransitionStatus.getInstanceName())
+ .setStateTransitionStatusMap(stResult);
+ }
+ return builder.build();
+ }
+
+ /**
+ * Translate termination event to GatewayServiceEvent.
+ */
+
+ public static GatewayServiceEvent translateClientCloseToEvent(String
instanceName, String clusterName) {
+ GatewayServiceEvent.GateWayServiceEventBuilder builder =
+ new
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.DISCONNECT).setClusterName(
+ clusterName).setParticipantName(instanceName);
+ return builder.build();
}
}
diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto
b/helix-gateway/src/main/proto/HelixGatewayService.proto
index dbf488b5a..996d2239c 100644
--- a/helix-gateway/src/main/proto/HelixGatewayService.proto
+++ b/helix-gateway/src/main/proto/HelixGatewayService.proto
@@ -20,16 +20,9 @@ message TransitionMessage{
repeated SingleTransitionMessage request = 1;
}
-message SingleShardTransitionStatus {
- string transitionID = 1; // ID of transition message
- bool isSuccess = 2; // Was transition successfully performed
- optional string currentState = 3; // If it failed, what is the current
state it should reported as.
-}
-
message SingleResourceState {
string resource = 1; // name of the resource
- repeated SingleShardState SingleShardState = 2; // State of each shard
-
+ repeated SingleShardState shardStates = 2; // State of each shard
}
message SingleShardState {
@@ -37,12 +30,31 @@ message SingleShardState {
string currentState = 2; // Current state of the shard
}
-//
-message ShardStateMessage{
+message SingleShardTransitionStatus {
+ string transitionID = 1; // ID of transition message
+ bool isSuccess = 2; // Was transition successfully performed
+ optional string currentState = 3; // If it failed, what is the current
state it should be reported as.
+}
+
+message ShardTransitionStatus{
string instanceName = 1; // Name of the application instance
string clusterName = 2; // Name of the cluster to connect to
- repeated SingleShardTransitionStatus shardStatus = 3; // state transition
result for a shard
- repeated SingleShardState shardState = 4; // State of each shard,
only reported upon init connection
+ repeated SingleShardTransitionStatus shardTransitionStatus = 3; // state
transition result for a shard
+}
+
+// Application reports its state to Helix Gateway upon initial connection
+message ShardState{
+ string instanceName = 1; // Name of the application instance
+ string clusterName = 2; // Name of the cluster to connect to
+ repeated SingleResourceState resourceState = 3; // State of each resource
+}
+
+// Application instance sends a message upon initial connection or in reply to
a state transition message
+message ShardStateMessage{
+ oneof instanceUpdate {
+ ShardState shardState = 1;
+ ShardTransitionStatus shardTransitionStatus = 2;
+ }
}
service HelixGatewayService {
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
new file mode 100644
index 000000000..01b78593c
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -0,0 +1,47 @@
+package org.apache.helix.gateway.service;
+
+import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.testng.annotations.Test;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestGatewayServiceManager {
+
+ private GatewayServiceManager manager;
+
+ @Test
+ public void testConnectionAndDisconnectionEvents() {
+
+ manager = mock(GatewayServiceManager.class);
+ HelixGatewayServiceGrpcService grpcService = new
HelixGatewayServiceGrpcService(manager);
+ // Mock a connection event
+ HelixGatewayServiceOuterClass.ShardStateMessage connectionEvent =
+ HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder()
+
.setShardState(HelixGatewayServiceOuterClass.ShardState.newBuilder()
+ .setInstanceName("instance1")
+ .setClusterName("cluster1")
+ .build())
+ .build();
+
+ // Mock a disconnection event
+ HelixGatewayServiceOuterClass.ShardStateMessage disconnectionEvent =
+ HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder()
+
.setShardState(HelixGatewayServiceOuterClass.ShardState.newBuilder()
+ .setInstanceName("instance1")
+ .setClusterName("cluster1")
+ .build())
+ .build();
+
+ // Process connection event
+ grpcService.report(null).onNext(connectionEvent);
+
+ // Process disconnection event
+ grpcService.report(null).onNext(disconnectionEvent);
+ HelixGatewayService gatewayService =
manager.getHelixGatewayService("cluster1");
+ // Verify the events were processed in sequence
+ verify(manager, times(2)).newGatewayServiceEvent(any());
+ }
+}