This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit fe3e0726e45ad15d888cb0b0453a0ca6072f0c8d
Author: xyuanlu <[email protected]>
AuthorDate: Thu Aug 8 15:19:14 2024 -0700

    Refine gateway service interface (#2875)
    
    Refine gateway service interface
---
 .../org/apache/helix/gateway/HelixGatewayMain.java |  2 +-
 ...onitor.java => HelixGatewayServiceChannel.java} | 44 +++++++++++++++-----
 .../api/service/HelixGatewayServiceProcessor.java  | 47 ----------------------
 .../HelixGatewayServiceShardStateProcessor.java    | 34 ----------------
 .../HelixGatewayServiceGrpcService.java            | 11 +++--
 .../HelixGatewayServicePullModeChannel.java        | 25 ++++++++++++
 .../participant/HelixGatewayParticipant.java       | 22 +++++-----
 .../gateway/service/GatewayServiceManager.java     | 16 ++++----
 .../service/GatewayServiceManagerFactory.java      | 29 -------------
 .../apache/helix/gateway/base/util/TestHelper.java |  4 +-
 .../participant/TestHelixGatewayParticipant.java   | 22 ++++------
 .../service/TestGatewayServiceConnection.java      |  2 +-
 .../gateway/service/TestGatewayServiceManager.java |  4 +-
 13 files changed, 96 insertions(+), 166 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java 
b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
index bcf502e90..b4b8ca0cb 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
@@ -22,7 +22,7 @@ package org.apache.helix.gateway;
 import io.grpc.Server;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
-import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
 import org.apache.helix.gateway.service.GatewayServiceManager;
 import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
similarity index 51%
rename from 
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java
rename to 
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
index 0f547354a..1731e158f 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
@@ -19,10 +19,42 @@ package org.apache.helix.gateway.api.service;
  * under the License.
  */
 
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.model.Message;
+
+
 /**
- * Interface for gateway manager to interact with clients on connection.
+ * Helix Gateway Service channel interface provides API for inbound and 
outbound communication between
+ * Gateway service and application instances.
  */
