This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this
push:
new 11b34db83 Expose setting gateway service channel to allow external
managment of the lifecycle of the channel. (#2913)
11b34db83 is described below
commit 11b34db83cefe90f308e176845e74debce21de68
Author: Zachary Pinto <[email protected]>
AuthorDate: Thu Sep 12 14:13:13 2024 -0700
Expose setting gateway service channel to allow external managment of the
lifecycle of the channel. (#2913)
Expose setting gateway service channel to allow external managment of the
lifecycle of the channel.
---
.../channel/HelixGatewayServiceGrpcService.java | 3 +-
.../gateway/service/GatewayServiceManager.java | 45 ++++++++++++++++------
.../apache/helix/gateway/util/PollChannelUtil.java | 1 -
.../service/TestGatewayServiceConnection.java | 10 ++++-
4 files changed, 42 insertions(+), 17 deletions(-)
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
index 9ed06ca23..a9c1ca4a2 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
@@ -60,7 +60,7 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
private final GatewayServiceManager _manager;
// A fine grain lock register on instance level
- private final PerKeyLockRegistry _lockRegistry;
+ private final PerKeyLockRegistry _lockRegistry = new PerKeyLockRegistry();;
private final GatewayServiceChannelConfig _config;
@@ -69,7 +69,6 @@ public class HelixGatewayServiceGrpcService extends
HelixGatewayServiceGrpc.Heli
public HelixGatewayServiceGrpcService(GatewayServiceManager manager,
GatewayServiceChannelConfig config) {
_manager = manager;
_config = config;
- _lockRegistry = new PerKeyLockRegistry();
}
/**
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 221c2aeac..94f5783f0 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -57,27 +57,44 @@ public class GatewayServiceManager {
private final ExecutorService _participantStateTransitionResultUpdator;
// link to grpc service
- private final HelixGatewayServiceChannel _gatewayServiceChannel;
+ private HelixGatewayServiceChannel _gatewayServiceChannel;
// a per key executor for connection event. All event for the same instance
will be executed in sequence.
// It is used to ensure for each instance, the connect/disconnect event
won't start until the previous one is done.
private final PerKeyBlockingExecutor _connectionEventProcessor;
- private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;
-
private final Map<String, GatewayCurrentStateCache> _currentStateCacheMap;
- public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig
gatewayServiceChannelConfig) {
+ public GatewayServiceManager(String zkAddress) {
_helixGatewayParticipantMap = new ConcurrentHashMap<>();
_zkAddress = zkAddress;
_participantStateTransitionResultUpdator =
Executors.newSingleThreadExecutor();
- _gatewayServiceChannel =
HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig,
this);
_connectionEventProcessor =
new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); //
todo: make it configurable
- _gatewayServiceChannelConfig = gatewayServiceChannelConfig;
_currentStateCacheMap = new HashMap<>();
}
+ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig
gatewayServiceChannelConfig) {
+ this(zkAddress);
+ _gatewayServiceChannel =
HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig,
this);
+ }
+
+ /**
+ * Set the gateway service channel. This can only be called once.
+ * The channel is used to send state transition message to the participant.
+ *
+ * @param channel the gateway service channel
+ * @throws IllegalStateException if the channel is already set
+ */
+ public void setGatewayServiceChannel(HelixGatewayServiceChannel channel) {
+ if (_gatewayServiceChannel != null) {
+ _gatewayServiceChannel.stop();
+ return;
+ }
+ throw new IllegalStateException(
+ "Gateway service channel is already set, it can only be set once.");
+ }
+
/**
* Process the event from Grpc service and dispatch to async executor for
processing.
*
@@ -85,7 +102,7 @@ public class GatewayServiceManager {
*/
public void onGatewayServiceEvent(GatewayServiceEvent event) {
if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
- _participantStateTransitionResultUpdator.submit(new
shardStateUpdator(event));
+ _participantStateTransitionResultUpdator.submit(new
ShardStateUpdator(event));
} else {
_connectionEventProcessor.offerEvent(event.getInstanceName(), new
ParticipantConnectionProcessor(event));
}
@@ -117,11 +134,11 @@ public class GatewayServiceManager {
/**
* Update in memory shard state
*/
- class shardStateUpdator implements Runnable {
+ class ShardStateUpdator implements Runnable {
private final GatewayServiceEvent _event;
- private shardStateUpdator(GatewayServiceEvent event) {
+ private ShardStateUpdator(GatewayServiceEvent event) {
_event = event;
}
@@ -162,15 +179,19 @@ public class GatewayServiceManager {
}
}
+ public void stopManager() {
+ _connectionEventProcessor.shutdown();
+ _participantStateTransitionResultUpdator.shutdown();
+ _helixGatewayParticipantMap.clear();
+ }
+
public void startService() throws IOException {
_gatewayServiceChannel.start();
}
public void stopService() {
_gatewayServiceChannel.stop();
- _connectionEventProcessor.shutdown();
- _participantStateTransitionResultUpdator.shutdown();
- _helixGatewayParticipantMap.clear();
+ stopManager();
}
private void createHelixGatewayParticipant(String clusterName, String
instanceName,
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
index 69c9c219d..26de6db0f 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
@@ -30,7 +30,6 @@ import io.grpc.health.v1.HealthGrpc;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.ImmutablePair;
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
index f3221a12c..8253444a8 100644
---
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
@@ -25,9 +25,12 @@ import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
import org.apache.helix.gateway.base.HelixGatewayTestBase;
import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
@@ -40,7 +43,7 @@ public class TestGatewayServiceConnection extends
HelixGatewayTestBase {
CountDownLatch disconnectLatch = new CountDownLatch(1);
@Test
- public void TestLivenessDetection() throws IOException, InterruptedException
{
+ public void testLivenessDetection() throws IOException, InterruptedException
{
// start the gateway service
GatewayServiceChannelConfig config =
new
GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051).build();
@@ -136,7 +139,10 @@ public class TestGatewayServiceConnection extends
HelixGatewayTestBase {
class DummyGatewayServiceManager extends GatewayServiceManager {
public DummyGatewayServiceManager(GatewayServiceChannelConfig
gatewayServiceChannelConfig) {
- super("dummyZkAddress", gatewayServiceChannelConfig);
+ super("dummyZkAddress");
+ this.setGatewayServiceChannel(
+
HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig,
+ this));
}
@Override