This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 00db56642fe735874856b3a15263e2c919a884d2
Author: xyuanlu <[email protected]>
AuthorDate: Thu Jul 25 15:00:12 2024 -0700

    Implement GatewayServiceManager (#2844)
    
    Implement GatewayServiceManager
---
 helix-gateway/pom.xml                              |   5 +
 .../gateway/constant/GatewayServiceEventType.java  |   7 ++
 .../HelixGatewayServiceGrpcService.java            | 118 +++++++++++++++++++++
 .../grpcservice/HelixGatewayServiceService.java    |  67 ------------
 .../helix/gateway/service/GatewayServiceEvent.java | 106 ++++++++++++++++++
 .../gateway/service/GatewayServiceManager.java     | 114 +++++++++++++++-----
 .../helix/gateway/service/HelixGatewayService.java |  32 +++---
 .../service/HelixGatewayServiceProcessor.java      |   3 +-
 .../helix/gateway/service/ReplicaStateTracker.java |  12 ---
 ...HelixGatewayOnlineOfflineStateModelFactory.java |   1 -
 .../helix/gateway/util/PerKeyBlockingExecutor.java |   4 +-
 .../util/StateTransitionMessageTranslateUtil.java  |  61 ++++++++++-
 .../src/main/proto/HelixGatewayService.proto       |  36 ++++---
 .../gateway/service/TestGatewayServiceManager.java |  47 ++++++++
 14 files changed, 468 insertions(+), 145 deletions(-)

diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml
index f788f5bcf..8a7c50ed7 100644
--- a/helix-gateway/pom.xml
+++ b/helix-gateway/pom.xml
@@ -129,6 +129,11 @@
       <artifactId>javax.annotation-api</artifactId>
       <version>1.3.2</version>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <extensions>
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
new file mode 100644
index 000000000..5ae32e40f
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java
@@ -0,0 +1,7 @@
+package org.apache.helix.gateway.constant;
+
+public enum GatewayServiceEventType {
+  CONNECT,    // client initiates a connection to the gateway service
+  UPDATE,  // client reports a state transition result
+  DISCONNECT // client connection to the gateway service is shut down
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
new file mode 100644
index 000000000..c2f9b57e3
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
@@ -0,0 +1,118 @@
+package org.apache.helix.gateway.grpcservice;
+
+import io.grpc.stub.StreamObserver;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.util.PerKeyLockRegistry;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+
+
+/**
+ * gRPC implementation of the Helix Gateway Service.
+ *
+ * Application instances open a stream through {@link #report}: they send {@code ShardStateMessage}s to the
+ * gateway, and the gateway pushes {@code TransitionMessage}s back through the response observer that was
+ * registered for the instance.
+ */
+public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
+    implements HelixGatewayServiceProcessor {
+
+  // Map to store the observer for each instance
+  // NOTE(review): plain HashMaps guarded only by per-instance locks; sendStateTransitionMessage reads without
+  // a lock. Confirm whether a thread-safe map is needed.
+  private final Map<String, StreamObserver<TransitionMessage>> _observerMap = new HashMap<>();
+  // A reverse map to store the (instance name, cluster name) pair for each observer. It is used to find the
+  // instance when the connection is closed.
+  private final Map<StreamObserver<TransitionMessage>, Pair<String, String>> _reversedObserverMap = new HashMap<>();
+
+  private final GatewayServiceManager _manager;
+
+  // A fine-grained lock registry on instance level
+  private final PerKeyLockRegistry _lockRegistry;
+
+  public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
+    _manager = manager;
+    _lockRegistry = new PerKeyLockRegistry();
+  }
+
+  /**
+   * gRPC service endpoint.
+   * Application instances report the state of their shards or the result of a transition request to the
+   * gateway service.
+   * @param responseObserver stream used to push transition messages back to the calling instance
+   * @return the observer that consumes the messages sent by the instance
+   */
+  @Override
+  public StreamObserver<ShardStateMessage> report(StreamObserver<TransitionMessage> responseObserver) {
+
+    return new StreamObserver<ShardStateMessage>() {
+
+      @Override
+      public void onNext(ShardStateMessage request) {
+        // A ShardState message identifies the instance, so bind the response observer to it.
+        if (request.hasShardState()) {
+          ShardState shardState = request.getShardState();
+          updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver);
+        }
+        _manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        onClientClose(responseObserver);
+      }
+
+      @Override
+      public void onCompleted() {
+        onClientClose(responseObserver);
+      }
+    };
+  }
+
+  /**
+   * Send state transition message to the instance.
+   * The instance must already have established a connection to the gateway service.
+   * @param instanceName name of the instance to send the message to
+   * @return currently always {@code true}
+   */
+  @Override
+  public boolean sendStateTransitionMessage(String instanceName) {
+    StreamObserver<TransitionMessage> observer = _observerMap.get(instanceName);
+    if (observer != null) {
+      observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage());
+    }
+    // NOTE(review): true is returned even when no observer is registered for the instance; confirm intended.
+    return true;
+  }
+
+  /**
+   * Binds the response observer to the instance, in both lookup directions, under the instance's lock.
+   */
+  private void updateObserver(String instanceName, String clusterName,
+      StreamObserver<TransitionMessage> streamObserver) {
+    _lockRegistry.withLock(instanceName, () -> {
+      _observerMap.put(instanceName, streamObserver);
+      _reversedObserverMap.put(streamObserver, new ImmutablePair<>(instanceName, clusterName));
+    });
+  }
+
+  /**
+   * Handles termination of a client stream (error or completion): sends a disconnect event to the manager and
+   * drops the bookkeeping of the instance bound to the observer.
+   * @param responseObserver the response observer of the stream that was closed
+   */
+  private void onClientClose(StreamObserver<TransitionMessage> responseObserver) {
+    Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver);
+    if (instanceInfo == null) {
+      // No instance is bound to this observer, e.g. the stream closed before a ShardState was reported.
+      // TODO: log error;
+      return;
+    }
+    // updateObserver stores the pair as (instanceName, clusterName).
+    String instanceName = instanceInfo.getLeft();
+    String clusterName = instanceInfo.getRight();
+
+    if (instanceName == null || clusterName == null) {
+      // TODO: log error;
+      return;
+    }
+    // translateClientCloseToEvent takes (instanceName, clusterName), in that order.
+    GatewayServiceEvent event =
+        StateTransitionMessageTranslateUtil.translateClientCloseToEvent(instanceName, clusterName);
+    _manager.newGatewayServiceEvent(event);
+    _lockRegistry.withLock(instanceName, () -> {
+      _reversedObserverMap.remove(responseObserver);
+      _observerMap.remove(instanceName);
+      _lockRegistry.removeLock(instanceName);
+    });
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
deleted file mode 100644
index 237f1f272..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.helix.gateway.grpcservice;
-
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
-import proto.org.apache.helix.gateway.*;
-import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
-
-import java.util.Map;
-
-
-/**
- * Helix Gateway Service GRPC UI implementation.
- */
-public class HelixGatewayServiceService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
-    implements HelixGatewayServiceProcessor {
-
-  Map<String, StreamObserver<TransitionMessage>> _observerMap =
-      new ConcurrentHashMap<String, StreamObserver<TransitionMessage>>();
-
-  @Override
-  public 
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage>
 report(
-      
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
 responseObserver) {
-
-    return new StreamObserver<ShardStateMessage>() {
-
-      @Override
-      public void onNext(ShardStateMessage request) {
-        // called when a client sends a message
-        //....
-        String instanceName = request.getInstanceName();
-        if (!_observerMap.containsValue(instanceName)) {
-          // update state map
-          updateObserver(instanceName, responseObserver);
-        }
-        // process the message
-      }
-
-      @Override
-      public void onError(Throwable t) {
-        // called when a client sends an error
-        //....
-      }
-
-      @Override
-      public void onCompleted() {
-        // called when the client completes
-        //....
-      }
-    };
-  }
-
-  @Override
-  public boolean sendStateTransitionMessage(String instanceName) {
-    return false;
-  }
-
-  @Override
-  public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent 
event) {
-
-  }
-
-  public void updateObserver(String instanceName, 
StreamObserver<TransitionMessage> streamObserver) {
-    _observerMap.put(instanceName, streamObserver);
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
new file mode 100644
index 000000000..68c21bda1
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
@@ -0,0 +1,106 @@
+package org.apache.helix.gateway.service;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+
+
+/**
+ * Event representing message reported by clients to Helix Gateway Service.
+ * All fields are assigned once at construction through {@link GateWayServiceEventBuilder}; the map and list
+ * are stored by reference (they are not copied).
+ */
+public class GatewayServiceEvent {
+  // event type
+  private final GatewayServiceEventType _eventType;
+  // event data
+  private final String _clusterName;
+  private final String _instanceName;
+  // A map where client reports the state of each shard upon connection
+  private final Map<String, Map<String, String>> _shardStateMap;
+  // result for state transition request
+  private final List<StateTransitionResult> _stateTransitionResult;
+
+  /**
+   * Result of a single state transition as reported by the client.
+   */
+  public static class StateTransitionResult {
+    private final String stateTransitionId;
+    private final String stateTransitionStatus;
+    private final String shardState;
+
+    public StateTransitionResult(String stateTransitionId, String stateTransitionStatus, String shardState) {
+      this.stateTransitionId = stateTransitionId;
+      this.stateTransitionStatus = stateTransitionStatus;
+      this.shardState = shardState;
+    }
+
+    public String getStateTransitionId() {
+      return stateTransitionId;
+    }
+    public String getStateTransitionStatus() {
+      return stateTransitionStatus;
+    }
+    public String getShardState() {
+      return shardState;
+    }
+  }
+
+  // Instances are only created through the builder.
+  private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName,
+      Map<String, Map<String, String>> shardStateMap, List<StateTransitionResult> stateTransitionStatusMap) {
+    _eventType = eventType;
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _shardStateMap = shardStateMap;
+    _stateTransitionResult = stateTransitionStatusMap;
+  }
+
+  public GatewayServiceEventType getEventType() {
+    return _eventType;
+  }
+  public String getClusterName() {
+    return _clusterName;
+  }
+  public String getInstanceName() {
+    return _instanceName;
+  }
+  /**
+   * @return the reported shard state map, or {@code null} if none was set on the builder
+   */
+  public Map<String, Map<String, String>> getShardStateMap() {
+    return _shardStateMap;
+  }
+  /**
+   * @return the reported state transition results, or {@code null} if none were set on the builder
+   */
+  public List<StateTransitionResult> getStateTransitionResult() {
+    return _stateTransitionResult;
+  }
+
+
+  /**
+   * Builder for {@link GatewayServiceEvent}. The event type is mandatory; every other field is optional and
+   * stays {@code null} unless its setter is called.
+   */
+  public static class GateWayServiceEventBuilder {
+    private final GatewayServiceEventType _eventType;
+    private String _clusterName;
+    private String _instanceName;
+    private Map<String, Map<String, String>> _shardStateMap;
+    private List<StateTransitionResult> _stateTransitionResult;
+
+    public GateWayServiceEventBuilder(GatewayServiceEventType eventType) {
+      this._eventType = eventType;
+    }
+
+    public GateWayServiceEventBuilder setClusterName(String clusterName) {
+      this._clusterName = clusterName;
+      return this;
+    }
+
+    // Sets the value returned by GatewayServiceEvent#getInstanceName().
+    public GateWayServiceEventBuilder setParticipantName(String instanceName) {
+      this._instanceName = instanceName;
+      return this;
+    }
+
+    public GateWayServiceEventBuilder setShardStateMap(Map<String, Map<String, String>> shardStateMap) {
+      this._shardStateMap = shardStateMap;
+      return this;
+    }
+
+    // Sets the value returned by GatewayServiceEvent#getStateTransitionResult().
+    public GateWayServiceEventBuilder setStateTransitionStatusMap(
+        List<StateTransitionResult> stateTransitionStatusMap) {
+      this._stateTransitionResult = stateTransitionStatusMap;
+      return this;
+    }
+
+    public GatewayServiceEvent build() {
+      return new GatewayServiceEvent(_eventType, _clusterName, _instanceName, _shardStateMap, _stateTransitionResult);
+    }
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index ee803b9cc..889c76c10 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -1,10 +1,14 @@
 package org.apache.helix.gateway.service;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Map;
-
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.gateway.grpcservice.HelixGatewayServiceService;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
 
 
 /**
@@ -16,50 +20,102 @@ import 
org.apache.helix.gateway.grpcservice.HelixGatewayServiceService;
  */
 
 public class GatewayServiceManager {
+  public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10;
+  private final Map<String, HelixGatewayService> _helixGatewayServiceMap;
 
-  HelixGatewayServiceService _helixGatewayServiceService;
-
-  HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
-
-  Map<String, HelixGatewayService> _helixGatewayServiceMap;
+  // a single-thread thread pool for event processing
+  private final ExecutorService _participantStateTransitionResultUpdator;
 
-  // TODO: add thread pool for init
-  // single thread tp for update
+  // link to grpc service
+  private final HelixGatewayServiceGrpcService _grpcService;
 
-  public enum EventType {
-    CONNECT,    // init connection to gateway service
-    UPDATE,  // update state transition result
-    DISCONNECT // shutdown connection to gateway service.
-  }
-
-  public class GateWayServiceEvent {
-    // event type
-    EventType eventType;
-    // event data
-    String clusterName;
-    String participantName;
-
-    // todo: add more fields
-  }
+  // A per-key executor for connection events. All events for the same instance are executed in sequence.
+  // It ensures that, for each instance, a connect/disconnect event won't start until the previous one is done.
+  private final PerKeyBlockingExecutor _connectionEventProcessor;
 
   public GatewayServiceManager() {
     _helixGatewayServiceMap = new ConcurrentHashMap<>();
+    _participantStateTransitionResultUpdator = 
Executors.newSingleThreadExecutor();
+    _grpcService = new HelixGatewayServiceGrpcService(this);
+    _connectionEventProcessor =
+        new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // 
todo: make it configurable
   }
 
+  /**
+   * send state transition message to application instance
+   * @return
+   */
   public AtomicBoolean sendTransitionRequestToApplicationInstance() {
-
+    // TODO: add param
     return null;
   }
 
-  public void updateShardState() {
-
+  /**
+   * Dispatches an event received from the gRPC service to the matching executor.
+   * UPDATE events are submitted to the state transition result updater; every other event type is offered to
+   * the connection event processor, keyed by instance name.
+   * @param event the event to process
+   */
+  public void newGatewayServiceEvent(GatewayServiceEvent event) {
+    boolean isUpdate = event.getEventType().equals(GatewayServiceEventType.UPDATE);
+    if (!isUpdate) {
+      _connectionEventProcessor.offerEvent(event.getInstanceName(), new participantConnectionProcessor(event));
+      return;
+    }
+    _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event));
   }
 
-  public void newParticipantConnecting() {
+  /**
+   * Task that forwards a reported state transition result to the gateway service of the event's cluster,
+   * which updates the in-memory shard state.
+   */
+  class shardStateUpdator implements Runnable {
+
+    GatewayServiceEvent _event;
+
+    public shardStateUpdator(GatewayServiceEvent event) {
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      HelixGatewayService service = _helixGatewayServiceMap.get(_event.getClusterName());
+      if (service != null) {
+        service.receiveSTResponse();
+      }
+      // TODO: return error code and throw exception when no service exists for the cluster.
+    }
+  }
 
+  /**
+   * Task that handles a connection event: it looks up (creating and registering it with the manager if absent)
+   * the HelixGatewayService of the event's cluster, then registers the participant on CONNECT and deregisters
+   * it on any other event type.
+   * Per the original author's note, this includes waiting for the ZK connection and for the previous
+   * LiveInstance to expire.
+   */
+  class participantConnectionProcessor implements Runnable {
+    GatewayServiceEvent _event;
+
+    public participantConnectionProcessor(GatewayServiceEvent event) {
+      _event = event;
+    }
+
+    @Override
+    public void run() {
+      String clusterName = _event.getClusterName();
+      // computeIfAbsent returns the service already mapped to the cluster, or the newly created one.
+      HelixGatewayService helixGatewayService = _helixGatewayServiceMap.computeIfAbsent(clusterName,
+          k -> new HelixGatewayService(GatewayServiceManager.this, clusterName));
+      boolean isConnect = _event.getEventType().equals(GatewayServiceEventType.CONNECT);
+      if (isConnect) {
+        helixGatewayService.registerParticipant();
+      } else {
+        helixGatewayService.deregisterParticipant(clusterName, _event.getInstanceName());
+      }
+    }
   }
 
-  public void participantDisconnected() {
+  @VisibleForTesting
+  HelixGatewayServiceGrpcService getGrpcService() {
+    return _grpcService;
+  }
 
+  @VisibleForTesting
+  HelixGatewayService getHelixGatewayService(String clusterName) {
+    return _helixGatewayServiceMap.get(clusterName);
   }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
index 49f1bbf2c..c27dcc708 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
@@ -1,9 +1,7 @@
 package org.apache.helix.gateway.service;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
@@ -32,10 +30,10 @@ public class HelixGatewayService {
     return _gatewayServiceManager;
   }
 
-  public void start() {
-    System.out.println("Starting Helix Gateway Service");
-  }
-
+  /**
+   * Register a participant to the Helix cluster.
+   * It creates a HelixParticipantManager and connects to the Helix controller.
+   */
   public void registerParticipant() {
     // TODO: create participant manager and add to _participantsMap
     HelixManager manager = new ZKHelixManager("clusterName", "instanceName", 
InstanceType.PARTICIPANT, _zkAddress);
@@ -48,6 +46,11 @@ public class HelixGatewayService {
     }
   }
 
+  /**
+   * Deregister a participant from the Helix cluster when app instance is 
gracefully stopped or connection lost.
+   * @param clusterName
+   * @param participantName
+   */
   public void deregisterParticipant(String clusterName, String 
participantName) {
     HelixManager manager = 
_participantsMap.get(clusterName).remove(participantName);
     if (manager != null) {
@@ -69,19 +72,20 @@ public class HelixGatewayService {
       return flag;
   }
 
+  /**
+   * Entry point for receiving the state transition response from the participant.
+   * It will update the in-memory state accordingly.
+   */
   public void receiveSTResponse() {
      // AtomicBoolean flag = 
_flagMap.get(instanceName).remove(response.getMessageId());
   }
 
-  public void newParticipantConnecting(){
-
-  }
-
-  public void participantDisconnected(){
-
-  }
-
+  /**
+   * Stop the HelixGatewayService.
+   * It stops all participants in the cluster.
+   */
   public void stop() {
+    // TODO: stop all participants
     System.out.println("Stopping Helix Gateway Service");
   }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
index d419a71b5..e206c0edc 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
@@ -5,7 +5,6 @@ package org.apache.helix.gateway.service;
  */
 public interface HelixGatewayServiceProcessor {
 
-  public boolean sendStateTransitionMessage(String instanceName);
+  public boolean sendStateTransitionMessage( String instanceName);
 
-  public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent 
event);
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
deleted file mode 100644
index 3df3b00db..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.helix.gateway.service;
-
-public class ReplicaStateTracker {
-
-  boolean compareTargetState() {
-    return true;
-  }
-
-  boolean updateReplicaState() {
-    return true;
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
index b7e06051e..8da4692dd 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
@@ -1,7 +1,6 @@
 package org.apache.helix.gateway.statemodel;
 
 import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 
 public class HelixGatewayOnlineOfflineStateModelFactory extends 
StateModelFactory<HelixGatewayOnlineOfflineStateModel> {
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
index 7953996e9..1d2e07b9e 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
@@ -1,10 +1,8 @@
 package org.apache.helix.gateway.util;
 
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Queue;
-import java.util.Set;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index 7f0c592c2..530f06a66 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -1,17 +1,68 @@
 package org.apache.helix.gateway.util;
 
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus;
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
 
 
 public final class StateTransitionMessageTranslateUtil {
 
-  public static TransitionMessage translateSTMsgToProto() {
+  public static TransitionMessage translateSTMsgToTransitionMessage() {
     return null;
   }
 
-  public static GatewayServiceManager.GateWayServiceEvent 
translateProtoToSTMsg(ShardStateMessage message) {
-    return null;
+  /**
+   * Translate from user sent ShardStateMessage message to Helix Gateway Service event.
+   * A message carrying a ShardState becomes a CONNECT event that includes the reported shard states; any other
+   * message is treated as a ShardTransitionStatus and becomes an UPDATE event that includes the transition
+   * results.
+   * @param request the message received from the application instance
+   * @return the corresponding gateway service event
+   */
+  public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMessage request) {
+
+    GatewayServiceEvent.GateWayServiceEventBuilder builder;
+    if (request.hasShardState()) { // init connection to gateway service
+      ShardState shardState = request.getShardState();
+      // resource name -> (shard name -> current state)
+      // NOTE(review): the nesting is inferred from the event's Map<String, Map<String, String>> field; confirm.
+      Map<String, Map<String, String>> shardStateMap = new HashMap<>();
+      for (HelixGatewayServiceOuterClass.SingleResourceState resourceState : shardState.getResourceStateList()) {
+        Map<String, String> statesOfResource =
+            shardStateMap.computeIfAbsent(resourceState.getResource(), k -> new HashMap<>());
+        for (HelixGatewayServiceOuterClass.SingleShardState state : resourceState.getShardStatesList()) {
+          statesOfResource.put(state.getShardName(), state.getCurrentState());
+        }
+      }
+      // The reported states must be attached to the event, otherwise they are lost.
+      builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT)
+          .setClusterName(shardState.getClusterName())
+          .setParticipantName(shardState.getInstanceName())
+          .setShardStateMap(shardStateMap);
+    } else {
+      ShardTransitionStatus shardTransitionStatus = request.getShardTransitionStatus();
+      // this is status update for established connection
+      List<HelixGatewayServiceOuterClass.SingleShardTransitionStatus> status =
+          shardTransitionStatus.getShardTransitionStatusList();
+      List<GatewayServiceEvent.StateTransitionResult> stResult = new ArrayList<>();
+      for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus shardTransition : status) {
+        // NOTE(review): the current state is passed as both the transition status and the shard state, while
+        // the message's isSuccess flag is unused; confirm the intended status value.
+        GatewayServiceEvent.StateTransitionResult result =
+            new GatewayServiceEvent.StateTransitionResult(shardTransition.getTransitionID(),
+                shardTransition.getCurrentState(), shardTransition.getCurrentState());
+        stResult.add(result);
+      }
+      builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE)
+          .setClusterName(shardTransitionStatus.getClusterName())
+          .setParticipantName(shardTransitionStatus.getInstanceName())
+          .setStateTransitionStatusMap(stResult);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Builds the DISCONNECT event emitted when a client connection terminates.
+   * @param instanceName instance name to record on the event
+   * @param clusterName cluster name to record on the event
+   * @return a DISCONNECT event carrying the given cluster and instance names
+   */
+  public static GatewayServiceEvent translateClientCloseToEvent(String instanceName, String clusterName) {
+    return new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.DISCONNECT)
+        .setClusterName(clusterName)
+        .setParticipantName(instanceName)
+        .build();
   }
 }
diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto 
b/helix-gateway/src/main/proto/HelixGatewayService.proto
index dbf488b5a..996d2239c 100644
--- a/helix-gateway/src/main/proto/HelixGatewayService.proto
+++ b/helix-gateway/src/main/proto/HelixGatewayService.proto
@@ -20,16 +20,9 @@ message TransitionMessage{
   repeated SingleTransitionMessage request = 1;
 }
 
-message SingleShardTransitionStatus {
-  string transitionID = 1;       // ID of transition message
-  bool isSuccess = 2;           // Was transition successfully performed
-  optional string currentState = 3;   // If it failed, what is the current 
state it should reported as.
-}
-
 message SingleResourceState {
   string resource = 1;             // name of the resource
-  repeated SingleShardState SingleShardState = 2;    // State of each shard
-
+  repeated SingleShardState shardStates = 2;    // State of each shard
 }
 
 message SingleShardState {
@@ -37,12 +30,31 @@ message SingleShardState {
   string currentState = 2;     // Current state of the shard
 }
 
-//
-message ShardStateMessage{
+message SingleShardTransitionStatus {
+  string transitionID = 1;       // ID of transition message
+  bool isSuccess = 2;           // Was transition successfully performed
+  optional string currentState = 3;   // If it failed, what is the current state it should be reported as.
+}
+
+message ShardTransitionStatus{
   string instanceName = 1;         // Name of the application instance
   string clusterName = 2;          // Name of the cluster to connect to
-  repeated SingleShardTransitionStatus shardStatus = 3;    // state transition 
result for a shard
-  repeated SingleShardState shardState = 4;        // State of each shard, 
only reported upon init connection
+  repeated SingleShardTransitionStatus shardTransitionStatus = 3;    // state 
transition result for a shard
+}
+
+// Application reports its state to Helix Gateway upon initial connection
+message ShardState{
+  string instanceName = 1;         // Name of the application instance
+  string clusterName = 2;          // Name of the cluster to connect to
+  repeated SingleResourceState resourceState = 3;    // State of each resource
+}
+
+// Application instance sends this message upon initial connection or in reply to a state transition message
+message ShardStateMessage{
+  oneof instanceUpdate {
+    ShardState shardState = 1;
+    ShardTransitionStatus shardTransitionStatus = 2;
+  }
 }
 
 service HelixGatewayService {
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
new file mode 100644
index 000000000..01b78593c
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -0,0 +1,47 @@
+package org.apache.helix.gateway.service;
+
+import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.testng.annotations.Test;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestGatewayServiceManager {
+
+  private GatewayServiceManager manager;
+
+  /**
+   * A client that reports its shard state and then closes its stream must produce exactly two events on the
+   * manager: a CONNECT followed by a DISCONNECT.
+   */
+  @Test
+  public void testConnectionAndDisconnectionEvents() {
+
+    manager = mock(GatewayServiceManager.class);
+    HelixGatewayServiceGrpcService grpcService = new HelixGatewayServiceGrpcService(manager);
+    // Mock a connection event
+    HelixGatewayServiceOuterClass.ShardStateMessage connectionEvent =
+        HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder()
+            .setShardState(HelixGatewayServiceOuterClass.ShardState.newBuilder()
+                .setInstanceName("instance1")
+                .setClusterName("cluster1")
+                .build())
+            .build();
+
+    // The response observer is never written to in this test, so the stream is opened without one.
+    io.grpc.stub.StreamObserver<HelixGatewayServiceOuterClass.ShardStateMessage> clientStream =
+        grpcService.report(null);
+
+    // Process connection event
+    clientStream.onNext(connectionEvent);
+
+    // Process disconnection event: the client closes the same stream
+    clientStream.onCompleted();
+
+    // Verify both events reached the manager, in order
+    org.mockito.ArgumentCaptor<GatewayServiceEvent> events =
+        org.mockito.ArgumentCaptor.forClass(GatewayServiceEvent.class);
+    verify(manager, times(2)).newGatewayServiceEvent(events.capture());
+    org.testng.Assert.assertEquals(events.getAllValues().get(0).getEventType(),
+        org.apache.helix.gateway.constant.GatewayServiceEventType.CONNECT);
+    org.testng.Assert.assertEquals(events.getAllValues().get(1).getEventType(),
+        org.apache.helix.gateway.constant.GatewayServiceEventType.DISCONNECT);
+  }
+}

Reply via email to