This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit fe3e0726e45ad15d888cb0b0453a0ca6072f0c8d Author: xyuanlu <[email protected]> AuthorDate: Thu Aug 8 15:19:14 2024 -0700 Refine gateway service interface (#2875) Refine gateway service interface --- .../org/apache/helix/gateway/HelixGatewayMain.java | 2 +- ...onitor.java => HelixGatewayServiceChannel.java} | 44 +++++++++++++++----- .../api/service/HelixGatewayServiceProcessor.java | 47 ---------------------- .../HelixGatewayServiceShardStateProcessor.java | 34 ---------------- .../HelixGatewayServiceGrpcService.java | 11 +++-- .../HelixGatewayServicePullModeChannel.java | 25 ++++++++++++ .../participant/HelixGatewayParticipant.java | 22 +++++----- .../gateway/service/GatewayServiceManager.java | 16 ++++---- .../service/GatewayServiceManagerFactory.java | 29 ------------- .../apache/helix/gateway/base/util/TestHelper.java | 4 +- .../participant/TestHelixGatewayParticipant.java | 22 ++++------ .../service/TestGatewayServiceConnection.java | 2 +- .../gateway/service/TestGatewayServiceManager.java | 4 +- 13 files changed, 96 insertions(+), 166 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java index bcf502e90..b4b8ca0cb 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java @@ -22,7 +22,7 @@ package org.apache.helix.gateway; import io.grpc.Server; import java.io.IOException; import java.util.concurrent.TimeUnit; -import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; +import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; import org.apache.helix.gateway.service.GatewayServiceManager; import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder; diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java similarity index 51% rename from helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java rename to helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java index 0f547354a..1731e158f 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java @@ -19,10 +19,42 @@ package org.apache.helix.gateway.api.service; * under the License. */ +import org.apache.helix.gateway.service.GatewayServiceEvent; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.model.Message; + + /** - * Interface for gateway manager to interact with clients on connection. + * Helix Gateway Service channel interface provides API for inbound and outbound communication between + * Gateway service and application instances. */ -public interface HelixGatewayServiceClientConnectionMonitor { +public interface HelixGatewayServiceChannel { + + /** + * Gateway service send a state transition message to a connected participant. + * + * @param instanceName the name of the participant + * @param currentState the current state of the shard + * @param message the message to send + */ + void sendStateTransitionMessage(String instanceName, String currentState, Message message); + + /** + * Send a GatewayServiceEvent to gateway manager for helix instances changes. + * Event could be a connection closed event (event type DISCONNECT), + * an initial connection establish event that contains a map of current chard states (event type CONNECT), + * or a state transition result message (event type UPDATE). + * + * The default implementation push an event to the Gateway Service Manager. + * + * @param gatewayServiceManager the Gateway Service Manager + * @param event the event to push + */ + default void pushClientEventToGatewayManager(GatewayServiceManager gatewayServiceManager, GatewayServiceEvent event) { + gatewayServiceManager.onGatewayServiceEvent(event); + } + + // TODO: remove the following 2 apis in future changes /** * Gateway service close connection with error. This function is called when manager wants to close client * connection when there is an error. e.g. HelixManager connection is lost. @@ -37,12 +69,4 @@ public interface HelixGatewayServiceClientConnectionMonitor { * @param instanceName instance name */ public void completeConnection(String instanceName); - - /** - * Callback when we detect client connection is closed. It could be when client gracefully close the connection, - * or when client connection is timed out. - * @param clusterName cluster name - * @param instanceName instance name - */ - public void onClientClose(String clusterName, String instanceName); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java deleted file mode 100644 index 3ca7aeac5..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.apache.helix.gateway.api.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.gateway.service.GatewayServiceEvent; -import org.apache.helix.gateway.service.GatewayServiceManager; - - -/** - * Helix Gateway Service Processor interface allows sending state transition messages to - * participants through service implementing this interface. - */ -public interface HelixGatewayServiceProcessor - extends HelixGatewayServiceClientConnectionMonitor, HelixGatewayServiceShardStateProcessor { - - /** - * Callback when receiving a client event. - * Event could be a connection closed event (event type DISCONNECT), - * an initial connection establish event that contains a map of current chard states (event type CONNECT), - * or a state transition result message (event type UPDATE). - * - * The default implementation push an event to the Gateway Service Manager. - * - * @param gatewayServiceManager the Gateway Service Manager - * @param event the event to push - */ - default void onClientEvent(GatewayServiceManager gatewayServiceManager, GatewayServiceEvent event) { - gatewayServiceManager.newGatewayServiceEvent(event); - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java deleted file mode 100644 index fb9bd6294..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.apache.helix.gateway.api.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.model.Message; - - -public interface HelixGatewayServiceShardStateProcessor { - /** - * Gateway service send a state transition message to a connected participant. - * - * @param instanceName the name of the participant - * @param currentState the current state of the shard - * @param message the message to send - */ - void sendStateTransitionMessage(String instanceName, String currentState, Message message); -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java similarity index 96% rename from helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java rename to helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java index 3291e48b4..a176ca89c 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java @@ -1,4 +1,4 @@ -package org.apache.helix.gateway.grpcservice; +package org.apache.helix.gateway.channel; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -27,7 +27,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.gateway.service.GatewayServiceEvent; import org.apache.helix.gateway.service.GatewayServiceManager; -import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.util.PerKeyLockRegistry; import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; import org.apache.helix.model.Message; @@ -43,7 +43,7 @@ import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMe * Helix Gateway Service GRPC UI implementation. */ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase - implements HelixGatewayServiceProcessor { + implements HelixGatewayServiceChannel { // create LOGGER private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class); @@ -83,7 +83,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli ShardState shardState = request.getShardState(); updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver); } - onClientEvent(_manager, + pushClientEventToGatewayManager(_manager, StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request)); } @@ -153,7 +153,6 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli } } - @Override public void onClientClose(String clusterName, String instanceName) { if (instanceName == null || clusterName == null) { // TODO: log error; @@ -162,7 +161,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli logger.info("Client close connection for instance: {}", instanceName); GatewayServiceEvent event = StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, instanceName); - onClientEvent(_manager, event); + pushClientEventToGatewayManager(_manager, event); _lockRegistry.withLock(instanceName, () -> { _reversedObserverMap.remove(_observerMap.get(instanceName)); _observerMap.remove(instanceName); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java new file mode 100644 index 000000000..0d80a96d0 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePullModeChannel.java @@ -0,0 +1,25 @@ +package org.apache.helix.gateway.channel; + +import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; +import org.apache.helix.model.Message; + + +public class HelixGatewayServicePullModeChannel implements HelixGatewayServiceChannel { + + public HelixGatewayServicePullModeChannel() { + } + @Override + public void sendStateTransitionMessage(String instanceName, String currentState, Message message) { + + } + + @Override + public void closeConnectionWithError(String instanceName, String reason) { + + } + + @Override + public void completeConnection(String instanceName) { + + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java index 96a39bb01..d60905d3c 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java @@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; -import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; import org.apache.helix.manager.zk.HelixManagerStateListener; import org.apache.helix.manager.zk.ZKHelixManager; @@ -45,16 +45,16 @@ import org.apache.helix.participant.statemachine.StateTransitionError; */ public class HelixGatewayParticipant implements HelixManagerStateListener { public static final String UNASSIGNED_STATE = "UNASSIGNED"; - private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixGatewayServiceChannel _gatewayServiceChannel; private final HelixManager _helixManager; private final Runnable _onDisconnectedCallback; private final Map<String, Map<String, String>> _shardStateMap; private final Map<String, CompletableFuture<Boolean>> _stateTransitionResultMap; - private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + private HelixGatewayParticipant(HelixGatewayServiceChannel gatewayServiceChannel, Runnable onDisconnectedCallback, HelixManager helixManager, Map<String, Map<String, String>> initialShardStateMap) { - _gatewayServiceProcessor = gatewayServiceProcessor; + _gatewayServiceChannel = gatewayServiceChannel; _helixManager = helixManager; _onDisconnectedCallback = onDisconnectedCallback; _shardStateMap = initialShardStateMap; @@ -74,7 +74,7 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { CompletableFuture<Boolean> future = new CompletableFuture<>(); _stateTransitionResultMap.put(transitionId, future); - _gatewayServiceProcessor.sendStateTransitionMessage(_helixManager.getInstanceName(), + _gatewayServiceChannel.sendStateTransitionMessage(_helixManager.getInstanceName(), getCurrentState(resourceId, shardId), message); if (!future.get()) { @@ -183,7 +183,7 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { @Override public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception { _onDisconnectedCallback.run(); - _gatewayServiceProcessor.closeConnectionWithError(_helixManager.getInstanceName(), + _gatewayServiceChannel.closeConnectionWithError(_helixManager.getInstanceName(), error.getMessage()); } @@ -191,11 +191,11 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { if (_helixManager.isConnected()) { _helixManager.disconnect(); } - _gatewayServiceProcessor.completeConnection(_helixManager.getInstanceName()); + _gatewayServiceChannel.completeConnection(_helixManager.getInstanceName()); } public static class Builder { - private final HelixGatewayServiceProcessor _helixGatewayServiceProcessor; + private final HelixGatewayServiceChannel _helixGatewayServiceChannel; private final String _instanceName; private final String _clusterName; private final String _zkAddress; @@ -203,9 +203,9 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { private final List<String> _multiTopStateModelDefinitions; private final Map<String, Map<String, String>> _initialShardStateMap; - public Builder(HelixGatewayServiceProcessor helixGatewayServiceProcessor, String instanceName, + public Builder(HelixGatewayServiceChannel helixGatewayServiceChannel, String instanceName, String clusterName, String zkAddress, Runnable onDisconnectedCallback) { - _helixGatewayServiceProcessor = helixGatewayServiceProcessor; + _helixGatewayServiceChannel = helixGatewayServiceChannel; _instanceName = instanceName; _clusterName = clusterName; _zkAddress = zkAddress; @@ -258,7 +258,7 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { HelixManager participantManager = new ZKHelixManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, _zkAddress); HelixGatewayParticipant participant = - new HelixGatewayParticipant(_helixGatewayServiceProcessor, _onDisconnectedCallback, + new HelixGatewayParticipant(_helixGatewayServiceChannel, _onDisconnectedCallback, participantManager, _initialShardStateMap); _multiTopStateModelDefinitions.forEach( diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index fd7420735..2b626d395 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -26,9 +26,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.google.common.collect.ImmutableSet; -import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.constant.GatewayServiceEventType; -import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; +import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.gateway.util.PerKeyBlockingExecutor; @@ -51,7 +51,7 @@ public class GatewayServiceManager { private final ExecutorService _participantStateTransitionResultUpdator; // link to grpc service - private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixGatewayServiceChannel _gatewayServiceChannel; // a per key executor for connection event. All event for the same instance will be executed in sequence. // It is used to ensure for each instance, the connect/disconnect event won't start until the previous one is done. @@ -61,7 +61,7 @@ public class GatewayServiceManager { _helixGatewayParticipantMap = new ConcurrentHashMap<>(); _zkAddress = "foo"; _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); - _gatewayServiceProcessor = new HelixGatewayServiceGrpcService(this); + _gatewayServiceChannel = new HelixGatewayServiceGrpcService(this); _connectionEventProcessor = new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable } @@ -71,7 +71,7 @@ public class GatewayServiceManager { * * @param event */ - public void newGatewayServiceEvent(GatewayServiceEvent event) { + public void onGatewayServiceEvent(GatewayServiceEvent event) { if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) { _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event)); } else { @@ -127,15 +127,15 @@ public class GatewayServiceManager { } } - public HelixGatewayServiceProcessor getHelixGatewayServiceProcessor() { - return _gatewayServiceProcessor; + public HelixGatewayServiceChannel getHelixGatewayServiceProcessor() { + return _gatewayServiceChannel; } private void createHelixGatewayParticipant(String clusterName, String instanceName, Map<String, Map<String, String>> initialShardStateMap) { // Create and add the participant to the participant map HelixGatewayParticipant.Builder participantBuilder = - new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, instanceName, clusterName, + new HelixGatewayParticipant.Builder(_gatewayServiceChannel, instanceName, clusterName, _zkAddress, () -> removeHelixGatewayParticipant(clusterName, instanceName)).setInitialShardState( initialShardStateMap); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java deleted file mode 100644 index aed7518b9..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.apache.helix.gateway.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * Factory class to create GatewayServiceManager - */ -public class GatewayServiceManagerFactory { - public GatewayServiceManager createGatewayServiceManager() { - return new GatewayServiceManager(); - } -} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java index a5fbcf39c..15b95448f 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java @@ -44,7 +44,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; -import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; +import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; import org.apache.helix.gateway.service.GatewayServiceManager; import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -74,8 +74,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import static org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*; - public class TestHelper { private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class); diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java index 0c81ac354..d3180a16c 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java @@ -30,7 +30,7 @@ import java.util.stream.Collectors; import org.apache.helix.ConfigAccessor; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; -import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.ClusterConfig; @@ -99,7 +99,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { private HelixGatewayParticipant addParticipant(String participantName, Map<String, Map<String, String>> initialShardMap) { HelixGatewayParticipant participant = - new HelixGatewayParticipant.Builder(new MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName, + new HelixGatewayParticipant.Builder(new MockHelixGatewayServiceChannel(_pendingMessageMap), participantName, CLUSTER_NAME, ZK_ADDR, _onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition( TEST_STATE_MODEL).setInitialShardState(initialShardMap).build(); _participants.add(participant); @@ -319,17 +319,17 @@ public class TestHelixGatewayParticipant extends ZkTestBase { @Test(dependsOnMethods = "testProcessStateTransitionAfterReconnectAfterDroppingPartition") public void testGatewayParticipantDisconnectGracefully() { - int gracefulDisconnectCount = MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(); + int gracefulDisconnectCount = MockHelixGatewayServiceChannel._gracefulDisconnectCount.get(); // Remove the first participant HelixGatewayParticipant participant = _participants.get(0); deleteParticipant(participant); - Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(), gracefulDisconnectCount + 1); + Assert.assertEquals(MockHelixGatewayServiceChannel._gracefulDisconnectCount.get(), gracefulDisconnectCount + 1); } @Test(dependsOnMethods = "testGatewayParticipantDisconnectGracefully") public void testGatewayParticipantDisconnectWithError() throws Exception { - int errorDisconnectCount = MockHelixGatewayServiceProcessor._errorDisconnectCount.get(); + int errorDisconnectCount = MockHelixGatewayServiceChannel._errorDisconnectCount.get(); int onDisconnectCallbackCount = _onDisconnectCallbackCount.get(); // Call on disconnect with error for all participants @@ -337,17 +337,17 @@ public class TestHelixGatewayParticipant extends ZkTestBase { participant.onDisconnected(null, new Exception("Test error")); } - Assert.assertEquals(MockHelixGatewayServiceProcessor._errorDisconnectCount.get(), + Assert.assertEquals(MockHelixGatewayServiceChannel._errorDisconnectCount.get(), errorDisconnectCount + _participants.size()); Assert.assertEquals(_onDisconnectCallbackCount.get(), onDisconnectCallbackCount + _participants.size()); } - public static class MockHelixGatewayServiceProcessor implements HelixGatewayServiceProcessor { + public static class MockHelixGatewayServiceChannel implements HelixGatewayServiceChannel { private final Map<String, Message> _pendingMessageMap; private static final AtomicInteger _gracefulDisconnectCount = new AtomicInteger(); private static final AtomicInteger _errorDisconnectCount = new AtomicInteger(); - public MockHelixGatewayServiceProcessor(Map<String, Message> pendingMessageMap) { + public MockHelixGatewayServiceChannel(Map<String, Message> pendingMessageMap) { _pendingMessageMap = pendingMessageMap; } @@ -360,15 +360,9 @@ public class TestHelixGatewayParticipant extends ZkTestBase { public void closeConnectionWithError(String instanceName, String reason) { _errorDisconnectCount.incrementAndGet(); } - @Override public void completeConnection(String instanceName) { _gracefulDisconnectCount.incrementAndGet(); } - - @Override - public void onClientClose(String clusterName, String instanceName) { - - } } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java index dac903b28..627c572e9 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java @@ -138,7 +138,7 @@ public class TestGatewayServiceConnection extends HelixGatewayTestBase { } @Override - public void newGatewayServiceEvent(GatewayServiceEvent event) { + public void onGatewayServiceEvent(GatewayServiceEvent event) { if (event.getEventType().equals(GatewayServiceEventType.CONNECT)) { connectLatch.countDown(); } else if (event.getEventType().equals(GatewayServiceEventType.DISCONNECT)) { diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java index 788bf4472..873841cae 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java @@ -1,6 +1,6 @@ package org.apache.helix.gateway.service; -import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; +import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; import org.testng.annotations.Test; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; @@ -41,6 +41,6 @@ public class TestGatewayServiceManager { // Process disconnection event grpcService.report(null).onNext(disconnectionEvent); // Verify the events were processed in sequence - verify(manager, times(2)).newGatewayServiceEvent(any()); + verify(manager, times(2)).onGatewayServiceEvent(any()); } }
