This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 4dcd8df25a3e69a70e0ea584f8baf915821622bf Author: Zachary Pinto <zapi...@linkedin.com> AuthorDate: Fri Aug 2 10:33:47 2024 -0700 Implement helix manager disconnect and client disconnect handling for HelixGatewayParticipant.(#2868) * Implement helix manager disconnect and client disconnect handling for HelixGatewayParticipant. --- .../participant/HelixGatewayParticipant.java | 53 ++++++++++++++++++---- .../gateway/service/GatewayServiceManager.java | 4 +- .../participant/TestHelixGatewayParticipant.java | 40 ++++++++++++++-- 3 files changed, 82 insertions(+), 15 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java index 640552960..96a39bb01 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java @@ -32,6 +32,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.HelixManagerStateListener; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateTransitionError; @@ -42,17 +43,20 @@ import org.apache.helix.participant.statemachine.StateTransitionError; * for the participant and updates the state of the participant's shards upon successful state * transitions signaled by remote participant. 
*/ -public class HelixGatewayParticipant { +public class HelixGatewayParticipant implements HelixManagerStateListener { public static final String UNASSIGNED_STATE = "UNASSIGNED"; private final HelixGatewayServiceProcessor _gatewayServiceProcessor; - private final HelixManager _participantManager; + private final HelixManager _helixManager; + private final Runnable _onDisconnectedCallback; private final Map<String, Map<String, String>> _shardStateMap; private final Map<String, CompletableFuture<Boolean>> _stateTransitionResultMap; private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, - HelixManager participantManager, Map<String, Map<String, String>> initialShardStateMap) { + Runnable onDisconnectedCallback, HelixManager helixManager, + Map<String, Map<String, String>> initialShardStateMap) { _gatewayServiceProcessor = gatewayServiceProcessor; - _participantManager = participantManager; + _helixManager = helixManager; + _onDisconnectedCallback = onDisconnectedCallback; _shardStateMap = initialShardStateMap; _stateTransitionResultMap = new ConcurrentHashMap<>(); } @@ -70,7 +74,7 @@ public class HelixGatewayParticipant { CompletableFuture<Boolean> future = new CompletableFuture<>(); _stateTransitionResultMap.put(transitionId, future); - _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + _gatewayServiceProcessor.sendStateTransitionMessage(_helixManager.getInstanceName(), getCurrentState(resourceId, shardId), message); if (!future.get()) { @@ -107,7 +111,7 @@ public class HelixGatewayParticipant { * @return participant instance name */ public String getInstanceName() { - return _participantManager.getInstanceName(); + return _helixManager.getInstanceName(); } /** @@ -129,7 +133,7 @@ public class HelixGatewayParticipant { } @VisibleForTesting - public Map<String, Map<String, String>> getShardStateMap() { + Map<String, Map<String, String>> getShardStateMap() { return _shardStateMap; } @@ -160,8 
+164,34 @@ public class HelixGatewayParticipant { } } + /** + * Invoked when the HelixManager connection to zookeeper is established + * + * @param helixManager HelixManager that is successfully connected + */ + public void onConnected(HelixManager helixManager) throws Exception { + // Do nothing + } + + /** + * Invoked when the HelixManager connection to zookeeper is closed unexpectedly. This will not be + * run if the remote participant disconnects from gateway. + * + * @param helixManager HelixManager that fails to be connected + * @param error connection error + */ + @Override + public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception { + _onDisconnectedCallback.run(); + _gatewayServiceProcessor.closeConnectionWithError(_helixManager.getInstanceName(), + error.getMessage()); + } + public void disconnect() { - _participantManager.disconnect(); + if (_helixManager.isConnected()) { + _helixManager.disconnect(); + } + _gatewayServiceProcessor.completeConnection(_helixManager.getInstanceName()); } public static class Builder { @@ -169,15 +199,17 @@ public class HelixGatewayParticipant { private final String _instanceName; private final String _clusterName; private final String _zkAddress; + private final Runnable _onDisconnectedCallback; private final List<String> _multiTopStateModelDefinitions; private final Map<String, Map<String, String>> _initialShardStateMap; public Builder(HelixGatewayServiceProcessor helixGatewayServiceProcessor, String instanceName, - String clusterName, String zkAddress) { + String clusterName, String zkAddress, Runnable onDisconnectedCallback) { _helixGatewayServiceProcessor = helixGatewayServiceProcessor; _instanceName = instanceName; _clusterName = clusterName; _zkAddress = zkAddress; + _onDisconnectedCallback = onDisconnectedCallback; _multiTopStateModelDefinitions = new ArrayList<>(); _initialShardStateMap = new ConcurrentHashMap<>(); } @@ -226,7 +258,8 @@ public class HelixGatewayParticipant { 
HelixManager participantManager = new ZKHelixManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, _zkAddress); HelixGatewayParticipant participant = - new HelixGatewayParticipant(_helixGatewayServiceProcessor, participantManager, + new HelixGatewayParticipant(_helixGatewayServiceProcessor, _onDisconnectedCallback, + participantManager, _initialShardStateMap); _multiTopStateModelDefinitions.forEach( stateModelDefinition -> participantManager.getStateMachineEngine() diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 4553c04ca..fd7420735 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -136,7 +136,9 @@ public class GatewayServiceManager { // Create and add the participant to the participant map HelixGatewayParticipant.Builder participantBuilder = new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, instanceName, clusterName, - _zkAddress).setInitialShardState(initialShardStateMap); + _zkAddress, + () -> removeHelixGatewayParticipant(clusterName, instanceName)).setInitialShardState( + initialShardStateMap); SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach( participantBuilder::addMultiTopStateStateModelDefinition); _helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>()) diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java index dd22b9fa0..200ac8f04 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java @@ -24,6 +24,7 @@ 
import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.helix.ConfigAccessor; @@ -45,7 +46,7 @@ import org.testng.collections.Lists; public class TestHelixGatewayParticipant extends ZkTestBase { private static final String CLUSTER_NAME = TestHelixGatewayParticipant.class.getSimpleName(); - private static final int START_NUM_NODE = 2; + private static final int START_NUM_NODE = 3; private static final String TEST_DB = "TestDB"; private static final String TEST_STATE_MODEL = "OnlineOffline"; private static final String CONTROLLER_PREFIX = "controller"; @@ -56,6 +57,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { private int _nextStartPort = 12000; private final List<HelixGatewayParticipant> _participants = Lists.newArrayList(); private final Map<String, Message> _pendingMessageMap = new ConcurrentHashMap<>(); + private final AtomicInteger _onDisconnectCallbackCount = new AtomicInteger(); @BeforeClass public void beforeClass() { @@ -97,7 +99,8 @@ public class TestHelixGatewayParticipant extends ZkTestBase { Map<String, Map<String, String>> initialShardMap) { HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder( new MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName, CLUSTER_NAME, - ZK_ADDR).addMultiTopStateStateModelDefinition(TEST_STATE_MODEL) + ZK_ADDR, _onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition( + TEST_STATE_MODEL) .setInitialShardState(initialShardMap).build(); _participants.add(participant); return participant; @@ -317,8 +320,37 @@ public class TestHelixGatewayParticipant extends ZkTestBase { verifyGatewayStateMatchesHelixState(); } + @Test(dependsOnMethods = "testProcessStateTransitionAfterReconnectAfterDroppingPartition") + public void testGatewayParticipantDisconnectGracefully() { + int 
gracefulDisconnectCount = MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(); + // Remove the first participant + HelixGatewayParticipant participant = _participants.get(0); + deleteParticipant(participant); + + Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(), + gracefulDisconnectCount + 1); + } + + @Test(dependsOnMethods = "testGatewayParticipantDisconnectGracefully") + public void testGatewayParticipantDisconnectWithError() throws Exception { + int errorDisconnectCount = MockHelixGatewayServiceProcessor._errorDisconnectCount.get(); + int onDisconnectCallbackCount = _onDisconnectCallbackCount.get(); + + // Call on disconnect with error for all participants + for (HelixGatewayParticipant participant : _participants) { + participant.onDisconnected(null, new Exception("Test error")); + } + + Assert.assertEquals(MockHelixGatewayServiceProcessor._errorDisconnectCount.get(), + errorDisconnectCount + _participants.size()); + Assert.assertEquals(_onDisconnectCallbackCount.get(), + onDisconnectCallbackCount + _participants.size()); + } + public static class MockHelixGatewayServiceProcessor implements HelixGatewayServiceProcessor { private final Map<String, Message> _pendingMessageMap; + private static final AtomicInteger _gracefulDisconnectCount = new AtomicInteger(); + private static final AtomicInteger _errorDisconnectCount = new AtomicInteger(); public MockHelixGatewayServiceProcessor(Map<String, Message> pendingMessageMap) { _pendingMessageMap = pendingMessageMap; @@ -332,12 +364,12 @@ public class TestHelixGatewayParticipant extends ZkTestBase { @Override public void closeConnectionWithError(String instanceName, String reason) { - + _errorDisconnectCount.incrementAndGet(); } @Override public void completeConnection(String instanceName) { - + _gracefulDisconnectCount.incrementAndGet(); } } }