-public interface HelixGatewayServiceClientConnectionMonitor {
+public interface HelixGatewayServiceChannel {
+
+  /**
+   * Gateway service sends a state transition message to a connected 
participant.
+   *
+   * @param instanceName the name of the participant
+   * @param currentState the current state of the shard
+   * @param message      the message to send
+   */
+  void sendStateTransitionMessage(String instanceName, String currentState, 
Message message);
+
+  /**
+   * Sends a GatewayServiceEvent to the gateway manager for Helix instance changes.
+   * Event could be a connection closed event (event type DISCONNECT),
+   * an initial connection establish event that contains a map of current 
shard states (event type CONNECT),
+   * or a state transition result message (event type UPDATE).
+   *
+   * The default implementation pushes an event to the Gateway Service Manager.
+   *
+   * @param gatewayServiceManager the Gateway Service Manager
+   * @param event the event to push
+   */
+  default void pushClientEventToGatewayManager(GatewayServiceManager 
gatewayServiceManager, GatewayServiceEvent event) {
+    gatewayServiceManager.onGatewayServiceEvent(event);
+  }
+
+  // TODO: remove the following 2 apis in future changes
   /**
    * Gateway service close connection with error. This function is called when 
manager wants to close client
    * connection when there is an error. e.g. HelixManager connection is lost.
@@ -37,12 +69,4 @@ public interface HelixGatewayServiceClientConnectionMonitor {
    * @param instanceName  instance name
    */
   public void completeConnection(String instanceName);
-
-  /**
-   * Callback when we detect client connection is closed. It could be when 
client gracefully close the connection,
-   * or when client connection is timed out.
-   * @param clusterName  cluster name
-   * @param instanceName  instance name
-   */
-  public void onClientClose(String clusterName, String instanceName);
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
deleted file mode 100644
index 3ca7aeac5..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.helix.gateway.api.service;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.gateway.service.GatewayServiceEvent;
-import org.apache.helix.gateway.service.GatewayServiceManager;
-
-
-/**
- * Helix Gateway Service Processor interface allows sending state transition 
messages to
- * participants through service implementing this interface.
- */
-public interface HelixGatewayServiceProcessor
-    extends HelixGatewayServiceClientConnectionMonitor, 
HelixGatewayServiceShardStateProcessor {
-
-  /**
-   * Callback when receiving a client event.
-   * Event could be a connection closed event (event type DISCONNECT),
-   * an initial connection establish event that contains a map of current 
chard states (event type CONNECT),
-   * or a state transition result message (event type UPDATE).
-   *
-   * The default implementation push an event to the Gateway Service Manager.
-   *
-   * @param gatewayServiceManager the Gateway Service Manager
-   * @param event the event to push
-   */
-  default void onClientEvent(GatewayServiceManager gatewayServiceManager, 
GatewayServiceEvent event) {
-    gatewayServiceManager.newGatewayServiceEvent(event);
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java
deleted file mode 100644
index fb9bd6294..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.helix.gateway.api.service;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.model.Message;
-
-
-public interface HelixGatewayServiceShardStateProcessor {
-  /**
-   * Gateway service send a state transition message to a connected 
participant.
-   *
-   * @param instanceName the name of the participant
-   * @param currentState the current state of the shard
-   * @param message      the message to send
-   */
-  void sendStateTransitionMessage(String instanceName, String currentState, 
Message message);
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
similarity index 96%
rename from 
helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
rename to 
helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
index 3291e48b4..a176ca89c 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.grpcservice;
+package org.apache.helix.gateway.channel;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -27,7 +27,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.gateway.service.GatewayServiceEvent;
 import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import org.apache.helix.gateway.util.PerKeyLockRegistry;
 import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
 import org.apache.helix.model.Message;
@@ -43,7 +43,7 @@ import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMe
  * Helix Gateway Service GRPC UI implementation.
  */
 public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
-    implements HelixGatewayServiceProcessor {
+    implements HelixGatewayServiceChannel {
   // create LOGGER
   private static final Logger logger = 
LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class);
 
@@ -83,7 +83,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
           ShardState shardState = request.getShardState();
           updateObserver(shardState.getInstanceName(), 
shardState.getClusterName(), responseObserver);
         }
-        onClientEvent(_manager,
+        pushClientEventToGatewayManager(_manager,
             
StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
       }
 
@@ -153,7 +153,6 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
     }
   }
 
-  @Override
   public void onClientClose(String clusterName, String instanceName) {
     if (instanceName == null || clusterName == null) {
       // TODO: log error;
@@ -162,7 +161,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
     logger.info("Client close connection for instance: {}", instanceName);
     GatewayServiceEvent event =
         
StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, 
instanceName);
-    onClientEvent(_manager, event);
+    pushClientEventToGatewayManager(_manager, event);
     _lockRegistry.withLock(instanceName, () -> {
       _reversedObserverMap.remove(_observerMap.get(instanceName));
       _observerMap.remove(instanceName);
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java
new file mode 100644
index 000000000..0d80a96d0
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java
@@ -0,0 +1,25 @@
+package org.apache.helix.gateway.channel;
+
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
+import org.apache.helix.model.Message;
+
+
+public class HelixGatewayServicePullModeChannel implements 
HelixGatewayServiceChannel {
+
+    public HelixGatewayServicePullModeChannel() {
+    }
+  @Override
+  public void sendStateTransitionMessage(String instanceName, String 
currentState, Message message) {
+
+  }
+
+  @Override
+  public void closeConnectionWithError(String instanceName, String reason) {
+
+  }
+
+  @Override
+  public void completeConnection(String instanceName) {
+
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
index 96a39bb01..d60905d3c 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
@@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
-import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
 import org.apache.helix.manager.zk.HelixManagerStateListener;
 import org.apache.helix.manager.zk.ZKHelixManager;
@@ -45,16 +45,16 @@ import 
org.apache.helix.participant.statemachine.StateTransitionError;
  */
 public class HelixGatewayParticipant implements HelixManagerStateListener {
   public static final String UNASSIGNED_STATE = "UNASSIGNED";
-  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixGatewayServiceChannel _gatewayServiceChannel;
   private final HelixManager _helixManager;
   private final Runnable _onDisconnectedCallback;
   private final Map<String, Map<String, String>> _shardStateMap;
   private final Map<String, CompletableFuture<Boolean>> 
_stateTransitionResultMap;
 
-  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+  private HelixGatewayParticipant(HelixGatewayServiceChannel 
gatewayServiceChannel,
       Runnable onDisconnectedCallback, HelixManager helixManager,
       Map<String, Map<String, String>> initialShardStateMap) {
-    _gatewayServiceProcessor = gatewayServiceProcessor;
+    _gatewayServiceChannel = gatewayServiceChannel;
     _helixManager = helixManager;
     _onDisconnectedCallback = onDisconnectedCallback;
     _shardStateMap = initialShardStateMap;
@@ -74,7 +74,7 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
 
       CompletableFuture<Boolean> future = new CompletableFuture<>();
       _stateTransitionResultMap.put(transitionId, future);
-      
_gatewayServiceProcessor.sendStateTransitionMessage(_helixManager.getInstanceName(),
+      
_gatewayServiceChannel.sendStateTransitionMessage(_helixManager.getInstanceName(),
           getCurrentState(resourceId, shardId), message);
 
       if (!future.get()) {
@@ -183,7 +183,7 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
   @Override
   public void onDisconnected(HelixManager helixManager, Throwable error) 
throws Exception {
     _onDisconnectedCallback.run();
-    
_gatewayServiceProcessor.closeConnectionWithError(_helixManager.getInstanceName(),
+    
_gatewayServiceChannel.closeConnectionWithError(_helixManager.getInstanceName(),
         error.getMessage());
   }
 
@@ -191,11 +191,11 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
     if (_helixManager.isConnected()) {
       _helixManager.disconnect();
     }
-    
_gatewayServiceProcessor.completeConnection(_helixManager.getInstanceName());
+    _gatewayServiceChannel.completeConnection(_helixManager.getInstanceName());
   }
 
   public static class Builder {
-    private final HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
+    private final HelixGatewayServiceChannel _helixGatewayServiceChannel;
     private final String _instanceName;
     private final String _clusterName;
     private final String _zkAddress;
@@ -203,9 +203,9 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
     private final List<String> _multiTopStateModelDefinitions;
     private final Map<String, Map<String, String>> _initialShardStateMap;
 
-    public Builder(HelixGatewayServiceProcessor helixGatewayServiceProcessor, 
String instanceName,
+    public Builder(HelixGatewayServiceChannel helixGatewayServiceChannel, 
String instanceName,
         String clusterName, String zkAddress, Runnable onDisconnectedCallback) 
{
-      _helixGatewayServiceProcessor = helixGatewayServiceProcessor;
+      _helixGatewayServiceChannel = helixGatewayServiceChannel;
       _instanceName = instanceName;
       _clusterName = clusterName;
       _zkAddress = zkAddress;
@@ -258,7 +258,7 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
       HelixManager participantManager =
           new ZKHelixManager(_clusterName, _instanceName, 
InstanceType.PARTICIPANT, _zkAddress);
       HelixGatewayParticipant participant =
-          new HelixGatewayParticipant(_helixGatewayServiceProcessor, 
_onDisconnectedCallback,
+          new HelixGatewayParticipant(_helixGatewayServiceChannel, 
_onDisconnectedCallback,
               participantManager,
               _initialShardStateMap);
       _multiTopStateModelDefinitions.forEach(
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index fd7420735..2b626d395 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -26,9 +26,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import com.google.common.collect.ImmutableSet;
-import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import org.apache.helix.gateway.constant.GatewayServiceEventType;
-import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
 import org.apache.helix.gateway.participant.HelixGatewayParticipant;
 import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
 
@@ -51,7 +51,7 @@ public class GatewayServiceManager {
   private final ExecutorService _participantStateTransitionResultUpdator;
 
   // link to grpc service
-  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixGatewayServiceChannel _gatewayServiceChannel;
 
   // a per key executor for connection event. All event for the same instance 
will be executed in sequence.
   // It is used to ensure for each instance, the connect/disconnect event 
won't start until the previous one is done.
@@ -61,7 +61,7 @@ public class GatewayServiceManager {
     _helixGatewayParticipantMap = new ConcurrentHashMap<>();
     _zkAddress = "foo";
     _participantStateTransitionResultUpdator = 
Executors.newSingleThreadExecutor();
-    _gatewayServiceProcessor = new HelixGatewayServiceGrpcService(this);
+    _gatewayServiceChannel = new HelixGatewayServiceGrpcService(this);
     _connectionEventProcessor =
         new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // 
todo: make it configurable
   }
@@ -71,7 +71,7 @@ public class GatewayServiceManager {
    *
    * @param event
    */
-  public void newGatewayServiceEvent(GatewayServiceEvent event) {
+  public void onGatewayServiceEvent(GatewayServiceEvent event) {
     if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
       _participantStateTransitionResultUpdator.submit(new 
shardStateUpdator(event));
     } else {
@@ -127,15 +127,15 @@ public class GatewayServiceManager {
     }
   }
 
-  public HelixGatewayServiceProcessor getHelixGatewayServiceProcessor() {
-    return _gatewayServiceProcessor;
+  public HelixGatewayServiceChannel getHelixGatewayServiceProcessor() {
+    return _gatewayServiceChannel;
   }
 
   private void createHelixGatewayParticipant(String clusterName, String 
instanceName,
       Map<String, Map<String, String>> initialShardStateMap) {
     // Create and add the participant to the participant map
     HelixGatewayParticipant.Builder participantBuilder =
-        new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, 
instanceName, clusterName,
+        new HelixGatewayParticipant.Builder(_gatewayServiceChannel, 
instanceName, clusterName,
             _zkAddress,
             () -> removeHelixGatewayParticipant(clusterName, 
instanceName)).setInitialShardState(
             initialShardStateMap);
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
deleted file mode 100644
index aed7518b9..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.helix.gateway.service;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Factory class to create GatewayServiceManager
- */
-public class GatewayServiceManagerFactory {
-  public GatewayServiceManager createGatewayServiceManager() {
-    return new GatewayServiceManager();
-  }
-}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
index a5fbcf39c..15b95448f 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
@@ -44,7 +44,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
 import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
-import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
 import org.apache.helix.gateway.service.GatewayServiceManager;
 import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -74,8 +74,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 
-import static 
org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*;
-
 
 public class TestHelper {
   private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
index 0c81ac354..d3180a16c 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
-import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.ClusterConfig;
@@ -99,7 +99,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase {
   private HelixGatewayParticipant addParticipant(String participantName,
       Map<String, Map<String, String>> initialShardMap) {
     HelixGatewayParticipant participant =
-        new HelixGatewayParticipant.Builder(new 
MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName,
+        new HelixGatewayParticipant.Builder(new 
MockHelixGatewayServiceChannel(_pendingMessageMap), participantName,
             CLUSTER_NAME, ZK_ADDR, 
_onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition(
             TEST_STATE_MODEL).setInitialShardState(initialShardMap).build();
     _participants.add(participant);
@@ -319,17 +319,17 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
 
   @Test(dependsOnMethods = 
"testProcessStateTransitionAfterReconnectAfterDroppingPartition")
   public void testGatewayParticipantDisconnectGracefully() {
-    int gracefulDisconnectCount = 
MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get();
+    int gracefulDisconnectCount = 
MockHelixGatewayServiceChannel._gracefulDisconnectCount.get();
     // Remove the first participant
     HelixGatewayParticipant participant = _participants.get(0);
     deleteParticipant(participant);
 
-    
Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(),
 gracefulDisconnectCount + 1);
+    
Assert.assertEquals(MockHelixGatewayServiceChannel._gracefulDisconnectCount.get(),
 gracefulDisconnectCount + 1);
   }
 
   @Test(dependsOnMethods = "testGatewayParticipantDisconnectGracefully")
   public void testGatewayParticipantDisconnectWithError() throws Exception {
-    int errorDisconnectCount = 
MockHelixGatewayServiceProcessor._errorDisconnectCount.get();
+    int errorDisconnectCount = 
MockHelixGatewayServiceChannel._errorDisconnectCount.get();
     int onDisconnectCallbackCount = _onDisconnectCallbackCount.get();
 
     // Call on disconnect with error for all participants
@@ -337,17 +337,17 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
       participant.onDisconnected(null, new Exception("Test error"));
     }
 
-    
Assert.assertEquals(MockHelixGatewayServiceProcessor._errorDisconnectCount.get(),
+    
Assert.assertEquals(MockHelixGatewayServiceChannel._errorDisconnectCount.get(),
         errorDisconnectCount + _participants.size());
     Assert.assertEquals(_onDisconnectCallbackCount.get(), 
onDisconnectCallbackCount + _participants.size());
   }
 
-  public static class MockHelixGatewayServiceProcessor implements 
HelixGatewayServiceProcessor {
+  public static class MockHelixGatewayServiceChannel implements 
HelixGatewayServiceChannel {
     private final Map<String, Message> _pendingMessageMap;
     private static final AtomicInteger _gracefulDisconnectCount = new 
AtomicInteger();
     private static final AtomicInteger _errorDisconnectCount = new 
AtomicInteger();
 
-    public MockHelixGatewayServiceProcessor(Map<String, Message> 
pendingMessageMap) {
+    public MockHelixGatewayServiceChannel(Map<String, Message> 
pendingMessageMap) {
       _pendingMessageMap = pendingMessageMap;
     }
 
@@ -360,15 +360,9 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
     public void closeConnectionWithError(String instanceName, String reason) {
       _errorDisconnectCount.incrementAndGet();
     }
-
     @Override
     public void completeConnection(String instanceName) {
       _gracefulDisconnectCount.incrementAndGet();
     }
-
-    @Override
-    public void onClientClose(String clusterName, String instanceName) {
-
-    }
   }
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
index dac903b28..627c572e9 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
@@ -138,7 +138,7 @@ public class TestGatewayServiceConnection extends 
HelixGatewayTestBase {
     }
 
     @Override
-    public void newGatewayServiceEvent(GatewayServiceEvent event) {
+    public void onGatewayServiceEvent(GatewayServiceEvent event) {
       if (event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
         connectLatch.countDown();
       } else if 
(event.getEventType().equals(GatewayServiceEventType.DISCONNECT)) {
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
index 788bf4472..873841cae 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -1,6 +1,6 @@
 package org.apache.helix.gateway.service;
 
-import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
 import org.testng.annotations.Test;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
 
@@ -41,6 +41,6 @@ public class TestGatewayServiceManager {
     // Process disconnection event
     grpcService.report(null).onNext(disconnectionEvent);
     // Verify the events were processed in sequence
-    verify(manager, times(2)).newGatewayServiceEvent(any());
+    verify(manager, times(2)).onGatewayServiceEvent(any());
   }
 }

Reply via email to