This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0efb309e8fec43299ac44f04f108d08bdff6b5f3
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Wed Jul 31 16:53:52 2024 -0700

    API to close grpc client stream connection from server side (#2856)
    
    API to close grpc client stream connection from server side
---
 .../org/apache/helix/gateway/HelixGatewayMain.java |  21 ++-
 .../api/service/HelixGatewayServiceProcessor.java  |  13 ++
 .../constant/GatewayServiceGrpcDefaultConfig.java  |   7 +
 .../HelixGatewayServiceGrpcService.java            |  43 ++++++
 .../gateway/service/GatewayServiceManager.java     |  12 +-
 .../util/HelixGatewayGrpcServerBuilder.java        |  99 ++++++++++++++
 .../java/org/apache/helix/gateway/DummyTest.java   |  21 ---
 .../helix/gateway/TestPerKeyBlockingExecutor.java  |   2 +-
 .../helix/gateway/TestPerKeyLockRegistry.java      |   2 +-
 .../helix/gateway/base/HelixGatewayTestBase.java   |   1 -
 .../helix/gateway/base/ZookeeperTestBase.java      |   3 +-
 .../helix/gateway/base/manager/ClusterManager.java |   4 -
 .../base/manager/MockParticipantManager.java       |   1 -
 .../apache/helix/gateway/base/util/TestHelper.java | 148 ++++++++++----------
 .../participant/TestHelixGatewayParticipant.java   |  10 ++
 .../service/TestGatewayServiceConnection.java      | 150 +++++++++++++++++++++
 .../gateway/service/TestGatewayServiceManager.java |   2 +-
 17 files changed, 424 insertions(+), 115 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java 
b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
index a9245f157..bcf502e90 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
@@ -19,6 +19,14 @@ package org.apache.helix.gateway;
  * under the License.
  */
 
+import io.grpc.Server;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
+
+
 /**
  * Main class for Helix Gateway.
  * It starts the Helix Gateway grpc service.
@@ -28,7 +36,18 @@ public final class HelixGatewayMain {
   private HelixGatewayMain() {
   }
 
-  public static void main(String[] args) throws InterruptedException {
+  public static void main(String[] args) throws InterruptedException, 
IOException {
+    // Create a new server to listen on port 50051
+    GatewayServiceManager manager = new GatewayServiceManager();
+    Server server = new HelixGatewayGrpcServerBuilder().setPort(50051)
+        
.setGrpcService((HelixGatewayServiceGrpcService)manager.getHelixGatewayServiceProcessor())
+        .build();
 
+    server.start();
+    System.out.println("Server started, listening on " + server.getPort());
+
+    // Wait for the server to shutdown
+    server.awaitTermination(365, TimeUnit.DAYS);
   }
 }
+
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
index fe57e69c9..c06443802 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
@@ -37,4 +37,17 @@ public interface HelixGatewayServiceProcessor {
   void sendStateTransitionMessage(String instanceName, String currentState,
       Message message);
 
+  /**
+   * Close connection with error.
+   * @param instanceName  instance name
+   * @param reason  reason for closing connection
+   */
+  public void closeConnectionWithError(String instanceName, String reason);
+
+  /**
+   * Close connection with success.
+   * @param instanceName  instance name
+   */
+  public void completeConnection(String instanceName);
+
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java
new file mode 100644
index 000000000..3f1bbafbc
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java
@@ -0,0 +1,7 @@
+package org.apache.helix.gateway.constant;
+
+public class GatewayServiceGrpcDefaultConfig {
+  public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60;
+  public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60;
+  public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60;
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
index 018d6591e..344a2649d 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.grpcservice;
  * under the License.
  */
 
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,6 +31,8 @@ import 
org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
 import org.apache.helix.gateway.util.PerKeyLockRegistry;
 import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
 import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
@@ -41,6 +44,8 @@ import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMe
  */
 public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
     implements HelixGatewayServiceProcessor {
+  // create LOGGER
+  private static final Logger logger = 
LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class);
 
   // Map to store the observer for each instance
   private final Map<String, StreamObserver<TransitionMessage>> _observerMap = 
new HashMap<>();
@@ -72,6 +77,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
 
       @Override
       public void onNext(ShardStateMessage request) {
+        logger.info("Receive message from instance: {}", request.toString());
         if (request.hasShardState()) {
           ShardState shardState = request.getShardState();
           updateObserver(shardState.getInstanceName(), 
shardState.getClusterName(), responseObserver);
@@ -81,11 +87,13 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
 
       @Override
       public void onError(Throwable t) {
+        logger.info("Receive on error message: {}", t.getMessage());
         onClientClose(responseObserver);
       }
 
       @Override
       public void onCompleted() {
+        logger.info("Receive on complete message");
         onClientClose(responseObserver);
       }
     };
@@ -110,6 +118,40 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
     }
   }
 
+  /**
+   * Close the connection of the instance. If closed because of error, use the 
error reason to close the connection.
+   * @param instanceName instance name
+   * @param errorReason   error reason for close
+   */
+  @Override
+  public void closeConnectionWithError(String instanceName, String 
errorReason) {
+    logger.info("Close connection for instance: {} with error reason: {}", 
instanceName,  errorReason);
+    closeConnectionHelper(instanceName, errorReason, true);
+  }
+
+  /**
+   * Complete the connection of the instance.
+   * @param instanceName instance name
+   */
+  @Override
+  public void completeConnection(String instanceName) {
+    logger.info("Complete connection for instance: {}", instanceName);
+    closeConnectionHelper(instanceName, null, false);
+  }
+
+
+  private void closeConnectionHelper(String instanceName, String errorReason, 
boolean withError) {
+    StreamObserver<TransitionMessage> observer;
+    observer = _observerMap.get(instanceName);
+    if (observer != null) {
+      if (withError) {
+        
observer.onError(Status.UNAVAILABLE.withDescription(errorReason).asRuntimeException());
+      } else {
+        observer.onCompleted();
+      }
+    }
+  }
+
   private void updateObserver(String instanceName, String clusterName,
       StreamObserver<TransitionMessage> streamObserver) {
     _lockRegistry.withLock(instanceName, () -> {
@@ -125,6 +167,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
     Pair<String, String> instanceInfo = 
_reversedObserverMap.get(responseObserver);
     clusterName = instanceInfo.getRight();
     instanceName = instanceInfo.getLeft();
+    logger.info("Client close connection for instance: {}", instanceName);
 
     if (instanceName == null || clusterName == null) {
       // TODO: log error;
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 85a274156..4553c04ca 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -75,7 +75,7 @@ public class GatewayServiceManager {
     if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
       _participantStateTransitionResultUpdator.submit(new 
shardStateUpdator(event));
     } else {
-      _connectionEventProcessor.offerEvent(event.getInstanceName(), new 
participantConnectionProcessor(event));
+      _connectionEventProcessor.offerEvent(event.getInstanceName(), new 
ParticipantConnectionProcessor(event));
     }
   }
 
@@ -109,10 +109,10 @@ public class GatewayServiceManager {
    * Create HelixGatewayService instance and register it to the manager.
    * It includes waiting for ZK connection, and also wait for previous 
LiveInstance to expire.
    */
-  class participantConnectionProcessor implements Runnable {
-    private final GatewayServiceEvent _event;
+  class ParticipantConnectionProcessor implements Runnable {
+    GatewayServiceEvent _event;
 
-    private participantConnectionProcessor(GatewayServiceEvent event) {
+    public ParticipantConnectionProcessor(GatewayServiceEvent event) {
       _event = event;
     }
 
@@ -127,6 +127,10 @@ public class GatewayServiceManager {
     }
   }
 
+  public HelixGatewayServiceProcessor getHelixGatewayServiceProcessor() {
+    return _gatewayServiceProcessor;
+  }
+
   private void createHelixGatewayParticipant(String clusterName, String 
instanceName,
       Map<String, Map<String, String>> initialShardStateMap) {
     // Create and add the participant to the participant map
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java
new file mode 100644
index 000000000..afa1477ce
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java
@@ -0,0 +1,99 @@
+package org.apache.helix.gateway.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.protobuf.services.ProtoReflectionService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*;
+
+
+/**
+   * Builder class to create a Helix gateway service server with custom 
configurations.
+   */
+  public  class HelixGatewayGrpcServerBuilder {
+    private int port;
+    private BindableService service;
+    private int serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL;
+    private int maxAllowedClientHeartBeatInterval = 
DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL;
+    private int clientTimeout = DEFAULT_CLIENT_TIMEOUT;
+    private boolean enableReflectionService = true;
+
+    public HelixGatewayGrpcServerBuilder setPort(int port) {
+      this.port = port;
+      return this;
+    }
+
+    public HelixGatewayGrpcServerBuilder setServerHeartBeatInterval(int 
serverHeartBeatInterval) {
+      this.serverHeartBeatInterval = serverHeartBeatInterval;
+      return this;
+    }
+
+    public HelixGatewayGrpcServerBuilder 
setMaxAllowedClientHeartBeatInterval(int maxAllowedClientHeartBeatInterval) {
+      this.maxAllowedClientHeartBeatInterval = 
maxAllowedClientHeartBeatInterval;
+      return this;
+    }
+
+    public HelixGatewayGrpcServerBuilder setClientTimeout(int clientTimeout) {
+      this.clientTimeout = clientTimeout;
+      return this;
+    }
+
+    public HelixGatewayGrpcServerBuilder setGrpcService(BindableService 
service) {
+      this.service = service;
+      return this;
+    }
+
+    public HelixGatewayGrpcServerBuilder enableReflectionService(boolean 
enableReflectionService) {
+      this.enableReflectionService = enableReflectionService;
+      return this;
+    }
+
+    public Server build() {
+      validate();
+
+      ServerBuilder serverBuilder = ServerBuilder.forPort(port)
+          .addService(service)
+          .keepAliveTime(serverHeartBeatInterval, TimeUnit.SECONDS)  // 
HeartBeat time
+          .keepAliveTimeout(clientTimeout, TimeUnit.SECONDS)  // KeepAlive 
client timeout
+          .permitKeepAliveTime(maxAllowedClientHeartBeatInterval, 
TimeUnit.SECONDS)  // Permit min HeartBeat time
+          .permitKeepAliveWithoutCalls(true);  // Allow KeepAlive forever 
without active RPCs
+
+      if (enableReflectionService) {
+        serverBuilder.addService(ProtoReflectionService.newInstance());
+      }
+
+      return serverBuilder
+          .build();
+    }
+
+    private void validate() {
+      if (port == 0 || service == null) {
+        throw new IllegalArgumentException("Port and service must be set");
+      }
+      if (clientTimeout < maxAllowedClientHeartBeatInterval) {
+        throw new IllegalArgumentException("Client timeout is less than max 
allowed client heartbeat interval");
+      }
+    }
+  }
+
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java 
b/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java
deleted file mode 100644
index 7511cb3a3..000000000
--- a/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.helix.gateway;
-
-import org.apache.helix.gateway.base.HelixGatewayTestBase;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class DummyTest extends HelixGatewayTestBase {
-
-  @BeforeClass
-  public void beforeClass() {
-    _numParticipants = 5;
-    super.beforeClass();
-  }
-
-  @Test
-  public void testSetups() {
-    createResource("TestDB_1", 4, 2);
-    Assert.assertTrue(_clusterVerifier.verify());
-  }
-}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
index 457002cbd..c58a5bfa7 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
@@ -3,8 +3,8 @@ package org.apache.helix.gateway;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
-import org.testng.annotations.Test;
 import org.testng.Assert;
+import org.testng.annotations.Test;
 
 
 public class TestPerKeyBlockingExecutor {
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
index 89d2e0bc1..3b90dcb10 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
@@ -5,8 +5,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.helix.gateway.util.PerKeyLockRegistry;
-import org.testng.annotations.Test;
 import org.testng.Assert;
+import org.testng.annotations.Test;
 
 
 public class TestPerKeyLockRegistry {
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
index 06cf45cd1..5ee11e42b 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
@@ -1,7 +1,6 @@
 package org.apache.helix.gateway.base;
 
 import java.util.List;
-
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.gateway.base.manager.ClusterControllerManager;
 import org.apache.helix.gateway.base.manager.MockParticipantManager;
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
index 8d87bc4f3..ac09eb6ec 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
@@ -1,5 +1,6 @@
 package org.apache.helix.gateway.base;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Method;
@@ -12,8 +13,6 @@ import java.util.Set;
 import java.util.logging.Level;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
-
-import com.google.common.base.Preconditions;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java
index 3858513e5..2d0070ed0 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java
@@ -19,16 +19,12 @@ package org.apache.helix.gateway.base.manager;
  * under the License.
  */
 
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.helix.HelixManagerProperty;
 import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.HelixManagerStateListener;
 import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java
index 34047c525..625eb7452 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java
@@ -20,7 +20,6 @@ package org.apache.helix.gateway.base.manager;
  */
 
 import java.util.concurrent.CountDownLatch;
-
 import org.apache.helix.HelixCloudProperty;
 import org.apache.helix.HelixManagerProperty;
 import org.apache.helix.HelixPropertyFactory;
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
index 681f39205..a5fbcf39c 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.base.util;
  * under the License.
  */
 
+import io.grpc.Server;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -37,13 +38,15 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
 import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -71,10 +74,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 
+import static 
org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*;
+
+
 public class TestHelper {
   private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
   public static final long WAIT_DURATION = 60 * 1000L; // 60 seconds
   public static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1500;
+
   /**
    * Returns a unused random port.
    */
@@ -92,22 +99,19 @@ public class TestHelper {
     return TestHelper.startZkServer(zkAddress, empty, true);
   }
 
-  static public ZkServer startZkServer(final String zkAddress, final String 
rootNamespace)
-      throws Exception {
+  static public ZkServer startZkServer(final String zkAddress, final String 
rootNamespace) throws Exception {
     List<String> rootNamespaces = new ArrayList<String>();
     rootNamespaces.add(rootNamespace);
     return TestHelper.startZkServer(zkAddress, rootNamespaces, true);
   }
 
-  static public ZkServer startZkServer(final String zkAddress, final 
List<String> rootNamespaces)
-      throws Exception {
+  static public ZkServer startZkServer(final String zkAddress, final 
List<String> rootNamespaces) throws Exception {
     return startZkServer(zkAddress, rootNamespaces, true);
   }
 
-  static public ZkServer startZkServer(final String zkAddress, final 
List<String> rootNamespaces,
-      boolean overwrite) throws Exception {
-    System.out.println(
-        "Start zookeeper at " + zkAddress + " in thread " + 
Thread.currentThread().getName());
+  static public ZkServer startZkServer(final String zkAddress, final 
List<String> rootNamespaces, boolean overwrite)
+      throws Exception {
+    System.out.println("Start zookeeper at " + zkAddress + " in thread " + 
Thread.currentThread().getName());
 
     String zkDir = zkAddress.replace(':', '_');
     final String logDir = "/tmp/" + zkDir + "/logs";
@@ -146,8 +150,7 @@ public class TestHelper {
     if (zkServer != null) {
       zkServer.shutdown();
       System.out.println(
-          "Shut down zookeeper at port " + zkServer.getPort() + " in thread " 
+ Thread
-              .currentThread().getName());
+          "Shut down zookeeper at port " + zkServer.getPort() + " in thread " 
+ Thread.currentThread().getName());
     }
   }
 
@@ -190,8 +193,7 @@ public class TestHelper {
       // debug
       // LOG.info(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify 
("
       // + result + ")");
-      System.err.println(
-          verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify " + " (" 
+ result + ")");
+      System.err.println(verifierName + ": wait " + ((i + 1) * 1000) + "ms to 
verify " + " (" + result + ")");
       LOG.debug("args:" + Arrays.toString(args));
       // System.err.println("args:" + Arrays.toString(args));
 
@@ -218,21 +220,19 @@ public class TestHelper {
 
   public static boolean verifyEmptyCurStateAndExtView(String clusterName, 
String resourceName,
       Set<String> instanceNames, String zkAddr) {
-    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    HelixZkClient zkClient =
+        SharedZkClientFactory.getInstance().buildZkClient(new 
HelixZkClient.ZkConnectionConfig(zkAddr));
     zkClient.setZkSerializer(new ZNRecordSerializer());
 
     try {
-      ZKHelixDataAccessor accessor =
-          new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(zkClient));
+      ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(zkClient));
       Builder keyBuilder = accessor.keyBuilder();
 
       for (String instanceName : instanceNames) {
         List<String> sessionIds = 
accessor.getChildNames(keyBuilder.sessions(instanceName));
 
         for (String sessionId : sessionIds) {
-          CurrentState curState =
-              accessor.getProperty(keyBuilder.currentState(instanceName, 
sessionId, resourceName));
+          CurrentState curState = 
accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, 
resourceName));
 
           if (curState != null && curState.getRecord().getMapFields().size() 
!= 0) {
             return false;
@@ -263,20 +263,18 @@ public class TestHelper {
     return !manager.isConnected();
   }
 
-  public static void setupCluster(String clusterName, String zkAddr, int 
startPort,
-      String participantNamePrefix, String resourceNamePrefix, int resourceNb, 
int partitionNb,
-      int nodesNb, int replica, String stateModelDef, boolean doRebalance) 
throws Exception {
-    TestHelper
-        .setupCluster(clusterName, zkAddr, startPort, participantNamePrefix, 
resourceNamePrefix,
-            resourceNb, partitionNb, nodesNb, replica, stateModelDef, 
RebalanceMode.SEMI_AUTO,
-            doRebalance);
+  public static void setupCluster(String clusterName, String zkAddr, int 
startPort, String participantNamePrefix,
+      String resourceNamePrefix, int resourceNb, int partitionNb, int nodesNb, 
int replica, String stateModelDef,
+      boolean doRebalance) throws Exception {
+    TestHelper.setupCluster(clusterName, zkAddr, startPort, 
participantNamePrefix, resourceNamePrefix, resourceNb,
+        partitionNb, nodesNb, replica, stateModelDef, RebalanceMode.SEMI_AUTO, 
doRebalance);
   }
 
-  public static void setupCluster(String clusterName, String zkAddr, int 
startPort,
-      String participantNamePrefix, String resourceNamePrefix, int resourceNb, 
int partitionNb,
-      int nodesNb, int replica, String stateModelDef, RebalanceMode mode, 
boolean doRebalance) {
-    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+  public static void setupCluster(String clusterName, String zkAddr, int 
startPort, String participantNamePrefix,
+      String resourceNamePrefix, int resourceNb, int partitionNb, int nodesNb, 
int replica, String stateModelDef,
+      RebalanceMode mode, boolean doRebalance) {
+    HelixZkClient zkClient =
+        SharedZkClientFactory.getInstance().buildZkClient(new 
HelixZkClient.ZkConnectionConfig(zkAddr));
     try {
       if (zkClient.exists("/" + clusterName)) {
         LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
@@ -293,8 +291,7 @@ public class TestHelper {
 
       for (int i = 0; i < resourceNb; i++) {
         String resourceName = resourceNamePrefix + i;
-        setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, 
stateModelDef,
-            mode.name(),
+        setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, 
stateModelDef, mode.name(),
             mode == RebalanceMode.FULL_AUTO ? 
CrushEdRebalanceStrategy.class.getName()
                 : RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY);
         if (doRebalance) {
@@ -334,15 +331,13 @@ public class TestHelper {
    * @param state
    *          : MASTER|SLAVE|ERROR...
    */
-  public static void verifyState(String clusterName, String zkAddr,
-      Map<String, Set<String>> stateMap, String state) {
-    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+  public static void verifyState(String clusterName, String zkAddr, 
Map<String, Set<String>> stateMap, String state) {
+    HelixZkClient zkClient =
+        SharedZkClientFactory.getInstance().buildZkClient(new 
HelixZkClient.ZkConnectionConfig(zkAddr));
     zkClient.setZkSerializer(new ZNRecordSerializer());
 
     try {
-      ZKHelixDataAccessor accessor =
-          new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(zkClient));
+      ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(zkClient));
       Builder keyBuilder = accessor.keyBuilder();
 
       for (String resGroupPartitionKey : stateMap.keySet()) {
@@ -354,12 +349,12 @@ public class TestHelper {
         for (String instance : stateMap.get(resGroupPartitionKey)) {
           String actualState = extView.getStateMap(partitionKey).get(instance);
           Assert.assertNotNull(actualState,
-              "externalView doesn't contain state for " + resGroup + "/" + 
partitionKey + " on "
-                  + instance + " (expect " + state + ")");
+              "externalView doesn't contain state for " + resGroup + "/" + 
partitionKey + " on " + instance
+                  + " (expect " + state + ")");
 
           Assert.assertEquals(actualState, state,
-              "externalView for " + resGroup + "/" + partitionKey + " on " + 
instance + " is "
-                  + actualState + " (expect " + state + ")");
+              "externalView for " + resGroup + "/" + partitionKey + " on " + 
instance + " is " + actualState
+                  + " (expect " + state + ")");
         }
       }
     } finally {
@@ -391,8 +386,8 @@ public class TestHelper {
     return retMap;
   }
 
-  public static <T> Map<String, T> startThreadsConcurrently(final int 
nrThreads,
-      final Callable<T> method, final long timeout) {
+  public static <T> Map<String, T> startThreadsConcurrently(final int 
nrThreads, final Callable<T> method,
+      final long timeout) {
     final CountDownLatch startLatch = new CountDownLatch(1);
     final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
     final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
@@ -445,8 +440,8 @@ public class TestHelper {
     return resultsMap;
   }
 
-  public static Message createMessage(String msgId, String fromState, String 
toState,
-      String tgtName, String resourceName, String partitionName) {
+  public static Message createMessage(String msgId, String fromState, String 
toState, String tgtName,
+      String resourceName, String partitionName) {
     Message msg = new Message(MessageType.STATE_TRANSITION, msgId);
     msg.setFromState(fromState);
     msg.setToState(toState);
@@ -469,8 +464,7 @@ public class TestHelper {
     return fullClassName.substring(fullClassName.lastIndexOf('.') + 1);
   }
 
-  public static <T> Map<String, T> startThreadsConcurrently(final 
List<Callable<T>> methods,
-      final long timeout) {
+  public static <T> Map<String, T> startThreadsConcurrently(final 
List<Callable<T>> methods, final long timeout) {
     final int nrThreads = methods.size();
     final CountDownLatch startLatch = new CountDownLatch(1);
     final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
@@ -536,8 +530,7 @@ public class TestHelper {
       TreeSet<String> childSet = new TreeSet<String>();
       childSet.addAll(node.getChildSet());
       System.out.print(
-          key + "=" + node.getData() + ", " + childSet + ", " + 
(node.getStat() == null ? "null\n"
-              : node.getStat()));
+          key + "=" + node.getData() + ", " + childSet + ", " + 
(node.getStat() == null ? "null\n" : node.getStat()));
     }
     System.out.println("END:Print cache");
   }
@@ -560,8 +553,7 @@ public class TestHelper {
     }
   }
 
-  public static void readZkRecursive(String path, Map<String, ZNode> map,
-      BaseDataAccessor<ZNRecord> zkAccessor) {
+  public static void readZkRecursive(String path, Map<String, ZNode> map, 
BaseDataAccessor<ZNRecord> zkAccessor) {
     try {
       Stat stat = new Stat();
       ZNRecord record = zkAccessor.get(path, stat, 0);
@@ -582,8 +574,8 @@ public class TestHelper {
     }
   }
 
-  public static boolean verifyZkCache(List<String> paths, 
BaseDataAccessor<ZNRecord> zkAccessor,
-      HelixZkClient zkclient, boolean needVerifyStat) {
+  public static boolean verifyZkCache(List<String> paths, 
BaseDataAccessor<ZNRecord> zkAccessor, HelixZkClient zkclient,
+      boolean needVerifyStat) {
     // read everything
     Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
     Map<String, ZNode> cache = new HashMap<String, ZNode>();
@@ -596,13 +588,13 @@ public class TestHelper {
     return verifyZkCache(paths, null, cache, zkMap, needVerifyStat);
   }
 
-  public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> 
cache,
-      HelixZkClient zkclient, boolean needVerifyStat) {
+  public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> 
cache, HelixZkClient zkclient,
+      boolean needVerifyStat) {
     return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
   }
 
-  public static boolean verifyZkCache(List<String> paths, List<String> 
pathsExcludeForStat,
-      Map<String, ZNode> cache, HelixZkClient zkclient, boolean 
needVerifyStat) {
+  public static boolean verifyZkCache(List<String> paths, List<String> 
pathsExcludeForStat, Map<String, ZNode> cache,
+      HelixZkClient zkclient, boolean needVerifyStat) {
     // read everything on zk under paths
     Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
     for (String path : paths) {
@@ -613,12 +605,11 @@ public class TestHelper {
     return verifyZkCache(paths, pathsExcludeForStat, cache, zkMap, 
needVerifyStat);
   }
 
-  public static boolean verifyZkCache(List<String> paths, List<String> 
pathsExcludeForStat,
-      Map<String, ZNode> cache, Map<String, ZNode> zkMap, boolean 
needVerifyStat) {
+  public static boolean verifyZkCache(List<String> paths, List<String> 
pathsExcludeForStat, Map<String, ZNode> cache,
+      Map<String, ZNode> zkMap, boolean needVerifyStat) {
     // equal size
     if (zkMap.size() != cache.size()) {
-      System.err
-          .println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: 
" + zkMap.size());
+      System.err.println("size mismatch: cacheSize: " + cache.size() + ", 
zkMapSize: " + zkMap.size());
       System.out.println("cache: (" + cache.size() + ")");
       TestHelper.printCache(cache);
 
@@ -640,23 +631,20 @@ public class TestHelper {
       }
 
       if ((zkNode.getData() == null && cacheNode.getData() != null) || 
(zkNode.getData() != null
-          && cacheNode.getData() == null) || (zkNode.getData() != null
-          && cacheNode.getData() != null && 
!zkNode.getData().equals(cacheNode.getData()))) {
+          && cacheNode.getData() == null) || (zkNode.getData() != null && 
cacheNode.getData() != null
+          && !zkNode.getData().equals(cacheNode.getData()))) {
         // data not equal
         System.err.println(
-            "data mismatch on path: " + path + ", inCache: " + 
cacheNode.getData() + ", onZk: "
-                + zkNode.getData());
+            "data mismatch on path: " + path + ", inCache: " + 
cacheNode.getData() + ", onZk: " + zkNode.getData());
         return false;
       }
 
-      if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null) || 
(
-          zkNode.getChildSet() != null && cacheNode.getChildSet() == null) || (
-          zkNode.getChildSet() != null && cacheNode.getChildSet() != null && 
!zkNode.getChildSet()
-              .equals(cacheNode.getChildSet()))) {
+      if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null) || 
(zkNode.getChildSet() != null
+          && cacheNode.getChildSet() == null) || (zkNode.getChildSet() != null 
&& cacheNode.getChildSet() != null
+          && !zkNode.getChildSet().equals(cacheNode.getChildSet()))) {
         // childSet not equal
-        System.err.println(
-            "childSet mismatch on path: " + path + ", inCache: " + 
cacheNode.getChildSet()
-                + ", onZk: " + zkNode.getChildSet());
+        System.err.println("childSet mismatch on path: " + path + ", inCache: 
" + cacheNode.getChildSet() + ", onZk: "
+            + zkNode.getChildSet());
         return false;
       }
 
@@ -664,8 +652,7 @@ public class TestHelper {
         if (cacheNode.getStat() == null || 
!zkNode.getStat().equals(cacheNode.getStat())) {
           // stat not equal
           System.err.println(
-              "Stat mismatch on path: " + path + ", inCache: " + 
cacheNode.getStat() + ", onZk: "
-                  + zkNode.getStat());
+              "Stat mismatch on path: " + path + ", inCache: " + 
cacheNode.getStat() + ", onZk: " + zkNode.getStat());
           return false;
         }
       }
@@ -684,8 +671,7 @@ public class TestHelper {
     statePriorityList.add("IDLE");
     statePriorityList.add("DROPPED");
     statePriorityList.add("ERROR");
-    
record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
-        statePriorityList);
+    
record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
 statePriorityList);
     for (String state : statePriorityList) {
       String key = state + ".meta";
       Map<String, String> metadata = new HashMap<String, String>();
@@ -814,4 +800,10 @@ public class TestHelper {
       Thread.sleep(50);
     } while (true);
   }
+
+  public static Server createHelixGatewayServer(int port, 
GatewayServiceManager manager) {
+    return new HelixGatewayGrpcServerBuilder().setPort(port)
+        .setGrpcService((HelixGatewayServiceGrpcService) 
manager.getHelixGatewayServiceProcessor())
+        .build();
+  }
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
index fda7fbb1b..dd22b9fa0 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -329,5 +329,15 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
         Message message) {
       _pendingMessageMap.put(instanceName, message);
     }
+
+    @Override
+    public void closeConnectionWithError(String instanceName, String reason) {
+
+    }
+
+    @Override
+    public void completeConnection(String instanceName) {
+
+    }
   }
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
new file mode 100644
index 000000000..dac903b28
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
@@ -0,0 +1,150 @@
+package org.apache.helix.gateway.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.gateway.base.HelixGatewayTestBase;
+import org.apache.helix.gateway.base.util.TestHelper;
+import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+
+
+
+public class TestGatewayServiceConnection extends HelixGatewayTestBase {
+  CountDownLatch connectLatch = new CountDownLatch(1);
+  CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+  @Test
+  public void TestLivenessDetection() throws IOException, InterruptedException 
{
+    // start the gateway service
+    Server server = TestHelper.createHelixGatewayServer(50051, new 
DummyGatewayServiceManager());
+    server.start();
+
+    // start the client
+    HelixGatewayClient client = new HelixGatewayClient("localhost", 50051);
+    client.connect();
+    // assert we get connect event
+    Assert.assertTrue(connectLatch.await(5, TimeUnit.SECONDS));
+
+    // assert we have disconnect event
+    client.shutdownGracefully();
+    Assert.assertTrue(disconnectLatch.await(5, TimeUnit.SECONDS));
+
+    connectLatch = new CountDownLatch(1);
+    disconnectLatch = new CountDownLatch(1);
+
+    // start the client
+    HelixGatewayClient client2 = new HelixGatewayClient("localhost", 50051);
+    client2.connect();
+    // assert we get connect event
+    Assert.assertTrue(connectLatch.await(5, TimeUnit.SECONDS));
+
+    // assert we have disconnect event when shutting down ungracefully
+    client2.shutdownByClosingConnection();
+    Assert.assertTrue(disconnectLatch.await(5, TimeUnit.SECONDS));
+  }
+
+  public class HelixGatewayClient {
+
+    private final ManagedChannel channel;
+    private final HelixGatewayServiceGrpc.HelixGatewayServiceStub asyncStub;
+
+    private StreamObserver _requestObserver;
+
+    public HelixGatewayClient(String host, int port) {
+      this.channel = ManagedChannelBuilder.forAddress(host, port)
+          .usePlaintext()
+          .keepAliveTime(30, TimeUnit.SECONDS)  // KeepAlive time
+          .keepAliveTimeout(3, TimeUnit.MINUTES)  // KeepAlive timeout
+          .keepAliveWithoutCalls(true)  // Allow KeepAlive without active RPCs
+          .build();
+      this.asyncStub = HelixGatewayServiceGrpc.newStub(channel);
+    }
+
+    public void connect() {
+      _requestObserver = asyncStub.report(new 
StreamObserver<HelixGatewayServiceOuterClass.TransitionMessage>() {
+        @Override
+        public void onNext(HelixGatewayServiceOuterClass.TransitionMessage 
value) {
+          // Handle response from server
+        }
+
+        @Override
+        public void onError(Throwable t) {
+          // Handle error
+        }
+
+        @Override
+        public void onCompleted() {
+          // Handle stream completion
+        }
+      });
+
+      // Send initial ShardStateMessage
+      HelixGatewayServiceOuterClass.ShardStateMessage initialMessage =
+          HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder()
+              
.setShardState(HelixGatewayServiceOuterClass.ShardState.newBuilder()
+                  .setInstanceName("instance1")
+                  .setClusterName("TEST_CLUSTER")
+                  .build())
+              .build();
+      _requestObserver.onNext(initialMessage);
+
+      // Add more logic to send additional messages if needed
+    }
+
+    public void shutdownGracefully() {
+      // graceful shutdown
+      _requestObserver.onCompleted();
+      channel.shutdown().shutdownNow();
+    }
+
+    public void shutdownByClosingConnection() {
+      //ungraceful shutdown
+      channel.shutdown().shutdownNow();
+    }
+  }
+
+  class DummyGatewayServiceManager extends GatewayServiceManager {
+
+    public DummyGatewayServiceManager() {
+      super();
+    }
+
+    @Override
+    public void newGatewayServiceEvent(GatewayServiceEvent event) {
+      if (event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
+        connectLatch.countDown();
+      } else if 
(event.getEventType().equals(GatewayServiceEventType.DISCONNECT)) {
+        disconnectLatch.countDown();
+      }
+      System.out.println("Received event: " + event.getEventType());
+    }
+  }
+}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
index a345f008e..788bf4472 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -4,7 +4,7 @@ import 
org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
 import org.testng.annotations.Test;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
 
-import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
 
 


Reply via email to