This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit 3f153282537734f94c189261242431306425647a Author: xyuanlu <xyua...@gmail.com> AuthorDate: Mon Aug 5 21:27:05 2024 -0700 Interfaces of gateway service (#2871) Interfaces of gateway service --- ...elixGatewayServiceClientConnectionMonitor.java} | 31 ++++----- .../api/service/HelixGatewayServiceProcessor.java | 38 +++++------ ...=> HelixGatewayServiceShardStateProcessor.java} | 25 +------ .../HelixGatewayServiceGrpcService.java | 53 +++++++-------- .../participant/TestHelixGatewayParticipant.java | 77 +++++++++++----------- 5 files changed, 94 insertions(+), 130 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java similarity index 58% copy from helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java copy to helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java index c06443802..0f547354a 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java @@ -19,35 +19,30 @@ package org.apache.helix.gateway.api.service; * under the License. */ -import org.apache.helix.model.Message; - /** - * Helix Gateway Service Processor interface allows sending state transition messages to - * participants through service implementing this interface. + * Interface for gateway manager to interact with clients on connection. */ -public interface HelixGatewayServiceProcessor { - - /** - * Send a state transition message to a remote participant. 
- * - * @param instanceName the name of the participant - * @param currentState the current state of the shard - * @param message the message to send - */ - void sendStateTransitionMessage(String instanceName, String currentState, - Message message); - +public interface HelixGatewayServiceClientConnectionMonitor { /** - * Close connection with error. + * Gateway service close connection with error. This function is called when manager wants to close client + * connection when there is an error. e.g. HelixManager connection is lost. * @param instanceName instance name * @param reason reason for closing connection */ public void closeConnectionWithError(String instanceName, String reason); /** - * Close connection with success. + * Gateway service close client connection with success. This function is called when manager wants to close client + * connection gracefully, e.g., when gateway service is shutting down. * @param instanceName instance name */ public void completeConnection(String instanceName); + /** + * Callback when we detect client connection is closed. It could be when client gracefully close the connection, + * or when client connection is timed out. + * @param clusterName cluster name + * @param instanceName instance name + */ + public void onClientClose(String clusterName, String instanceName); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java index c06443802..3ca7aeac5 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java @@ -19,35 +19,29 @@ package org.apache.helix.gateway.api.service; * under the License. 
*/ -import org.apache.helix.model.Message; +import org.apache.helix.gateway.service.GatewayServiceEvent; +import org.apache.helix.gateway.service.GatewayServiceManager; + /** * Helix Gateway Service Processor interface allows sending state transition messages to * participants through service implementing this interface. */ -public interface HelixGatewayServiceProcessor { +public interface HelixGatewayServiceProcessor + extends HelixGatewayServiceClientConnectionMonitor, HelixGatewayServiceShardStateProcessor { /** - * Send a state transition message to a remote participant. + * Callback when receiving a client event. + * Event could be a connection closed event (event type DISCONNECT), + * an initial connection establish event that contains a map of current shard states (event type CONNECT), + * or a state transition result message (event type UPDATE). * - * @param instanceName the name of the participant - * @param currentState the current state of the shard - * @param message the message to send - */ - void sendStateTransitionMessage(String instanceName, String currentState, - Message message); - - /** - * Close connection with error. - * @param instanceName instance name - * @param reason reason for closing connection - */ - public void closeConnectionWithError(String instanceName, String reason); - - /** - * Close connection with success. - * @param instanceName instance name + * The default implementation pushes an event to the Gateway Service Manager. 
+ * + * @param gatewayServiceManager the Gateway Service Manager + * @param event the event to push */ - public void completeConnection(String instanceName); - + default void onClientEvent(GatewayServiceManager gatewayServiceManager, GatewayServiceEvent event) { + gatewayServiceManager.newGatewayServiceEvent(event); + } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java similarity index 63% copy from helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java copy to helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java index c06443802..fb9bd6294 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java @@ -21,33 +21,14 @@ package org.apache.helix.gateway.api.service; import org.apache.helix.model.Message; -/** - * Helix Gateway Service Processor interface allows sending state transition messages to - * participants through service implementing this interface. - */ -public interface HelixGatewayServiceProcessor { +public interface HelixGatewayServiceShardStateProcessor { /** - * Send a state transition message to a remote participant. + * Gateway service send a state transition message to a connected participant. * * @param instanceName the name of the participant * @param currentState the current state of the shard * @param message the message to send */ - void sendStateTransitionMessage(String instanceName, String currentState, - Message message); - - /** - * Close connection with error. 
- * @param instanceName instance name - * @param reason reason for closing connection - */ - public void closeConnectionWithError(String instanceName, String reason); - - /** - * Close connection with success. - * @param instanceName instance name - */ - public void completeConnection(String instanceName); - + void sendStateTransitionMessage(String instanceName, String currentState, Message message); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java index 344a2649d..3291e48b4 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java @@ -50,6 +50,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli // Map to store the observer for each instance private final Map<String, StreamObserver<TransitionMessage>> _observerMap = new HashMap<>(); // A reverse map to store the instance name for each observer. It is used to find the instance when connection is closed. 
+ // map<observer, pair<instance, cluster>> private final Map<StreamObserver<TransitionMessage>, Pair<String, String>> _reversedObserverMap = new HashMap<>(); private final GatewayServiceManager _manager; @@ -82,19 +83,22 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli ShardState shardState = request.getShardState(); updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver); } - _manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request)); + onClientEvent(_manager, + StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request)); } @Override public void onError(Throwable t) { - logger.info("Receive on error message: {}", t.getMessage()); - onClientClose(responseObserver); + logger.info("Receive on error, reason: {} message: {}", Status.fromThrowable(t).getCode(), t.getMessage()); + Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver); + onClientClose(instanceInfo.getRight(), instanceInfo.getLeft()); } @Override public void onCompleted() { logger.info("Receive on complete message"); - onClientClose(responseObserver); + Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver); + onClientClose(instanceInfo.getRight(), instanceInfo.getLeft()); } }; } @@ -108,13 +112,11 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli * @param message the message to convert to the transition message */ @Override - public void sendStateTransitionMessage(String instanceName, String currentState, - Message message) { + public void sendStateTransitionMessage(String instanceName, String currentState, Message message) { StreamObserver<TransitionMessage> observer; observer = _observerMap.get(instanceName); if (observer != null) { - observer.onNext( - StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message)); + 
observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message)); } } @@ -125,7 +127,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli */ @Override public void closeConnectionWithError(String instanceName, String errorReason) { - logger.info("Close connection for instance: {} with error reason: {}", instanceName, errorReason); + logger.info("Close connection for instance: {} with error reason: {}", instanceName, errorReason); closeConnectionHelper(instanceName, errorReason, true); } @@ -139,7 +141,6 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli closeConnectionHelper(instanceName, null, false); } - private void closeConnectionHelper(String instanceName, String errorReason, boolean withError) { StreamObserver<TransitionMessage> observer; observer = _observerMap.get(instanceName); @@ -152,34 +153,28 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli } } - private void updateObserver(String instanceName, String clusterName, - StreamObserver<TransitionMessage> streamObserver) { - _lockRegistry.withLock(instanceName, () -> { - _observerMap.put(instanceName, streamObserver); - _reversedObserverMap.put(streamObserver, new ImmutablePair<>(instanceName, clusterName)); - }); - } - - private void onClientClose( - StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) { - String instanceName; - String clusterName; - Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver); - clusterName = instanceInfo.getRight(); - instanceName = instanceInfo.getLeft(); - logger.info("Client close connection for instance: {}", instanceName); - + @Override + public void onClientClose(String clusterName, String instanceName) { if (instanceName == null || clusterName == null) { // TODO: log error; return; } + logger.info("Client close connection for instance: {}", instanceName); 
GatewayServiceEvent event = StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, instanceName); - _manager.newGatewayServiceEvent(event); + onClientEvent(_manager, event); _lockRegistry.withLock(instanceName, () -> { - _reversedObserverMap.remove(responseObserver); + _reversedObserverMap.remove(_observerMap.get(instanceName)); _observerMap.remove(instanceName); _lockRegistry.removeLock(instanceName); }); } + + private void updateObserver(String instanceName, String clusterName, + StreamObserver<TransitionMessage> streamObserver) { + _lockRegistry.withLock(instanceName, () -> { + _observerMap.put(instanceName, streamObserver); + _reversedObserverMap.put(streamObserver, new ImmutablePair<>(instanceName, clusterName)); + }); + } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java index 200ac8f04..0c81ac354 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java @@ -44,6 +44,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import org.testng.collections.Lists; + public class TestHelixGatewayParticipant extends ZkTestBase { private static final String CLUSTER_NAME = TestHelixGatewayParticipant.class.getSimpleName(); private static final int START_NUM_NODE = 3; @@ -97,11 +98,10 @@ public class TestHelixGatewayParticipant extends ZkTestBase { */ private HelixGatewayParticipant addParticipant(String participantName, Map<String, Map<String, String>> initialShardMap) { - HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder( - new MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName, CLUSTER_NAME, - ZK_ADDR, 
_onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition( - TEST_STATE_MODEL) - .setInitialShardState(initialShardMap).build(); + HelixGatewayParticipant participant = + new HelixGatewayParticipant.Builder(new MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName, + CLUSTER_NAME, ZK_ADDR, _onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition( + TEST_STATE_MODEL).setInitialShardState(initialShardMap).build(); _participants.add(participant); return participant; } @@ -126,9 +126,9 @@ public class TestHelixGatewayParticipant extends ZkTestBase { * Add a participant to the IdealState's preference list. */ private void addToPreferenceList(HelixGatewayParticipant participant) { - IdealState idealState = - _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); - idealState.getPreferenceLists().values() + IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); + idealState.getPreferenceLists() + .values() .forEach(preferenceList -> preferenceList.add(participant.getInstanceName())); idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas()) + 1)); _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, TEST_DB, idealState); @@ -138,9 +138,9 @@ public class TestHelixGatewayParticipant extends ZkTestBase { * Remove a participant from the IdealState's preference list. 
*/ private void removeFromPreferenceList(HelixGatewayParticipant participant) { - IdealState idealState = - _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); - idealState.getPreferenceLists().values() + IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); + idealState.getPreferenceLists() + .values() .forEach(preferenceList -> preferenceList.remove(participant.getInstanceName())); idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas()) - 1)); _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, TEST_DB, idealState); @@ -151,13 +151,13 @@ public class TestHelixGatewayParticipant extends ZkTestBase { */ private void createDB() { createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, - _participants.stream().map(HelixGatewayParticipant::getInstanceName) - .collect(Collectors.toList()), TEST_STATE_MODEL, 1, _participants.size()); + _participants.stream().map(HelixGatewayParticipant::getInstanceName).collect(Collectors.toList()), + TEST_STATE_MODEL, 1, _participants.size()); _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(new HashSet<>( - _gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME))) - .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + .setResources(new HashSet<>(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME))) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) + .build(); } /** @@ -178,10 +178,10 @@ public class TestHelixGatewayParticipant extends ZkTestBase { /** * Get the current state of a Helix shard. 
*/ - private String getHelixCurrentState(String instanceName, String resourceName, - String shardId) { + private String getHelixCurrentState(String instanceName, String resourceName, String shardId) { return _gSetupTool.getClusterManagementTool() - .getResourceExternalView(CLUSTER_NAME, resourceName).getStateMap(shardId) + .getResourceExternalView(CLUSTER_NAME, resourceName) + .getStateMap(shardId) .getOrDefault(instanceName, HelixGatewayParticipant.UNASSIGNED_STATE); } @@ -189,8 +189,8 @@ public class TestHelixGatewayParticipant extends ZkTestBase { * Verify that all specified participants have pending messages. */ private void verifyPendingMessages(List<HelixGatewayParticipant> participants) throws Exception { - Assert.assertTrue(TestHelper.verify(() -> participants.stream() - .allMatch(participant -> getPendingMessage(participant.getInstanceName()) != null), + Assert.assertTrue(TestHelper.verify( + () -> participants.stream().allMatch(participant -> getPendingMessage(participant.getInstanceName()) != null), TestHelper.WAIT_DURATION)); } @@ -200,12 +200,11 @@ public class TestHelixGatewayParticipant extends ZkTestBase { private void verifyGatewayStateMatchesHelixState() throws Exception { Assert.assertTrue(TestHelper.verify(() -> _participants.stream().allMatch(participant -> { String instanceName = participant.getInstanceName(); - for (String resourceName : _gSetupTool.getClusterManagementTool() - .getResourcesInCluster(CLUSTER_NAME)) { + for (String resourceName : _gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)) { for (String shardId : _gSetupTool.getClusterManagementTool() - .getResourceIdealState(CLUSTER_NAME, resourceName).getPartitionSet()) { - String helixCurrentState = - getHelixCurrentState(instanceName, resourceName, shardId); + .getResourceIdealState(CLUSTER_NAME, resourceName) + .getPartitionSet()) { + String helixCurrentState = getHelixCurrentState(instanceName, resourceName, shardId); if 
(!participant.getCurrentState(resourceName, shardId).equals(helixCurrentState)) { return false; } @@ -220,10 +219,10 @@ public class TestHelixGatewayParticipant extends ZkTestBase { */ private void verifyHelixPartitionStates(String instanceName, String state) throws Exception { Assert.assertTrue(TestHelper.verify(() -> { - for (String resourceName : _gSetupTool.getClusterManagementTool() - .getResourcesInCluster(CLUSTER_NAME)) { + for (String resourceName : _gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)) { for (String shardId : _gSetupTool.getClusterManagementTool() - .getResourceIdealState(CLUSTER_NAME, resourceName).getPartitionSet()) { + .getResourceIdealState(CLUSTER_NAME, resourceName) + .getPartitionSet()) { if (!getHelixCurrentState(instanceName, resourceName, shardId).equals(state)) { return false; } @@ -287,8 +286,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { deleteParticipant(participant); // Verify the Helix state transitions to "UNASSIGNED_STATE" for the participant - verifyHelixPartitionStates(participant.getInstanceName(), - HelixGatewayParticipant.UNASSIGNED_STATE); + verifyHelixPartitionStates(participant.getInstanceName(), HelixGatewayParticipant.UNASSIGNED_STATE); // Re-add the participant with its initial state addParticipant(participant.getInstanceName(), participant.getShardStateMap()); @@ -303,8 +301,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { // Remove the first participant and verify state HelixGatewayParticipant participant = _participants.get(0); deleteParticipant(participant); - verifyHelixPartitionStates(participant.getInstanceName(), - HelixGatewayParticipant.UNASSIGNED_STATE); + verifyHelixPartitionStates(participant.getInstanceName(), HelixGatewayParticipant.UNASSIGNED_STATE); // Remove shard preference and re-add the participant removeFromPreferenceList(participant); @@ -327,8 +324,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { 
HelixGatewayParticipant participant = _participants.get(0); deleteParticipant(participant); - Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(), - gracefulDisconnectCount + 1); + Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(), gracefulDisconnectCount + 1); } @Test(dependsOnMethods = "testGatewayParticipantDisconnectGracefully") @@ -343,8 +339,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { Assert.assertEquals(MockHelixGatewayServiceProcessor._errorDisconnectCount.get(), errorDisconnectCount + _participants.size()); - Assert.assertEquals(_onDisconnectCallbackCount.get(), - onDisconnectCallbackCount + _participants.size()); + Assert.assertEquals(_onDisconnectCallbackCount.get(), onDisconnectCallbackCount + _participants.size()); } public static class MockHelixGatewayServiceProcessor implements HelixGatewayServiceProcessor { @@ -357,8 +352,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { } @Override - public void sendStateTransitionMessage(String instanceName, String currentState, - Message message) { + public void sendStateTransitionMessage(String instanceName, String currentState, Message message) { _pendingMessageMap.put(instanceName, message); } @@ -371,5 +365,10 @@ public class TestHelixGatewayParticipant extends ZkTestBase { public void completeConnection(String instanceName) { _gracefulDisconnectCount.incrementAndGet(); } + + @Override + public void onClientClose(String clusterName, String instanceName) { + + } } }