This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 957901c4c0b504c5855776bc46b952bfc72aea55
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Sun Sep 8 20:05:15 2024 -0700

    Gateway  - Implementing poll-mode channel  (#2900)
    
    This pull request introduces significant enhancements to the Helix Gateway 
service, primarily focusing on implementing a poll-mode channel for 
communication. The main changes include:
    
    Addition of a new HelixGatewayServicePollModeChannel class that implements 
the poll-mode communication channel.
    Updates to GatewayServiceChannelConfig to support new configuration options 
for poll-mode.
---
 .github/workflows/Helix-PR-CI.yml                  |   2 +-
 helix-gateway/pom.xml                              |   2 +-
 .../api/constant/GatewayServiceConfigConstant.java |   1 +
 .../api/service/HelixGatewayServiceChannel.java    |   4 +-
 .../channel/GatewayServiceChannelConfig.java       | 139 +++++++++++++---
 .../channel/HelixGatewayServiceChannelFactory.java |   2 +-
 .../channel/HelixGatewayServiceGrpcService.java    |   7 +-
 .../HelixGatewayServicePollModeChannel.java        | 181 ++++++++++++++++++++-
 .../participant/HelixGatewayParticipant.java       |   4 +-
 .../gateway/service/GatewayServiceManager.java     |  28 +++-
 .../gateway/util/GatewayCurrentStateCache.java     |  14 +-
 .../apache/helix/gateway/util/PollChannelUtil.java | 131 +++++++++++++++
 .../util/StateTransitionMessageTranslateUtil.java  |  37 +++++
 .../TestGatewayServiceChannelConfig.java}          |  23 ++-
 .../TestHelixGatewayServicePollModeChannel.java    |  90 ++++++++++
 .../participant/TestHelixGatewayParticipant.java   |   4 +-
 .../TestGatewayCurrentStateCache.java              |   5 +-
 .../TestStateTransitionMessageTranslateUtil.java   |   5 +-
 18 files changed, 624 insertions(+), 55 deletions(-)

diff --git a/.github/workflows/Helix-PR-CI.yml 
b/.github/workflows/Helix-PR-CI.yml
index d86c8d8af..8ed1fb9ad 100644
--- a/.github/workflows/Helix-PR-CI.yml
+++ b/.github/workflows/Helix-PR-CI.yml
@@ -1,7 +1,7 @@
 name: Helix PR CI
 on:
   pull_request:
-    branches: [ master, metaclient, ApplicationClusterManager] # TODO: remove 
side branch
+    branches: [ master, metaclient, ApplicationClusterManager, 
helix-gateway-service] # TODO: remove side branch
     paths-ignore:
       - '.github/**'
       - 'helix-front/**'
diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml
index 62e927d11..52c21d599 100644
--- a/helix-gateway/pom.xml
+++ b/helix-gateway/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.helix</groupId>
     <artifactId>helix</artifactId>
