This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this
push:
new 303009c78 Interfaces of gateway service (#2871)
303009c78 is described below
commit 303009c78fb73c5de8cb0d93bf6428b52f187776
Author: xyuanlu <[email protected]>
AuthorDate: Mon Aug 5 21:27:05 2024 -0700
Interfaces of gateway service (#2871)
Interfaces of gateway service
---
...elixGatewayServiceClientConnectionMonitor.java} | 31 ++++-----
.../api/service/HelixGatewayServiceProcessor.java | 38 +++++------
...=> HelixGatewayServiceShardStateProcessor.java} | 25 +------
.../HelixGatewayServiceGrpcService.java | 53 +++++++--------
.../participant/TestHelixGatewayParticipant.java | 77 +++++++++++-----------
5 files changed, 94 insertions(+), 130 deletions(-)
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java
similarity index 58%
copy from
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
copy to
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java
index c06443802..0f547354a 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java
@@ -19,35 +19,30 @@ package org.apache.helix.gateway.api.service;
* under the License.
*/
-import org.apache.helix.model.Message;
-
/**
- * Helix Gateway Service Processor interface allows sending state transition
messages to
- * participants through service implementing this interface.
+ * Interface for gateway manager to interact with clients on connection.
*/
-public interface HelixGatewayServiceProcessor {
-
- /**
- * Send a state transition message to a remote participant.
- *
- * @param instanceName the name of the participant
- * @param currentState the current state of the shard
- * @param message the message to send
- */
- void sendStateTransitionMessage(String instanceName, String currentState,
- Message message);
-
+public interface HelixGatewayServiceClientConnectionMonitor {
/**
- * Close connection with error.
+ * Gateway service closes the connection with an error. This function is called when
the manager wants to close the client
+ * connection because there is an error, e.g., the HelixManager connection is lost.
* @param instanceName instance name
* @param reason reason for closing connection
*/
public void closeConnectionWithError(String instanceName, String reason);
/**
- * Close connection with success.
+ * Gateway service closes the client connection with success. This function is
called when the manager wants to close the client
+ * connection gracefully, e.g., when the gateway service is shutting down.
* @param instanceName instance name
*/
public void completeConnection(String instanceName);
+ /**
+ * Callback when we detect that the client connection is closed. This can happen when
the client gracefully closes the connection,
+ * or when the client connection has timed out.
+ * @param clusterName cluster name
+ * @param instanceName instance name
+ */
+ public void onClientClose(String clusterName, String instanceName);
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
index c06443802..3ca7aeac5 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
@@ -19,35 +19,29 @@ package org.apache.helix.gateway.api.service;
* under the License.
*/
-import org.apache.helix.model.Message;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+
/**
* Helix Gateway Service Processor interface allows sending state transition
messages to
* participants through service implementing this interface.
*/
-public interface HelixGatewayServiceProcessor {
+public interface HelixGatewayServiceProcessor
+ extends HelixGatewayServiceClientConnectionMonitor,
HelixGatewayServiceShardStateProcessor {
/**
- * Send a state transition message to a remote participant.
+ * Callback when receiving a client event.
+ * Event could be a connection closed event (event type DISCONNECT),
+ * an initial connection-established event that contains a map of current
shard states (event type CONNECT),
+ * or a state transition result message (event type UPDATE).
*
- * @param instanceName the name of the participant
- * @param currentState the current state of the shard
- * @param message the message to send
- */
- void sendStateTransitionMessage(String instanceName, String currentState,
- Message message);
-
- /**
- * Close connection with error.
- * @param instanceName instance name
- * @param reason reason for closing connection
- */
- public void closeConnectionWithError(String instanceName, String reason);
-
- /**
- * Close connection with success.
- * @param instanceName instance name
+ * The default implementation pushes an event to the Gateway Service Manager.
+ *
+ * @param gatewayServiceManager the Gateway Service Manager
+ * @param event the event to push
*/
- public void completeConnection(String instanceName);
-
+ default void onClientEvent(GatewayServiceManager gatewayServiceManager,
GatewayServiceEvent event) {
+ gatewayServiceManager.newGatewayServiceEvent(event);
+ }
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java
similarity index 63%
copy from
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
copy to
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java
index c06443802..fb9bd6294 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java
@@ -21,33 +21,14 @@ package org.apache.helix.gateway.api.service;
import org.apache.helix.model.Message;
-/**
- * Helix Gateway Service Processor interface allows sending state transition
messages to
- * participants through service implementing this interface.
- */
-public interface HelixGatewayServiceProcessor {
+public interface HelixGatewayServiceShardStateProcessor {
/**
- * Send a state transition message to a remote participant.
+ * Gateway service sends a state transition message to a connected
participant.
*
* @param instanceName the name of the participant
* @param currentState the current state of the shard
* @param message the message to send
*/
- void sendStateTransitionMessage(String instanceName, String currentState,
- Message message);
-
- /**
- * Close connection with error.
- * @param instanceName instance name
- * @param reason reason for closing connection
- */
- public void closeConnectionWithError(String instanceName, String reason);
-
- /**
- * Close connection with success.
- * @param instanceName instance name
- */
- public void completeConnection(String instanceName);
-
+ void sendStateTransitionMessage(String instanceName, String currentState,
Message message);
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
index 344a2649d..3291e48b4 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
@@ -50,6 +50,7 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
// Map to store the observer for each instance
private final Map<String, StreamObserver<TransitionMessage>> _observerMap =
new HashMap<>();
// A reverse map to store the instance name for each observer. It is used to
find the instance when connection is closed.
+ // map<observer, pair<instance, cluster>>
private final Map<StreamObserver<TransitionMessage>, Pair<String, String>>
_reversedObserverMap = new HashMap<>();
private final GatewayServiceManager _manager;
@@ -82,19 +83,22 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
ShardState shardState = request.getShardState();
updateObserver(shardState.getInstanceName(),
shardState.getClusterName(), responseObserver);
}
-
_manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
+ onClientEvent(_manager,
+
StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
}
@Override
public void onError(Throwable t) {
- logger.info("Receive on error message: {}", t.getMessage());
- onClientClose(responseObserver);
+ logger.info("Receive on error, reason: {} message: {}",
Status.fromThrowable(t).getCode(), t.getMessage());
+ Pair<String, String> instanceInfo =
_reversedObserverMap.get(responseObserver);
+ onClientClose(instanceInfo.getRight(), instanceInfo.getLeft());
}
@Override
public void onCompleted() {
logger.info("Receive on complete message");
- onClientClose(responseObserver);
+ Pair<String, String> instanceInfo =
_reversedObserverMap.get(responseObserver);
+ onClientClose(instanceInfo.getRight(), instanceInfo.getLeft());
}
};
}
@@ -108,13 +112,11 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
* @param message the message to convert to the transition message
*/
@Override
- public void sendStateTransitionMessage(String instanceName, String
currentState,
- Message message) {
+ public void sendStateTransitionMessage(String instanceName, String
currentState, Message message) {
StreamObserver<TransitionMessage> observer;
observer = _observerMap.get(instanceName);
if (observer != null) {
- observer.onNext(
-
StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message));
+
observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message));
}
}
@@ -125,7 +127,7 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
*/
@Override
public void closeConnectionWithError(String instanceName, String
errorReason) {
- logger.info("Close connection for instance: {} with error reason: {}",
instanceName, errorReason);
+ logger.info("Close connection for instance: {} with error reason: {}",
instanceName, errorReason);
closeConnectionHelper(instanceName, errorReason, true);
}
@@ -139,7 +141,6 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
closeConnectionHelper(instanceName, null, false);
}
-
private void closeConnectionHelper(String instanceName, String errorReason,
boolean withError) {
StreamObserver<TransitionMessage> observer;
observer = _observerMap.get(instanceName);
@@ -152,34 +153,28 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
}
}
- private void updateObserver(String instanceName, String clusterName,
- StreamObserver<TransitionMessage> streamObserver) {
- _lockRegistry.withLock(instanceName, () -> {
- _observerMap.put(instanceName, streamObserver);
- _reversedObserverMap.put(streamObserver, new
ImmutablePair<>(instanceName, clusterName));
- });
- }
-
- private void onClientClose(
-
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
responseObserver) {
- String instanceName;
- String clusterName;
- Pair<String, String> instanceInfo =
_reversedObserverMap.get(responseObserver);
- clusterName = instanceInfo.getRight();
- instanceName = instanceInfo.getLeft();
- logger.info("Client close connection for instance: {}", instanceName);
-
+ @Override
+ public void onClientClose(String clusterName, String instanceName) {
if (instanceName == null || clusterName == null) {
// TODO: log error;
return;
}
+ logger.info("Client close connection for instance: {}", instanceName);
GatewayServiceEvent event =
StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName,
instanceName);
- _manager.newGatewayServiceEvent(event);
+ onClientEvent(_manager, event);
_lockRegistry.withLock(instanceName, () -> {
- _reversedObserverMap.remove(responseObserver);
+ _reversedObserverMap.remove(_observerMap.get(instanceName));
_observerMap.remove(instanceName);
_lockRegistry.removeLock(instanceName);
});
}
+
+ private void updateObserver(String instanceName, String clusterName,
+ StreamObserver<TransitionMessage> streamObserver) {
+ _lockRegistry.withLock(instanceName, () -> {
+ _observerMap.put(instanceName, streamObserver);
+ _reversedObserverMap.put(streamObserver, new
ImmutablePair<>(instanceName, clusterName));
+ });
+ }
}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
index 200ac8f04..0c81ac354 100644
---
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -44,6 +44,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
+
public class TestHelixGatewayParticipant extends ZkTestBase {
private static final String CLUSTER_NAME =
TestHelixGatewayParticipant.class.getSimpleName();
private static final int START_NUM_NODE = 3;
@@ -97,11 +98,10 @@ public class TestHelixGatewayParticipant extends ZkTestBase
{
*/
private HelixGatewayParticipant addParticipant(String participantName,
Map<String, Map<String, String>> initialShardMap) {
- HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder(
- new MockHelixGatewayServiceProcessor(_pendingMessageMap),
participantName, CLUSTER_NAME,
- ZK_ADDR,
_onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition(
- TEST_STATE_MODEL)
- .setInitialShardState(initialShardMap).build();
+ HelixGatewayParticipant participant =
+ new HelixGatewayParticipant.Builder(new
MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName,
+ CLUSTER_NAME, ZK_ADDR,
_onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition(
+ TEST_STATE_MODEL).setInitialShardState(initialShardMap).build();
_participants.add(participant);
return participant;
}
@@ -126,9 +126,9 @@ public class TestHelixGatewayParticipant extends ZkTestBase
{
* Add a participant to the IdealState's preference list.
*/
private void addToPreferenceList(HelixGatewayParticipant participant) {
- IdealState idealState =
-
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
TEST_DB);
- idealState.getPreferenceLists().values()
+ IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
TEST_DB);
+ idealState.getPreferenceLists()
+ .values()
.forEach(preferenceList ->
preferenceList.add(participant.getInstanceName()));
idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas())
+ 1));
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
TEST_DB, idealState);
@@ -138,9 +138,9 @@ public class TestHelixGatewayParticipant extends ZkTestBase
{
* Remove a participant from the IdealState's preference list.
*/
private void removeFromPreferenceList(HelixGatewayParticipant participant) {
- IdealState idealState =
-
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
TEST_DB);
- idealState.getPreferenceLists().values()
+ IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
TEST_DB);
+ idealState.getPreferenceLists()
+ .values()
.forEach(preferenceList ->
preferenceList.remove(participant.getInstanceName()));
idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas())
- 1));
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
TEST_DB, idealState);
@@ -151,13 +151,13 @@ public class TestHelixGatewayParticipant extends
ZkTestBase {
*/
private void createDB() {
createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB,
- _participants.stream().map(HelixGatewayParticipant::getInstanceName)
- .collect(Collectors.toList()), TEST_STATE_MODEL, 1,
_participants.size());
+
_participants.stream().map(HelixGatewayParticipant::getInstanceName).collect(Collectors.toList()),
+ TEST_STATE_MODEL, 1, _participants.size());
_clusterVerifier = new
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setResources(new HashSet<>(
-
_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)))
-
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+ .setResources(new
HashSet<>(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)))
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+ .build();
}
/**
@@ -178,10 +178,10 @@ public class TestHelixGatewayParticipant extends
ZkTestBase {
/**
* Get the current state of a Helix shard.
*/
- private String getHelixCurrentState(String instanceName, String resourceName,
- String shardId) {
+ private String getHelixCurrentState(String instanceName, String
resourceName, String shardId) {
return _gSetupTool.getClusterManagementTool()
- .getResourceExternalView(CLUSTER_NAME,
resourceName).getStateMap(shardId)
+ .getResourceExternalView(CLUSTER_NAME, resourceName)
+ .getStateMap(shardId)
.getOrDefault(instanceName, HelixGatewayParticipant.UNASSIGNED_STATE);
}
@@ -189,8 +189,8 @@ public class TestHelixGatewayParticipant extends ZkTestBase
{
* Verify that all specified participants have pending messages.
*/
private void verifyPendingMessages(List<HelixGatewayParticipant>
participants) throws Exception {
- Assert.assertTrue(TestHelper.verify(() -> participants.stream()
- .allMatch(participant ->
getPendingMessage(participant.getInstanceName()) != null),
+ Assert.assertTrue(TestHelper.verify(
+ () -> participants.stream().allMatch(participant ->
getPendingMessage(participant.getInstanceName()) != null),
TestHelper.WAIT_DURATION));
}
@@ -200,12 +200,11 @@ public class TestHelixGatewayParticipant extends
ZkTestBase {
private void verifyGatewayStateMatchesHelixState() throws Exception {
Assert.assertTrue(TestHelper.verify(() ->
_participants.stream().allMatch(participant -> {
String instanceName = participant.getInstanceName();
- for (String resourceName : _gSetupTool.getClusterManagementTool()
- .getResourcesInCluster(CLUSTER_NAME)) {
+ for (String resourceName :
_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)) {
for (String shardId : _gSetupTool.getClusterManagementTool()
- .getResourceIdealState(CLUSTER_NAME,
resourceName).getPartitionSet()) {
- String helixCurrentState =
- getHelixCurrentState(instanceName, resourceName, shardId);
+ .getResourceIdealState(CLUSTER_NAME, resourceName)
+ .getPartitionSet()) {
+ String helixCurrentState = getHelixCurrentState(instanceName,
resourceName, shardId);
if (!participant.getCurrentState(resourceName,
shardId).equals(helixCurrentState)) {
return false;
}
@@ -220,10 +219,10 @@ public class TestHelixGatewayParticipant extends
ZkTestBase {
*/
private void verifyHelixPartitionStates(String instanceName, String state)
throws Exception {
Assert.assertTrue(TestHelper.verify(() -> {
- for (String resourceName : _gSetupTool.getClusterManagementTool()
- .getResourcesInCluster(CLUSTER_NAME)) {
+ for (String resourceName :
_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)) {
for (String shardId : _gSetupTool.getClusterManagementTool()
- .getResourceIdealState(CLUSTER_NAME,
resourceName).getPartitionSet()) {
+ .getResourceIdealState(CLUSTER_NAME, resourceName)
+ .getPartitionSet()) {
if (!getHelixCurrentState(instanceName, resourceName,
shardId).equals(state)) {
return false;
}
@@ -287,8 +286,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase
{
deleteParticipant(participant);
// Verify the Helix state transitions to "UNASSIGNED_STATE" for the
participant
- verifyHelixPartitionStates(participant.getInstanceName(),
- HelixGatewayParticipant.UNASSIGNED_STATE);
+ verifyHelixPartitionStates(participant.getInstanceName(),
HelixGatewayParticipant.UNASSIGNED_STATE);
// Re-add the participant with its initial state
addParticipant(participant.getInstanceName(),
participant.getShardStateMap());
@@ -303,8 +301,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase
{
// Remove the first participant and verify state
HelixGatewayParticipant participant = _participants.get(0);
deleteParticipant(participant);
- verifyHelixPartitionStates(participant.getInstanceName(),
- HelixGatewayParticipant.UNASSIGNED_STATE);
+ verifyHelixPartitionStates(participant.getInstanceName(),
HelixGatewayParticipant.UNASSIGNED_STATE);
// Remove shard preference and re-add the participant
removeFromPreferenceList(participant);
@@ -327,8 +324,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase
{
HelixGatewayParticipant participant = _participants.get(0);
deleteParticipant(participant);
-
Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(),
- gracefulDisconnectCount + 1);
+
Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(),
gracefulDisconnectCount + 1);
}
@Test(dependsOnMethods = "testGatewayParticipantDisconnectGracefully")
@@ -343,8 +339,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase
{
Assert.assertEquals(MockHelixGatewayServiceProcessor._errorDisconnectCount.get(),
errorDisconnectCount + _participants.size());
- Assert.assertEquals(_onDisconnectCallbackCount.get(),
- onDisconnectCallbackCount + _participants.size());
+ Assert.assertEquals(_onDisconnectCallbackCount.get(),
onDisconnectCallbackCount + _participants.size());
}
public static class MockHelixGatewayServiceProcessor implements
HelixGatewayServiceProcessor {
@@ -357,8 +352,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase
{
}
@Override
- public void sendStateTransitionMessage(String instanceName, String
currentState,
- Message message) {
+ public void sendStateTransitionMessage(String instanceName, String
currentState, Message message) {
_pendingMessageMap.put(instanceName, message);
}
@@ -371,5 +365,10 @@ public class TestHelixGatewayParticipant extends
ZkTestBase {
public void completeConnection(String instanceName) {
_gracefulDisconnectCount.incrementAndGet();
}
+
+ @Override
+ public void onClientClose(String clusterName, String instanceName) {
+
+ }
}
}