-    <version>1.4.1-SNAPSHOT</version>
+    <version>1.4.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
index 70e4d73e3..be5fa1a11 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java
@@ -24,4 +24,5 @@ public class GatewayServiceConfigConstant {
   public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60;
   public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60;
   public static final int DEFAULT_POLL_INTERVAL_SEC = 60;
+  public static final int DEFAULT_HEALTH_TIMEOUT_SEC = 60;
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
index 48e3e437d..10055834e 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
@@ -73,12 +73,12 @@ public interface HelixGatewayServiceChannel {
    * @param instanceName  instance name
    * @param reason  reason for closing connection
    */
-  public void closeConnectionWithError(String instanceName, String reason);
+  public void closeConnectionWithError(String clusterName, String 
instanceName, String reason);
 
   /**
    * Gateway service close client connection with success. This function is 
called when manager wants to close client
    * connection gracefully, e.g., when gateway service is shutting down.
    * @param instanceName  instance name
    */
-  public void completeConnection(String instanceName);
+  public void completeConnection(String clusterName, String instanceName);
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
index 31c7af05b..acdc405cc 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java
@@ -19,8 +19,11 @@ package org.apache.helix.gateway.channel;
  * under the License.
  */
 
+import java.util.Map;
+import java.util.Properties;
 
 import static 
org.apache.helix.gateway.api.constant.GatewayServiceConfigConstant.*;
+import static 
org.apache.helix.gateway.channel.GatewayServiceChannelConfig.FileBasedConfigType.*;
 
 
 public class GatewayServiceChannelConfig {
@@ -46,11 +49,11 @@ public class GatewayServiceChannelConfig {
   // service configs
 
   // service mode for inbound information.
-  private ChannelMode _channelMode;
+  private final ChannelMode _channelMode;
   // channel type for participant liveness detection
-  private ChannelType _participantConnectionChannelType;
+  private final ChannelType _participantConnectionChannelType;
   // channel for sending and receiving shard state transition request and 
shard state response
-  private ChannelType _shardStateChannelType;
+  private final ChannelType _shardStateChannelType;
 
   // grpc server configs
   private final int _grpcServerPort;
@@ -61,15 +64,22 @@ public class GatewayServiceChannelConfig {
 
   // poll mode config
   private final int _pollIntervalSec;
-  // TODO: configs for pull mode grpc client
-
-  // TODO: configs for pull mode with file
+  private final int _pollStartDelaySec;
+  private final int _pollHealthCheckTimeoutSec;
+  private final int _targetFileUpdateIntervalSec;
+  private final Map<String, Map<String, String>> 
_participantLivenessEndpointMap;
+  private final Properties _pollModeConfigs;
+
+  public enum FileBasedConfigType {
+    PARTICIPANT_CURRENT_STATE_PATH,
+    SHARD_TARGET_STATE_PATH
+  }
 
   // getters
-
   public ChannelMode getChannelMode() {
     return _channelMode;
   }
+
   public ChannelType getParticipantConnectionChannelType() {
     return _participantConnectionChannelType;
   }
@@ -102,9 +112,31 @@ public class GatewayServiceChannelConfig {
     return _pollIntervalSec;
   }
 
-  private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode 
channelMode,  ChannelType participantConnectionChannelType,
-      ChannelType shardStateChannelType, int serverHeartBeatInterval, int 
maxAllowedClientHeartBeatInterval,
-      int clientTimeout, boolean enableReflectionService, int pollIntervalSec) 
{
+  public Map<String, Map<String, String>> getParticipantLivenessEndpointMap() {
+    return _participantLivenessEndpointMap;
+  }
+
+  public int getPollStartDelaySec() {
+    return _pollStartDelaySec;
+  }
+
+  public int getPollHealthCheckTimeoutSec() {
+    return _pollHealthCheckTimeoutSec;
+  }
+
+  public int getTargetFileUpdateIntervalSec() {
+    return _targetFileUpdateIntervalSec;
+  }
+
+  public String getPollModeConfig(FileBasedConfigType type) {
+    return _pollModeConfigs.getProperty(type.toString());
+  }
+
+  private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode 
channelMode,
+      ChannelType participantConnectionChannelType, ChannelType 
shardStateChannelType, int serverHeartBeatInterval,
+      int maxAllowedClientHeartBeatInterval, int clientTimeout, boolean 
enableReflectionService, int pollIntervalSec,
+      int pollStartDelaySec, int pollHealthCheckTimeoutSec, int 
targetFileUpdateIntervalSec,
+      Properties pollModeConfigs, Map<String, Map<String, String>> 
participantLivenessEndpointMap) {
     _grpcServerPort = grpcServerPort;
     _channelMode = channelMode;
     _participantConnectionChannelType = participantConnectionChannelType;
@@ -114,6 +146,11 @@ public class GatewayServiceChannelConfig {
     _clientTimeout = clientTimeout;
     _enableReflectionService = enableReflectionService;
     _pollIntervalSec = pollIntervalSec;
+    _pollStartDelaySec = pollStartDelaySec;
+    _pollHealthCheckTimeoutSec = pollHealthCheckTimeoutSec;
+    _targetFileUpdateIntervalSec = targetFileUpdateIntervalSec;
+    _pollModeConfigs = pollModeConfigs;
+    _participantLivenessEndpointMap = participantLivenessEndpointMap;
   }
 
   public static class GatewayServiceProcessorConfigBuilder {
@@ -132,10 +169,12 @@ public class GatewayServiceChannelConfig {
 
     // poll mode config
     private int _pollIntervalSec = DEFAULT_POLL_INTERVAL_SEC;
-    // poll mode grpc client configs
-
-    // poll mode file configs
-
+    // poll mode config
+    private Properties _pollModeConfigs;
+    private int _pollStartDelaySec = DEFAULT_POLL_INTERVAL_SEC;
+    private int _pollHealthCheckTimeoutSec = DEFAULT_HEALTH_TIMEOUT_SEC;
+    private int _targetFileUpdateIntervalSec = DEFAULT_POLL_INTERVAL_SEC;
+    private Map<String, Map<String, String>> _healthCheckEndpointMap;
 
     public GatewayServiceProcessorConfigBuilder setChannelMode(ChannelMode 
channelMode) {
       _channelMode = channelMode;
@@ -183,16 +222,69 @@ public class GatewayServiceChannelConfig {
       return this;
     }
 
+    public GatewayServiceProcessorConfigBuilder 
addPollModeConfig(FileBasedConfigType type, String value) {
+      if (_pollModeConfigs == null) {
+        _pollModeConfigs = new Properties();
+      }
+      _pollModeConfigs.put(type.toString(), value);
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder setPollStartDelaySec(int 
pollStartDelaySec) {
+      _pollStartDelaySec = pollStartDelaySec;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder setPollHealthCheckTimeout(int 
pollHealthCheckTimeout) {
+      _pollHealthCheckTimeoutSec = pollHealthCheckTimeout;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder 
setTargetFileUpdateIntervalSec(int targetFileUpdateIntervalSec) {
+      _targetFileUpdateIntervalSec = targetFileUpdateIntervalSec;
+      return this;
+    }
+
+    public GatewayServiceProcessorConfigBuilder 
setHealthCheckEndpointMap(Map<String, Map<String, String>> 
healthCheckEndpointMap) {
+      _healthCheckEndpointMap = healthCheckEndpointMap;
+      return this;
+    }
+
     public void validate() {
-      if ((_participantConnectionChannelType == ChannelType.GRPC_SERVER
-          && _shardStatenChannelType != ChannelType.GRPC_SERVER) || (
-          _participantConnectionChannelType != ChannelType.GRPC_SERVER
-              && _shardStatenChannelType == ChannelType.GRPC_SERVER)) {
-        throw new IllegalArgumentException(
-            "In caas of GRPC server, Participant connection channel type and 
shard state channel type must be the same");
+      switch (_participantConnectionChannelType) {
+        case GRPC_SERVER:
+          if (_grpcServerPort == 0) {
+            throw new IllegalArgumentException("Grpc server port must be set 
for grpc server channel type");
+          }
+          if (_shardStatenChannelType != ChannelType.GRPC_SERVER) {
+            throw new IllegalArgumentException(
+                "In case of GRPC server, Participant connection channel type 
and shard state channel type must be the same");
+          }
+          break;
+        case FILE:
+          if (_healthCheckEndpointMap == null || 
_healthCheckEndpointMap.isEmpty()) {
+            throw new IllegalArgumentException("Health check endpoint map must 
be set for file channel type");
+          }
+          break;
+        default:
+          break;
       }
-      if (_participantConnectionChannelType == ChannelType.GRPC_SERVER && 
_grpcServerPort == 0) {
-        throw new IllegalArgumentException("Grpc server port must be set for 
grpc server channel type");
+
+      switch (_shardStatenChannelType) {
+        case GRPC_SERVER:
+          if (_participantConnectionChannelType != ChannelType.GRPC_SERVER) {
+            throw new IllegalArgumentException(
+                "In case of GRPC server, Participant connection channel type 
and shard state channel type must be the same");
+          }
+          break;
+        case FILE:
+          if (_pollModeConfigs == null || 
_pollModeConfigs.getProperty(SHARD_TARGET_STATE_PATH.name()) == null
+              || 
_pollModeConfigs.getProperty(SHARD_TARGET_STATE_PATH.name()).isEmpty()) {
+            throw new IllegalArgumentException("Current state and target state 
path must be set for file channel type");
+          }
+          break;
+        default:
+          break;
       }
     }
 
@@ -200,7 +292,8 @@ public class GatewayServiceChannelConfig {
       validate();
       return new GatewayServiceChannelConfig(_grpcServerPort, _channelMode, 
_participantConnectionChannelType,
           _shardStatenChannelType, _serverHeartBeatInterval, 
_maxAllowedClientHeartBeatInterval, _clientTimeout,
-          _enableReflectionService, _pollIntervalSec);
+          _enableReflectionService, _pollIntervalSec, _pollStartDelaySec, 
_pollHealthCheckTimeoutSec,
+          _targetFileUpdateIntervalSec, _pollModeConfigs, 
_healthCheckEndpointMap);
     }
   }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
index fa665a5c8..796e84488 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
@@ -33,7 +33,7 @@ public class HelixGatewayServiceChannelFactory {
         return new HelixGatewayServiceGrpcService(manager, config);
       }
     } else {
-      return new HelixGatewayServicePollModeChannel(config);
+      return new HelixGatewayServicePollModeChannel(manager, config);
     }
     throw new IllegalArgumentException(
         "Unsupported channel mode and type combination: " + 
config.getChannelMode() + " , "
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
index 8e9b0882b..9ed06ca23 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
@@ -49,7 +49,6 @@ import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMe
  */
 public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
     implements HelixGatewayServiceChannel {
-  // create LOGGER
   private static final Logger logger = 
LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class);
 
   // Map to store the observer for each instance
@@ -118,7 +117,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
    * The instance must already have established a connection to the gateway 
service.
    *
    * @param instanceName the instance name to send the message to
-   * @param message the message to convert to the transition message
+   * @param requests the state transition request to send
    */
   @Override
   public void sendStateChangeRequests(String instanceName, ShardChangeRequests 
requests) {
@@ -137,7 +136,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
    * @param errorReason   error reason for close
    */
   @Override
-  public void closeConnectionWithError(String instanceName, String 
errorReason) {
+  public void closeConnectionWithError(String clusterName, String 
instanceName, String errorReason) {
     logger.info("Close connection for instance: {} with error reason: {}", 
instanceName, errorReason);
     closeConnectionHelper(instanceName, errorReason, true);
   }
@@ -147,7 +146,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
    * @param instanceName instance name
    */
   @Override
-  public void completeConnection(String instanceName) {
+  public void completeConnection(String clusterName, String instanceName) {
     logger.info("Complete connection for instance: {}", instanceName);
     closeConnectionHelper(instanceName, null, false);
   }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
index 7623b8d2a..3acd9e83b 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
@@ -20,38 +20,207 @@ package org.apache.helix.gateway.channel;
  */
 
 import java.io.IOException;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
-// TODO: implement this class
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.helix.gateway.channel.GatewayServiceChannelConfig.FileBasedConfigType.*;
+import static org.apache.helix.gateway.util.PollChannelUtil.*;
+
+
+/**
+ * Helix Gateway Service Poll mode implementation.
+ * It periodically polls the current state of the participants and the 
liveness of the participants.
+ */
 public class HelixGatewayServicePollModeChannel implements 
HelixGatewayServiceChannel {
+  private static final Logger logger = 
LoggerFactory.getLogger(HelixGatewayServicePollModeChannel.class);
+  final GatewayServiceManager _manager;
+  final GatewayServiceChannelConfig _config;
+
+  // cluster -> file for user to report shards' current states
+  final String _userCurrentStateFilePath;
+  // cluster -> file path to store the shards' target states
+  final String _targetStateFilePath;
+  final GatewayServiceChannelConfig.ChannelType 
_participantConnectionStatusChannelType;
+  final GatewayServiceChannelConfig.ChannelType _shardStateChannelType;
+
+  // cluster -> host -> liveness result
+  final Map<String, Map<String, Boolean>> _livenessResults;
+  // cluster -> host -> endpoint for query liveness
+  // It is the file path if _participantConnectionStatusChannelType is FILE, 
grpc endpoint if it is GRPC_CLIENT
+  final Map<String, Map<String, String>> _livenessCheckEndpointMap;
+
+  ScheduledExecutorService _scheduler;
+
+  public HelixGatewayServicePollModeChannel(GatewayServiceManager manager, 
GatewayServiceChannelConfig config) {
+    _manager = manager;
+    _config = config;
+    _scheduler = Executors.newSingleThreadScheduledExecutor();
+    _participantConnectionStatusChannelType = 
_config.getParticipantConnectionChannelType();
+    _shardStateChannelType = _config.getShardStateChannelType();
+    _livenessCheckEndpointMap = _config.getParticipantLivenessEndpointMap();
+    _userCurrentStateFilePath = 
_config.getPollModeConfig(PARTICIPANT_CURRENT_STATE_PATH);
+    _targetStateFilePath = _config.getPollModeConfig(SHARD_TARGET_STATE_PATH);
+    _livenessResults = new HashMap<>();
+  }
 
-  public HelixGatewayServicePollModeChannel(GatewayServiceChannelConfig 
config) {
+  /**
+   * Fetch the updates from the participants.
+   * 1. Get the diff of previous and current shard states, and send the state 
change event to the gateway manager.
+   * 2. Compare previous liveness and current liveness, and send the 
connection event to the gateway manager.
+   */
+ protected  void fetchUpdates() {
+    // 1.  get the shard state change
+    Map<String, Map<String, Map<String, Map<String, String>>>> 
currentShardStates =
+        getChangedParticipantsCurrentState(_userCurrentStateFilePath);
+
+    Map<String, Map<String, Map<String, Map<String, String>>>> 
currentStateDiff = new HashMap<>();
+    for (String clusterName : currentShardStates.keySet()) {
+      Map<String, Map<String, Map<String, String>>> clusterDiffMap =
+          _manager.updateCacheWithNewCurrentStateAndGetDiff(clusterName, 
currentShardStates.get(clusterName));
+      if (clusterDiffMap == null || clusterDiffMap.isEmpty()) {
+        continue;
+      }
+      for (String instanceName : clusterDiffMap.keySet()) {
+        // if the instance is previously connected, send state change event
+        if (_livenessResults.get(clusterName) != null && 
_livenessResults.get(clusterName).get(instanceName)) {
+          logger.info("Host {} has state change, sending event to gateway 
manager", instanceName);
+          pushClientEventToGatewayManager(_manager,
+              
StateTransitionMessageTranslateUtil.translateCurrentStateChangeToEvent(clusterName,
 instanceName,
+                  clusterDiffMap.get(instanceName)));
+        }
+      }
+      currentStateDiff.put(clusterName, clusterDiffMap);
+    }
+
+    // 2. fetch host health
+    for (String clusterName : _livenessCheckEndpointMap.keySet()) {
+      for (String instanceName : 
_livenessCheckEndpointMap.get(clusterName).keySet()) {
+        boolean prevLiveness =
+            _livenessResults.get(clusterName) != null && 
_livenessResults.get(clusterName).get(instanceName);
+        boolean liveness = fetchInstanceLivenessStatus(clusterName, 
instanceName);
+
+        if (prevLiveness && !liveness) {  // previously connected, now 
disconnected
+          logger.warn("Host {} is not healthy, sending event to gateway 
manager", instanceName);
+          pushClientEventToGatewayManager(_manager,
+              
StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, 
instanceName));
+        } else if (!prevLiveness && liveness) {  // new connection.
+          logger.info("Host {} is newly connected, sending init connection 
event to gateway manager", instanceName);
+          pushClientEventToGatewayManager(_manager,
+              
StateTransitionMessageTranslateUtil.translateCurrentStateDiffToInitConnectEvent(clusterName,
 instanceName,
+                  currentStateDiff.containsKey(clusterName) ? 
currentStateDiff.get(clusterName).get(instanceName)
+                      : new HashMap<>()));
+        }
+        _livenessResults.computeIfAbsent(clusterName, k -> new 
HashMap<>()).put(instanceName, liveness);
+      }
+    }
   }
 
   @Override
   public void sendStateChangeRequests(String instanceName,
       HelixGatewayServiceOuterClass.ShardChangeRequests shardChangeRequests) {
+    switch (_shardStateChannelType) {
+      case FILE:
+        // we are periodically writing to the file, so no need to write here.
+        break;
+      default:
+        throw new NotImplementedException("Only support file based channel for 
now");
+    }
   }
 
   @Override
   public void start() throws IOException {
+    logger.info("Starting Helix Gateway Service Poll Mode Channel...");
+    final Runnable fetchUpdatesTask = new Runnable() {
+      @Override
+      public void run() {
+        fetchUpdates();
+      }
+    };
+    _scheduler.scheduleAtFixedRate(fetchUpdatesTask, 
_config.getPollStartDelaySec(),  // init delay
+        _config.getPollIntervalSec(),            //  poll interval
+        TimeUnit.SECONDS);
+    scheduleTargetStateUpdateTask();
+  }
 
+  void scheduleTargetStateUpdateTask() {
+    if (_shardStateChannelType == 
GatewayServiceChannelConfig.ChannelType.FILE) {
+      final Runnable writeTargetStateTask = new Runnable() {
+        @Override
+        public void run() {
+          flushAssignmentToFile(_manager.serializeTargetState(), 
_targetStateFilePath);
+        }
+      };
+      _scheduler.scheduleAtFixedRate(writeTargetStateTask, 
_config.getPollStartDelaySec(),  // init delay
+          _config.getTargetFileUpdateIntervalSec(),            //  poll 
interval
+          TimeUnit.SECONDS);
+    }
   }
 
   @Override
   public void stop() {
-
+    logger.info("Stopping Helix Gateway Service Poll Mode Channel...");
+    // Shutdown the scheduler gracefully when done (e.g., on app termination)
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      _scheduler.shutdown();
+      try {
+        if (!_scheduler.awaitTermination(1, TimeUnit.MINUTES)) {
+          _scheduler.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        _scheduler.shutdownNow();
+      }
+    }));
   }
 
   @Override
-  public void closeConnectionWithError(String instanceName, String reason) {
-
+  public void closeConnectionWithError(String clusterName, String 
instanceName, String reason) {
+   // nothing needed for file based poll mode
   }
 
   @Override
-  public void completeConnection(String instanceName) {
+  public void completeConnection(String clusterName, String instanceName) {
+    // nothing needed for file based poll mode
+  }
+
+  /**
+   * Get current state of the participants.
+   * Now we only support file based, we will add GRPC based in the future.
+   */
+  protected Map<String, Map<String, Map<String, Map<String, String>>>> 
getChangedParticipantsCurrentState(
+      String userCurrentStateFilePath) {
+    Map<String, Map<String, Map<String, Map<String, String>>>> 
currentShardStates;
+    switch (_shardStateChannelType) {
+      case FILE:
+        currentShardStates = 
readCurrentStateFromFile(userCurrentStateFilePath);
+        return currentShardStates;
+      default:
+        throw new NotImplementedException("Only support file based channel 
shard state for now");
+    }
+  }
 
+  /**
+   * Fetch the liveness status of the instance.
+   * Now we only support file based, we will add GRPC based in the future.
+   */
+  protected boolean fetchInstanceLivenessStatus(String clusterName, String 
instanceName) {
+    String endpoint = 
_livenessCheckEndpointMap.get(clusterName).get(instanceName);
+    switch (_participantConnectionStatusChannelType) {
+      case FILE:
+        return readInstanceLivenessStatusFromFile(endpoint, 
_config.getPollHealthCheckTimeoutSec());
+      default:
+        throw new NotImplementedException("Only support grpc based channel for 
now");
+    }
   }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
index 15e080bb8..b17d897e1 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
@@ -184,7 +184,7 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
   @Override
   public void onDisconnected(HelixManager helixManager, Throwable error) 
throws Exception {
     _onDisconnectedCallback.run();
-    
_gatewayServiceChannel.closeConnectionWithError(_helixManager.getInstanceName(),
+    
_gatewayServiceChannel.closeConnectionWithError(_helixManager.getClusterName(), 
_helixManager.getInstanceName(),
         error.getMessage());
   }
 
@@ -192,7 +192,7 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
     if (_helixManager.isConnected()) {
       _helixManager.disconnect();
     }
-    _gatewayServiceChannel.completeConnection(_helixManager.getInstanceName());
+    _gatewayServiceChannel.completeConnection(_helixManager.getClusterName(), 
_helixManager.getInstanceName());
   }
 
   public static class Builder {
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 3edd4ee9d..221c2aeac 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -19,6 +19,8 @@ package org.apache.helix.gateway.service;
  * under the License.
  */
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.util.Collections;
@@ -27,7 +29,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
@@ -45,6 +46,7 @@ import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
  *  4. For ST reply message, update the tracker
  */
 public class GatewayServiceManager {
+  private static final ObjectMapper objectMapper = new ObjectMapper();
   public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10;
   public static final ImmutableSet<String> SUPPORTED_MULTI_STATE_MODEL_TYPES =
       ImmutableSet.of("OnlineOffline");
@@ -89,6 +91,29 @@ public class GatewayServiceManager {
     }
   }
 
+  private GatewayCurrentStateCache getCache(String clusterName) {
+    return _currentStateCacheMap.computeIfAbsent(clusterName, k -> new 
GatewayCurrentStateCache(clusterName));
+  }
+
+  public void resetTargetStateCache(String clusterName, String instanceName) {
+    getCache(clusterName).resetTargetStateCache(instanceName);
+  }
+
+  public  Map<String, Map<String, Map<String, String>>> 
updateCacheWithNewCurrentStateAndGetDiff(String clusterName,
+      Map<String, Map<String, Map<String, String>>> newCurrentStateMap) {
+   return  
getCache(clusterName).updateCacheWithNewCurrentStateAndGetDiff(newCurrentStateMap);
+  }
+
+  public String serializeTargetState() {
+    ObjectNode targetStateNode = new ObjectMapper().createObjectNode();
+    for (String clusterName : _currentStateCacheMap.keySet()) {
+      // add the json node to the target state node
+      targetStateNode.set(clusterName, 
getCache(clusterName).serializeTargetAssignmentsToJSONNode());
+    }
+    targetStateNode.set("timestamp", 
objectMapper.valueToTree(System.currentTimeMillis()));
+    return targetStateNode.toString();
+  }
+
   /**
    * Update in memory shard state
    */
@@ -150,6 +175,7 @@ public class GatewayServiceManager {
 
   private void createHelixGatewayParticipant(String clusterName, String 
instanceName,
       Map<String, Map<String, String>> initialShardStateMap) {
+    resetTargetStateCache(clusterName, instanceName);
     // Create and add the participant to the participant map
     HelixGatewayParticipant.Builder participantBuilder =
         new HelixGatewayParticipant.Builder(_gatewayServiceChannel, 
instanceName, clusterName,
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
index 503909d4a..a3ba7fbe7 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
@@ -64,8 +64,12 @@ public class GatewayCurrentStateCache {
     Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
     for (String instance : newCurrentStateMap.keySet()) {
       Map<String, Map<String, String>> newCurrentState = 
newCurrentStateMap.get(instance);
-      diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new 
ShardStateMap(new HashMap<>()))
-          .updateAndGetDiff(newCurrentState));
+      Map<String, Map<String, String>> resourceStateDiff =
+          _currentStateMap.computeIfAbsent(instance, k -> new 
ShardStateMap(new HashMap<>()))
+              .updateAndGetDiff(newCurrentState);
+      if (resourceStateDiff != null && !resourceStateDiff.isEmpty()) {
+        diff.put(instance, resourceStateDiff);
+      }
     }
     return diff;
   }
@@ -95,7 +99,7 @@ public class GatewayCurrentStateCache {
    * Serialize the target state assignments to a JSON Node.
    * example : 
{"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}}
    */
-  public ObjectNode serializeTargetAssignmentsToJSON() {
+  public ObjectNode serializeTargetAssignmentsToJSONNode() {
     ObjectNode root = mapper.createObjectNode();
     for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
       root.set(entry.getKey(), entry.getValue().toJSONNode());
@@ -111,6 +115,10 @@ public class GatewayCurrentStateCache {
     stateMap.computeIfAbsent(instance, k -> new ShardStateMap(new 
HashMap<>())).updateWithDiff(diffMap);
   }
 
+  public void resetTargetStateCache(String instance) {
+    _targetStateMap.put(instance, new ShardStateMap(new HashMap<>()));
+  }
+
   public static class ShardStateMap {
     Map<String, Map<String, String>> _stateMap;
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
new file mode 100644
index 000000000..69c9c219d
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
@@ -0,0 +1,131 @@
+package org.apache.helix.gateway.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthGrpc;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PollChannelUtil {
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private static final Logger logger = 
LoggerFactory.getLogger(PollChannelUtil.class);
+
+  // return  <grpc health stub, ManagedChannel> pair
+  // ManagedChannel needs to be shut down when the connection is no longer needed
+  public static Pair<HealthGrpc.HealthBlockingStub, ManagedChannel> 
createGrpcChannel(String endpointPortString) {
+    String[] endpointPort = endpointPortString.split(":");
+    ManagedChannel channel =
+        ManagedChannelBuilder.forAddress(endpointPort[0], 
Integer.parseInt(endpointPort[1])).usePlaintext().build();
+
+    return new ImmutablePair<>(HealthGrpc.newBlockingStub(channel), channel);
+  }
+
+  /**
+   * Send Unary RPC to the gRPC service to check the health of the container. 
Could be liveness or readiness, depending on the input.
+   * @param service one of "readiness" or "liveness"
+   *                https://github.com/kubernetes/kubernetes/issues/115651
+   * @return
+   */
+  public static boolean fetchLivenessStatusFromGrpcService(String service, 
HealthGrpc.HealthBlockingStub healthStub) {
+    HealthCheckRequest request = 
HealthCheckRequest.newBuilder().setService(service).build();
+    HealthCheckResponse response = healthStub.check(request);
+    return response.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
+  }
+
+  /**
+   * Flush the current assignment map to a file. The whole file is re-written 
every time.
+   */
+  public static void flushAssignmentToFile(String targetAssignment, String 
filePath) {
+    try (FileWriter fileWriter = new FileWriter(filePath)) {
+      fileWriter.write(targetAssignment);
+    } catch (IOException e) {
+      logger.warn("Failed to write to file: " + filePath, e);
+    }
+  }
+
+  /**
+   * Read the current state from a file and return it as a nested map. This 
method only parses the file; it does not itself compare against or update any in-memory state.
+   * Current state file format: {"cluster1" : { "instance_1" : { "resource1" : 
{"shard1" : "online" }}}}
+   */
+  public static Map<String, Map<String, Map<String, Map<String, String>>>> 
readCurrentStateFromFile(String filePath) {
+    try {
+      // read from file path
+      File file = new File(filePath);
+      return objectMapper.readValue(file,
+          new TypeReference<Map<String, Map<String, Map<String, Map<String, 
String>>>>>() {
+          });
+    } catch (IOException e) {
+      logger.warn("Failed to read from file: " + filePath);
+      return new HashMap<>();
+    }
+  }
+
+  /**
+   * Read instance liveness status from a file, return true if the instance is 
healthy and the last update time is within timeout.
+   * File format: {"IsAlive": true, "LastUpdateTime": 1629300000}
+   * @param filePath
+   * @param timeoutInSec
+   * @return
+   */
+  public static boolean readInstanceLivenessStatusFromFile(String filePath, 
int timeoutInSec) {
+    try {
+      // read from file path
+      File file = new File(filePath);
+      HostLivenessState status = objectMapper.readValue(file, new 
TypeReference<HostLivenessState>() {
+      });
+      return status.isHealthy() && (System.currentTimeMillis()/1000 - 
status.getLastUpdatedTime()) < timeoutInSec;
+    } catch (IOException e) {
+      logger.warn("Failed to read from file: " + filePath);
+      return false;
+    }
+  }
+
+  /**
+   * Instance health status representation as JSON
+   */
+  public static class HostLivenessState {
+    @JsonProperty ("IsAlive")
+    Boolean _isAlive;
+    @JsonProperty ("LastUpdateTime")
+    long _lastUpdatedTime; // in epoch second
+
+    public Boolean isHealthy(){
+      return _isAlive;
+    }
+    public long getLastUpdatedTime(){
+      return _lastUpdatedTime;
+    }
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index c869c9c55..f9f2b4c3d 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -126,4 +126,41 @@ public final class StateTransitionMessageTranslateUtil {
             clusterName).setParticipantName(instanceName);
     return builder.build();
   }
+
+  /**
+   * Translate from current state change to Helix Gateway Service event.
+   * @param instanceName
+   * @param clusterName
+   * @param shardStateMap
+   * @return
+   */
+  public static GatewayServiceEvent translateCurrentStateChangeToEvent(String 
instanceName, String clusterName,
+      Map<String, Map<String, String>> shardStateMap) {
+    List<GatewayServiceEvent.StateTransitionResult> stResult = new 
ArrayList<>();
+    shardStateMap.forEach((resourceName, value) -> value.forEach((key, value1) 
-> {
+      GatewayServiceEvent.StateTransitionResult result =
+          new GatewayServiceEvent.StateTransitionResult(resourceName, key, 
value1);
+      stResult.add(result);
+    }));
+    GatewayServiceEvent.GateWayServiceEventBuilder builder =
+        new 
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE).setClusterName(
+            
clusterName).setParticipantName(instanceName).setStateTransitionStatusMap(stResult);
+    return builder.build();
+  }
+
+  /**
+   * Create a GatewayServiceEvent to notify the GatewayServiceManager to 
create a new HelixGatewayParticipant.
+   * @param instanceName the instance name of the newly connected participant
+   * @param clusterName  the cluster name
+   * @param shardStateMap the initial state of shards on the participant. 
Could be empty map
+   * @return
+   */
+  public static GatewayServiceEvent 
translateCurrentStateDiffToInitConnectEvent(String instanceName, String 
clusterName,
+      Map<String, Map<String, String>> shardStateMap) {
+    GatewayServiceEvent.GateWayServiceEventBuilder builder =
+        new 
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(
+            
clusterName).setParticipantName(instanceName).setShardStateMap(shardStateMap);
+    return builder.build();
+  }
+
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestGatewayServiceChannelConfig.java
similarity index 81%
rename from 
helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java
rename to 
helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestGatewayServiceChannelConfig.java
index 30f6c9692..b0d2f0dc6 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestGatewayServiceChannelConfig.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.chanel;
+package org.apache.helix.gateway.channel;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,12 +19,11 @@ package org.apache.helix.gateway.chanel;
  * under the License.
  */
 
-import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 
-public class GatewayServiceChannelConfigTest {
+public class TestGatewayServiceChannelConfig {
 
   @Test
   public void testGatewayServiceChannelConfigBuilder() {
@@ -56,7 +55,7 @@ public class GatewayServiceChannelConfigTest {
   }
 
   @Test
-  public void testInvalidConfig() {
+  public void testInvalidConfigForGrpc() {
     GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
         new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder();
 
@@ -70,4 +69,20 @@ public class GatewayServiceChannelConfigTest {
       // expected
     }
   }
+
+  @Test
+  public void testInvalidConfigForFile() {
+    GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
+        new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder();
+
+    
builder.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.FILE);
+
+    // assert we get an exception
+    try {
+      builder.build();
+      Assert.fail("Should have thrown an exception");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestHelixGatewayServicePollModeChannel.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestHelixGatewayServicePollModeChannel.java
new file mode 100644
index 000000000..98941e94d
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestHelixGatewayServicePollModeChannel.java
@@ -0,0 +1,90 @@
+package org.apache.helix.gateway.channel;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import static 
org.apache.helix.gateway.channel.GatewayServiceChannelConfig.ChannelMode.*;
+import static org.mockito.Mockito.*;
+
+
+public class TestHelixGatewayServicePollModeChannel {
+  private HelixGatewayServicePollModeChannel pollModeChannel;
+  private GatewayServiceManager manager;
+  private ScheduledExecutorService scheduler;
+
+  int connectEventCount = 0;
+  int disconnectEventCount = 0;
+  int updateEventCount = 0;
+
+  @Test
+  public void testFetchUpdates() {
+    scheduler = mock(ScheduledExecutorService.class);
+    GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
+        new 
GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setChannelMode(POLL_MODE)
+            .setHealthCheckEndpointMap(Map.of("cluster1", Map.of("instance1", 
"endpoint1")))
+            
.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.FILE)
+            
.setShardStateProcessorType(GatewayServiceChannelConfig.ChannelType.FILE)
+            
.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.PARTICIPANT_CURRENT_STATE_PATH,
+                "CurrentStatePath")
+            
.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.SHARD_TARGET_STATE_PATH,
+                "shardTargetStatePath")
+            .setPollIntervalSec(60 * 1000) // set a larger number to avoid 
recurrent polling
+            .setPollStartDelaySec(60 * 1000)
+            .setTargetFileUpdateIntervalSec(60 * 1000);
+
+    manager = new DummyGatewayServiceManager(builder.build());
+    pollModeChannel = spy(new HelixGatewayServicePollModeChannel(manager, 
builder.build()));
+    pollModeChannel._scheduler = scheduler; // Inject the mocked scheduler
+
+    // Mock the necessary methods and data
+    
doReturn(true).when(pollModeChannel).fetchInstanceLivenessStatus("cluster1", 
"instance1");
+    Map<String, Map<String, Map<String, Map<String, String>>>> currentStateMap 
=
+        Map.of("cluster1", Map.of("instance1", Map.of("resource1", 
Map.of("shard", "ONLINE"))));
+    
doReturn(currentStateMap).when(pollModeChannel).getChangedParticipantsCurrentState(any());
+
+    // 1. Call fetch update for first time, verify we got an init connect event
+    pollModeChannel.fetchUpdates();
+
+    Assert.assertEquals(1, connectEventCount);
+
+    // 2. Change currentStateMap, Call fetch update for second time, verify we 
got an update event
+    Map<String, Map<String, Map<String, Map<String, String>>>> 
currentStateMap2 =
+        Map.of("cluster1", Map.of("instance1", Map.of("resource1", 
Map.of("shard", "OFFLINE"))));
+    
doReturn(currentStateMap2).when(pollModeChannel).getChangedParticipantsCurrentState(any());
+    pollModeChannel.fetchUpdates();
+    Assert.assertEquals(1, updateEventCount);
+
+    // call one more time with same shard state, verify no new event
+    pollModeChannel.fetchUpdates();
+    Assert.assertEquals(1, updateEventCount);
+
+    // 3. Change health check result, Call fetch update for third time, verify 
we got a disconnect event
+    
doReturn(false).when(pollModeChannel).fetchInstanceLivenessStatus("cluster1", 
"instance1");
+    pollModeChannel.fetchUpdates();
+    Assert.assertEquals(1, disconnectEventCount);
+  }
+
+  class DummyGatewayServiceManager extends GatewayServiceManager {
+
+    public DummyGatewayServiceManager(GatewayServiceChannelConfig 
gatewayServiceChannelConfig) {
+      super("dummyZkAddress", gatewayServiceChannelConfig);
+    }
+
+    @Override
+    public void onGatewayServiceEvent(GatewayServiceEvent event) {
+      if (event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
+        connectEventCount++;
+      } else if 
(event.getEventType().equals(GatewayServiceEventType.DISCONNECT)) {
+        disconnectEventCount++;
+      } else if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
+        updateEventCount++;
+      }
+      System.out.println("Received event: " + event.getEventType());
+    }
+  }
+}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
index 69882fc66..75e9fc581 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -372,11 +372,11 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
     }
 
     @Override
-    public void closeConnectionWithError(String instanceName, String reason) {
+    public void closeConnectionWithError(String clusterName, String 
instanceName, String reason) {
       _errorDisconnectCount.incrementAndGet();
     }
     @Override
-    public void completeConnection(String instanceName) {
+    public void completeConnection(String clusterName, String instanceName) {
       _gracefulDisconnectCount.incrementAndGet();
     }
   }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java
similarity index 92%
rename from 
helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java
rename to 
helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java
index 448e0b3e9..99fec4a25 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.utils;
+package org.apache.helix.gateway.util;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,7 +21,6 @@ package org.apache.helix.gateway.utils;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.helix.gateway.util.GatewayCurrentStateCache;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -77,6 +76,6 @@ public class TestGatewayCurrentStateCache {
     cache.updateTargetStateWithDiff("instance1", targetStateChange);
 
     Assert.assertEquals(cache.getTargetState("instance1", "resource1", 
"shard1"), "OFFLINE");
-    Assert.assertEquals(cache.serializeTargetAssignmentsToJSON().toString(), 
"{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}");
+    
Assert.assertEquals(cache.serializeTargetAssignmentsToJSONNode().toString(), 
"{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}");
   }
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestStateTransitionMessageTranslateUtil.java
similarity index 96%
rename from 
helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
rename to 
helix-gateway/src/test/java/org/apache/helix/gateway/util/TestStateTransitionMessageTranslateUtil.java
index 65c299122..d65031a5f 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestStateTransitionMessageTranslateUtil.java
@@ -1,4 +1,6 @@
-package org.apache.helix.gateway.utils;/*
+package org.apache.helix.gateway.util;
+
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,7 +22,6 @@ package org.apache.helix.gateway.utils;/*
 import org.apache.helix.HelixDefinedState;
 
 import org.apache.helix.gateway.participant.HelixGatewayParticipant;
-import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;

Reply via email to