This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0efb309e8fec43299ac44f04f108d08bdff6b5f3 Author: xyuanlu <xyua...@gmail.com> AuthorDate: Wed Jul 31 16:53:52 2024 -0700 API to close grpc client stream connection from server side (#2856) API to close grpc client stream connection from server side --- .../org/apache/helix/gateway/HelixGatewayMain.java | 21 ++- .../api/service/HelixGatewayServiceProcessor.java | 13 ++ .../constant/GatewayServiceGrpcDefaultConfig.java | 7 + .../HelixGatewayServiceGrpcService.java | 43 ++++++ .../gateway/service/GatewayServiceManager.java | 12 +- .../util/HelixGatewayGrpcServerBuilder.java | 99 ++++++++++++++ .../java/org/apache/helix/gateway/DummyTest.java | 21 --- .../helix/gateway/TestPerKeyBlockingExecutor.java | 2 +- .../helix/gateway/TestPerKeyLockRegistry.java | 2 +- .../helix/gateway/base/HelixGatewayTestBase.java | 1 - .../helix/gateway/base/ZookeeperTestBase.java | 3 +- .../helix/gateway/base/manager/ClusterManager.java | 4 - .../base/manager/MockParticipantManager.java | 1 - .../apache/helix/gateway/base/util/TestHelper.java | 148 ++++++++++---------- .../participant/TestHelixGatewayParticipant.java | 10 ++ .../service/TestGatewayServiceConnection.java | 150 +++++++++++++++++++++ .../gateway/service/TestGatewayServiceManager.java | 2 +- 17 files changed, 424 insertions(+), 115 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java index a9245f157..bcf502e90 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java @@ -19,6 +19,14 @@ package org.apache.helix.gateway; * under the License. */ +import io.grpc.Server; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder; + + /** * Main class for Helix Gateway. * It starts the Helix Gateway grpc service. @@ -28,7 +36,18 @@ public final class HelixGatewayMain { private HelixGatewayMain() { } - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException, IOException { + // Create a new server to listen on port 50051 + GatewayServiceManager manager = new GatewayServiceManager(); + Server server = new HelixGatewayGrpcServerBuilder().setPort(50051) + .setGrpcService((HelixGatewayServiceGrpcService)manager.getHelixGatewayServiceProcessor()) + .build(); + server.start(); + System.out.println("Server started, listening on " + server.getPort()); + + // Wait for the server to shutdown + server.awaitTermination(365, TimeUnit.DAYS); } } + diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java index fe57e69c9..c06443802 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java @@ -37,4 +37,17 @@ public interface HelixGatewayServiceProcessor { void sendStateTransitionMessage(String instanceName, String currentState, Message message); + /** + * Close connection with error. + * @param instanceName instance name + * @param reason reason for closing connection + */ + public void closeConnectionWithError(String instanceName, String reason); + + /** + * Close connection with success. + * @param instanceName instance name + */ + public void completeConnection(String instanceName); + } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java new file mode 100644 index 000000000..3f1bbafbc --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceGrpcDefaultConfig.java @@ -0,0 +1,7 @@ +package org.apache.helix.gateway.constant; + +public class GatewayServiceGrpcDefaultConfig { + public static final int DEFAULT_SERVER_HEARTBEAT_INTERVAL = 60; + public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60; + public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60; +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java index 018d6591e..344a2649d 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java @@ -19,6 +19,7 @@ package org.apache.helix.gateway.grpcservice; * under the License. */ +import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.HashMap; import java.util.Map; @@ -30,6 +31,8 @@ import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; import org.apache.helix.gateway.util.PerKeyLockRegistry; import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; import org.apache.helix.model.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage; @@ -41,6 +44,8 @@ import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMe */ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase implements HelixGatewayServiceProcessor { + // create LOGGER + private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class); // Map to store the observer for each instance private final Map<String, StreamObserver<TransitionMessage>> _observerMap = new HashMap<>(); @@ -72,6 +77,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli @Override public void onNext(ShardStateMessage request) { + logger.info("Receive message from instance: {}", request.toString()); if (request.hasShardState()) { ShardState shardState = request.getShardState(); updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver); @@ -81,11 +87,13 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli @Override public void onError(Throwable t) { + logger.info("Receive on error message: {}", t.getMessage()); onClientClose(responseObserver); } @Override public void onCompleted() { + logger.info("Receive on complete message"); onClientClose(responseObserver); } }; @@ -110,6 +118,40 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli } } + /** + * Close the connection of the instance. If closed because of error, use the error reason to close the connection. + * @param instanceName instance name + * @param errorReason error reason for close + */ + @Override + public void closeConnectionWithError(String instanceName, String errorReason) { + logger.info("Close connection for instance: {} with error reason: {}", instanceName, errorReason); + closeConnectionHelper(instanceName, errorReason, true); + } + + /** + * Complete the connection of the instance. + * @param instanceName instance name + */ + @Override + public void completeConnection(String instanceName) { + logger.info("Complete connection for instance: {}", instanceName); + closeConnectionHelper(instanceName, null, false); + } + + + private void closeConnectionHelper(String instanceName, String errorReason, boolean withError) { + StreamObserver<TransitionMessage> observer; + observer = _observerMap.get(instanceName); + if (observer != null) { + if (withError) { + observer.onError(Status.UNAVAILABLE.withDescription(errorReason).asRuntimeException()); + } else { + observer.onCompleted(); + } + } + } + private void updateObserver(String instanceName, String clusterName, StreamObserver<TransitionMessage> streamObserver) { _lockRegistry.withLock(instanceName, () -> { @@ -125,6 +167,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver); clusterName = instanceInfo.getRight(); instanceName = instanceInfo.getLeft(); + logger.info("Client close connection for instance: {}", instanceName); if (instanceName == null || clusterName == null) { // TODO: log error; diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 85a274156..4553c04ca 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -75,7 +75,7 @@ public class GatewayServiceManager { if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) { _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event)); } else { - _connectionEventProcessor.offerEvent(event.getInstanceName(), new participantConnectionProcessor(event)); + _connectionEventProcessor.offerEvent(event.getInstanceName(), new ParticipantConnectionProcessor(event)); } } @@ -109,10 +109,10 @@ public class GatewayServiceManager { * Create HelixGatewayService instance and register it to the manager. * It includes waiting for ZK connection, and also wait for previous LiveInstance to expire. */ - class participantConnectionProcessor implements Runnable { - private final GatewayServiceEvent _event; + class ParticipantConnectionProcessor implements Runnable { + GatewayServiceEvent _event; - private participantConnectionProcessor(GatewayServiceEvent event) { + public ParticipantConnectionProcessor(GatewayServiceEvent event) { _event = event; } @@ -127,6 +127,10 @@ public class GatewayServiceManager { } } + public HelixGatewayServiceProcessor getHelixGatewayServiceProcessor() { + return _gatewayServiceProcessor; + } + private void createHelixGatewayParticipant(String clusterName, String instanceName, Map<String, Map<String, String>> initialShardStateMap) { // Create and add the participant to the participant map diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java new file mode 100644 index 000000000..afa1477ce --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/HelixGatewayGrpcServerBuilder.java @@ -0,0 +1,99 @@ +package org.apache.helix.gateway.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.protobuf.services.ProtoReflectionService; +import java.util.concurrent.TimeUnit; + +import static org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*; + + +/** + * Builder class to create a Helix gateway service server with custom configurations. + */ + public class HelixGatewayGrpcServerBuilder { + private int port; + private BindableService service; + private int serverHeartBeatInterval = DEFAULT_SERVER_HEARTBEAT_INTERVAL; + private int maxAllowedClientHeartBeatInterval = DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL; + private int clientTimeout = DEFAULT_CLIENT_TIMEOUT; + private boolean enableReflectionService = true; + + public HelixGatewayGrpcServerBuilder setPort(int port) { + this.port = port; + return this; + } + + public HelixGatewayGrpcServerBuilder setServerHeartBeatInterval(int serverHeartBeatInterval) { + this.serverHeartBeatInterval = serverHeartBeatInterval; + return this; + } + + public HelixGatewayGrpcServerBuilder setMaxAllowedClientHeartBeatInterval(int maxAllowedClientHeartBeatInterval) { + this.maxAllowedClientHeartBeatInterval = maxAllowedClientHeartBeatInterval; + return this; + } + + public HelixGatewayGrpcServerBuilder setClientTimeout(int clientTimeout) { + this.clientTimeout = clientTimeout; + return this; + } + + public HelixGatewayGrpcServerBuilder setGrpcService(BindableService service) { + this.service = service; + return this; + } + + public HelixGatewayGrpcServerBuilder enableReflectionService(boolean enableReflectionService) { + this.enableReflectionService = enableReflectionService; + return this; + } + + public Server build() { + validate(); + + ServerBuilder serverBuilder = ServerBuilder.forPort(port) + .addService(service) + .keepAliveTime(serverHeartBeatInterval, TimeUnit.SECONDS) // HeartBeat time + .keepAliveTimeout(clientTimeout, TimeUnit.SECONDS) // KeepAlive client timeout + .permitKeepAliveTime(maxAllowedClientHeartBeatInterval, TimeUnit.SECONDS) // Permit min HeartBeat time + .permitKeepAliveWithoutCalls(true); // Allow KeepAlive forever without active RPCs + + if (enableReflectionService) { + serverBuilder.addService(ProtoReflectionService.newInstance()); + } + + return serverBuilder + .build(); + } + + private void validate() { + if (port == 0 || service == null) { + throw new IllegalArgumentException("Port and service must be set"); + } + if (clientTimeout < maxAllowedClientHeartBeatInterval) { + throw new IllegalArgumentException("Client timeout is less than max allowed client heartbeat interval"); + } + } + } + diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java b/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java deleted file mode 100644 index 7511cb3a3..000000000 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.apache.helix.gateway; - -import org.apache.helix.gateway.base.HelixGatewayTestBase; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -public class DummyTest extends HelixGatewayTestBase { - - @BeforeClass - public void beforeClass() { - _numParticipants = 5; - super.beforeClass(); - } - - @Test - public void testSetups() { - createResource("TestDB_1", 4, 2); - Assert.assertTrue(_clusterVerifier.verify()); - } -} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java index 457002cbd..c58a5bfa7 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java @@ -3,8 +3,8 @@ package org.apache.helix.gateway; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.helix.gateway.util.PerKeyBlockingExecutor; -import org.testng.annotations.Test; import org.testng.Assert; +import org.testng.annotations.Test; public class TestPerKeyBlockingExecutor { diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java index 89d2e0bc1..3b90dcb10 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java @@ -5,8 +5,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import org.apache.helix.gateway.util.PerKeyLockRegistry; -import org.testng.annotations.Test; import org.testng.Assert; +import org.testng.annotations.Test; public class TestPerKeyLockRegistry { diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java index 06cf45cd1..5ee11e42b 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java @@ -1,7 +1,6 @@ package org.apache.helix.gateway.base; import java.util.List; - import org.apache.helix.ConfigAccessor; import org.apache.helix.gateway.base.manager.ClusterControllerManager; import org.apache.helix.gateway.base.manager.MockParticipantManager; diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java index 8d87bc4f3..ac09eb6ec 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java @@ -1,5 +1,6 @@ package org.apache.helix.gateway.base; +import com.google.common.base.Preconditions; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.reflect.Method; @@ -12,8 +13,6 @@ import java.util.Set; import java.util.logging.Level; import javax.management.MBeanServerConnection; import javax.management.ObjectName; - -import com.google.common.base.Preconditions; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java index 3858513e5..2d0070ed0 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java @@ -19,16 +19,12 @@ package org.apache.helix.gateway.base.manager; * under the License. */ -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; - import org.apache.helix.HelixManagerProperty; import org.apache.helix.InstanceType; -import org.apache.helix.manager.zk.CallbackHandler; import org.apache.helix.manager.zk.HelixManagerStateListener; import org.apache.helix.manager.zk.ZKHelixManager; -import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java index 34047c525..625eb7452 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java @@ -20,7 +20,6 @@ package org.apache.helix.gateway.base.manager; */ import java.util.concurrent.CountDownLatch; - import org.apache.helix.HelixCloudProperty; import org.apache.helix.HelixManagerProperty; import org.apache.helix.HelixPropertyFactory; diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java index 681f39205..a5fbcf39c 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java @@ -19,6 +19,7 @@ package org.apache.helix.gateway.base.util; * under the License. */ +import io.grpc.Server; import java.io.File; import java.io.IOException; import java.lang.reflect.Method; @@ -37,13 +38,15 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import org.apache.commons.io.FileUtils; import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; +import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; @@ -71,10 +74,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; +import static org.apache.helix.gateway.constant.GatewayServiceGrpcDefaultConfig.*; + + public class TestHelper { private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class); public static final long WAIT_DURATION = 60 * 1000L; // 60 seconds public static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1500; + /** * Returns a unused random port. */ @@ -92,22 +99,19 @@ public class TestHelper { return TestHelper.startZkServer(zkAddress, empty, true); } - static public ZkServer startZkServer(final String zkAddress, final String rootNamespace) - throws Exception { + static public ZkServer startZkServer(final String zkAddress, final String rootNamespace) throws Exception { List<String> rootNamespaces = new ArrayList<String>(); rootNamespaces.add(rootNamespace); return TestHelper.startZkServer(zkAddress, rootNamespaces, true); } - static public ZkServer startZkServer(final String zkAddress, final List<String> rootNamespaces) - throws Exception { + static public ZkServer startZkServer(final String zkAddress, final List<String> rootNamespaces) throws Exception { return startZkServer(zkAddress, rootNamespaces, true); } - static public ZkServer startZkServer(final String zkAddress, final List<String> rootNamespaces, - boolean overwrite) throws Exception { - System.out.println( - "Start zookeeper at " + zkAddress + " in thread " + Thread.currentThread().getName()); + static public ZkServer startZkServer(final String zkAddress, final List<String> rootNamespaces, boolean overwrite) + throws Exception { + System.out.println("Start zookeeper at " + zkAddress + " in thread " + Thread.currentThread().getName()); String zkDir = zkAddress.replace(':', '_'); final String logDir = "/tmp/" + zkDir + "/logs"; @@ -146,8 +150,7 @@ public class TestHelper { if (zkServer != null) { zkServer.shutdown(); System.out.println( - "Shut down zookeeper at port " + zkServer.getPort() + " in thread " + Thread - .currentThread().getName()); + "Shut down zookeeper at port " + zkServer.getPort() + " in thread " + Thread.currentThread().getName()); } } @@ -190,8 +193,7 @@ public class TestHelper { // debug // LOG.info(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify (" // + result + ")"); - System.err.println( - verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify " + " (" + result + ")"); + System.err.println(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify " + " (" + result + ")"); LOG.debug("args:" + Arrays.toString(args)); // System.err.println("args:" + Arrays.toString(args)); @@ -218,21 +220,19 @@ public class TestHelper { public static boolean verifyEmptyCurStateAndExtView(String clusterName, String resourceName, Set<String> instanceNames, String zkAddr) { - HelixZkClient zkClient = SharedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr)); + HelixZkClient zkClient = + SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr)); zkClient.setZkSerializer(new ZNRecordSerializer()); try { - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); Builder keyBuilder = accessor.keyBuilder(); for (String instanceName : instanceNames) { List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(instanceName)); for (String sessionId : sessionIds) { - CurrentState curState = - accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName)); + CurrentState curState = accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName)); if (curState != null && curState.getRecord().getMapFields().size() != 0) { return false; @@ -263,20 +263,18 @@ public class TestHelper { return !manager.isConnected(); } - public static void setupCluster(String clusterName, String zkAddr, int startPort, - String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb, - int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception { - TestHelper - .setupCluster(clusterName, zkAddr, startPort, participantNamePrefix, resourceNamePrefix, - resourceNb, partitionNb, nodesNb, replica, stateModelDef, RebalanceMode.SEMI_AUTO, - doRebalance); + public static void setupCluster(String clusterName, String zkAddr, int startPort, String participantNamePrefix, + String resourceNamePrefix, int resourceNb, int partitionNb, int nodesNb, int replica, String stateModelDef, + boolean doRebalance) throws Exception { + TestHelper.setupCluster(clusterName, zkAddr, startPort, participantNamePrefix, resourceNamePrefix, resourceNb, + partitionNb, nodesNb, replica, stateModelDef, RebalanceMode.SEMI_AUTO, doRebalance); } - public static void setupCluster(String clusterName, String zkAddr, int startPort, - String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb, - int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance) { - HelixZkClient zkClient = SharedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr)); + public static void setupCluster(String clusterName, String zkAddr, int startPort, String participantNamePrefix, + String resourceNamePrefix, int resourceNb, int partitionNb, int nodesNb, int replica, String stateModelDef, + RebalanceMode mode, boolean doRebalance) { + HelixZkClient zkClient = + SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr)); try { if (zkClient.exists("/" + clusterName)) { LOG.warn("Cluster already exists:" + clusterName + ". Deleting it"); @@ -293,8 +291,7 @@ public class TestHelper { for (int i = 0; i < resourceNb; i++) { String resourceName = resourceNamePrefix + i; - setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef, - mode.name(), + setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef, mode.name(), mode == RebalanceMode.FULL_AUTO ? CrushEdRebalanceStrategy.class.getName() : RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY); if (doRebalance) { @@ -334,15 +331,13 @@ public class TestHelper { * @param state * : MASTER|SLAVE|ERROR... */ - public static void verifyState(String clusterName, String zkAddr, - Map<String, Set<String>> stateMap, String state) { - HelixZkClient zkClient = SharedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr)); + public static void verifyState(String clusterName, String zkAddr, Map<String, Set<String>> stateMap, String state) { + HelixZkClient zkClient = + SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr)); zkClient.setZkSerializer(new ZNRecordSerializer()); try { - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); Builder keyBuilder = accessor.keyBuilder(); for (String resGroupPartitionKey : stateMap.keySet()) { @@ -354,12 +349,12 @@ public class TestHelper { for (String instance : stateMap.get(resGroupPartitionKey)) { String actualState = extView.getStateMap(partitionKey).get(instance); Assert.assertNotNull(actualState, - "externalView doesn't contain state for " + resGroup + "/" + partitionKey + " on " - + instance + " (expect " + state + ")"); + "externalView doesn't contain state for " + resGroup + "/" + partitionKey + " on " + instance + + " (expect " + state + ")"); Assert.assertEquals(actualState, state, - "externalView for " + resGroup + "/" + partitionKey + " on " + instance + " is " - + actualState + " (expect " + state + ")"); + "externalView for " + resGroup + "/" + partitionKey + " on " + instance + " is " + actualState + + " (expect " + state + ")"); } } } finally { @@ -391,8 +386,8 @@ public class TestHelper { return retMap; } - public static <T> Map<String, T> startThreadsConcurrently(final int nrThreads, - final Callable<T> method, final long timeout) { + public static <T> Map<String, T> startThreadsConcurrently(final int nrThreads, final Callable<T> method, + final long timeout) { final CountDownLatch startLatch = new CountDownLatch(1); final CountDownLatch finishCounter = new CountDownLatch(nrThreads); final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>(); @@ -445,8 +440,8 @@ public class TestHelper { return resultsMap; } - public static Message createMessage(String msgId, String fromState, String toState, - String tgtName, String resourceName, String partitionName) { + public static Message createMessage(String msgId, String fromState, String toState, String tgtName, + String resourceName, String partitionName) { Message msg = new Message(MessageType.STATE_TRANSITION, msgId); msg.setFromState(fromState); msg.setToState(toState); @@ -469,8 +464,7 @@ public class TestHelper { return fullClassName.substring(fullClassName.lastIndexOf('.') + 1); } - public static <T> Map<String, T> startThreadsConcurrently(final List<Callable<T>> methods, - final long timeout) { + public static <T> Map<String, T> startThreadsConcurrently(final List<Callable<T>> methods, final long timeout) { final int nrThreads = methods.size(); final CountDownLatch startLatch = new CountDownLatch(1); final CountDownLatch finishCounter = new CountDownLatch(nrThreads); @@ -536,8 +530,7 @@ public class TestHelper { TreeSet<String> childSet = new TreeSet<String>(); childSet.addAll(node.getChildSet()); System.out.print( - key + "=" + node.getData() + ", " + childSet + ", " + (node.getStat() == null ? "null\n" - : node.getStat())); + key + "=" + node.getData() + ", " + childSet + ", " + (node.getStat() == null ? "null\n" : node.getStat())); } System.out.println("END:Print cache"); } @@ -560,8 +553,7 @@ public class TestHelper { } } - public static void readZkRecursive(String path, Map<String, ZNode> map, - BaseDataAccessor<ZNRecord> zkAccessor) { + public static void readZkRecursive(String path, Map<String, ZNode> map, BaseDataAccessor<ZNRecord> zkAccessor) { try { Stat stat = new Stat(); ZNRecord record = zkAccessor.get(path, stat, 0); @@ -582,8 +574,8 @@ public class TestHelper { } } - public static boolean verifyZkCache(List<String> paths, BaseDataAccessor<ZNRecord> zkAccessor, - HelixZkClient zkclient, boolean needVerifyStat) { + public static boolean verifyZkCache(List<String> paths, BaseDataAccessor<ZNRecord> zkAccessor, HelixZkClient zkclient, + boolean needVerifyStat) { // read everything Map<String, ZNode> zkMap = new HashMap<String, ZNode>(); Map<String, ZNode> cache = new HashMap<String, ZNode>(); @@ -596,13 +588,13 @@ public class TestHelper { return verifyZkCache(paths, null, cache, zkMap, needVerifyStat); } - public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> cache, - HelixZkClient zkclient, boolean needVerifyStat) { + public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> cache, HelixZkClient zkclient, + boolean needVerifyStat) { return verifyZkCache(paths, null, cache, zkclient, needVerifyStat); } - public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat, - Map<String, ZNode> cache, HelixZkClient zkclient, boolean needVerifyStat) { + public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat, Map<String, ZNode> cache, + HelixZkClient zkclient, boolean needVerifyStat) { // read everything on zk under paths Map<String, ZNode> zkMap = new HashMap<String, ZNode>(); for (String path : paths) { @@ -613,12 +605,11 @@ public class TestHelper { return verifyZkCache(paths, pathsExcludeForStat, cache, zkMap, needVerifyStat); } - public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat, - Map<String, ZNode> cache, Map<String, ZNode> zkMap, boolean needVerifyStat) { + public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat, Map<String, ZNode> cache, + Map<String, ZNode> zkMap, boolean needVerifyStat) { // equal size if (zkMap.size() != cache.size()) { - System.err - .println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: " + zkMap.size()); + System.err.println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: " + zkMap.size()); System.out.println("cache: (" + cache.size() + ")"); TestHelper.printCache(cache); @@ -640,23 +631,20 @@ public class TestHelper { } if ((zkNode.getData() == null && cacheNode.getData() != null) || (zkNode.getData() != null - && cacheNode.getData() == null) || (zkNode.getData() != null - && cacheNode.getData() != null && !zkNode.getData().equals(cacheNode.getData()))) { + && cacheNode.getData() == null) || (zkNode.getData() != null && cacheNode.getData() != null + && !zkNode.getData().equals(cacheNode.getData()))) { // data not equal System.err.println( - "data mismatch on path: " + path + ", inCache: " + cacheNode.getData() + ", onZk: " - + zkNode.getData()); + "data mismatch on path: " + path + ", inCache: " + cacheNode.getData() + ", onZk: " + zkNode.getData()); return false; } - if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null) || ( - zkNode.getChildSet() != null && cacheNode.getChildSet() == null) || ( - zkNode.getChildSet() != null && cacheNode.getChildSet() != null && !zkNode.getChildSet() - .equals(cacheNode.getChildSet()))) { + if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null) || (zkNode.getChildSet() != null + && cacheNode.getChildSet() == null) || (zkNode.getChildSet() != null && cacheNode.getChildSet() != null + && !zkNode.getChildSet().equals(cacheNode.getChildSet()))) { // childSet not equal - System.err.println( - "childSet mismatch on path: " + path + ", inCache: " + cacheNode.getChildSet() - + ", onZk: " + zkNode.getChildSet()); + System.err.println("childSet mismatch on path: " + path + ", inCache: " + cacheNode.getChildSet() + ", onZk: " + + zkNode.getChildSet()); return false; } @@ -664,8 +652,7 @@ public class TestHelper { if (cacheNode.getStat() == null || !zkNode.getStat().equals(cacheNode.getStat())) { // stat not equal System.err.println( - "Stat mismatch on path: " + path + ", inCache: " + cacheNode.getStat() + ", onZk: " - + zkNode.getStat()); + "Stat mismatch on path: " + path + ", inCache: " + cacheNode.getStat() + ", onZk: " + zkNode.getStat()); return false; } } @@ -684,8 +671,7 @@ public class TestHelper { statePriorityList.add("IDLE"); statePriorityList.add("DROPPED"); statePriorityList.add("ERROR"); - record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(), - statePriorityList); + record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(), statePriorityList); for (String state : statePriorityList) { String key = state + ".meta"; Map<String, String> metadata = new HashMap<String, String>(); @@ -814,4 +800,10 @@ public class TestHelper { Thread.sleep(50); } while (true); } + + public static Server createHelixGatewayServer(int port, GatewayServiceManager manager) { + return new HelixGatewayGrpcServerBuilder().setPort(port) + .setGrpcService((HelixGatewayServiceGrpcService) manager.getHelixGatewayServiceProcessor()) + .build(); + } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java index fda7fbb1b..dd22b9fa0 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java @@ -329,5 +329,15 @@ public class TestHelixGatewayParticipant extends ZkTestBase { Message message) { _pendingMessageMap.put(instanceName, message); } + + @Override + public void closeConnectionWithError(String instanceName, String reason) { + + } + + @Override + public void completeConnection(String instanceName) { + + } } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java new file mode 100644 index 000000000..dac903b28 --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java @@ -0,0 +1,150 @@ +package org.apache.helix.gateway.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.helix.gateway.base.HelixGatewayTestBase; +import org.apache.helix.gateway.base.util.TestHelper; +import org.apache.helix.gateway.constant.GatewayServiceEventType; +import org.apache.helix.gateway.util.HelixGatewayGrpcServerBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; +import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; + + + +public class TestGatewayServiceConnection extends HelixGatewayTestBase { + CountDownLatch connectLatch = new CountDownLatch(1); + CountDownLatch disconnectLatch = new CountDownLatch(1); + + @Test + public void TestLivenessDetection() throws IOException, InterruptedException { + // start the gateway service + Server server = TestHelper.createHelixGatewayServer(50051, new DummyGatewayServiceManager()); + server.start(); + + // start the client + HelixGatewayClient client = new HelixGatewayClient("localhost", 50051); + client.connect(); + // assert we get connect event + Assert.assertTrue(connectLatch.await(5, TimeUnit.SECONDS)); + + // assert we have disconnect event + client.shutdownGracefully(); + Assert.assertTrue(disconnectLatch.await(5, TimeUnit.SECONDS)); + + connectLatch = new CountDownLatch(1); + disconnectLatch = new CountDownLatch(1); + + // start the client + HelixGatewayClient client2 = new HelixGatewayClient("localhost", 50051); + client2.connect(); + // assert we get connect event + Assert.assertTrue(connectLatch.await(5, TimeUnit.SECONDS)); + + // assert we have disconnect event when shutting down ungracefully + client2.shutdownByClosingConnection(); + Assert.assertTrue(disconnectLatch.await(5, TimeUnit.SECONDS)); + } + + public class HelixGatewayClient { + + private final ManagedChannel channel; + private final HelixGatewayServiceGrpc.HelixGatewayServiceStub asyncStub; + + private StreamObserver _requestObserver; + + public HelixGatewayClient(String host, int port) { + this.channel = ManagedChannelBuilder.forAddress(host, port) + .usePlaintext() + .keepAliveTime(30, TimeUnit.SECONDS) // KeepAlive time + .keepAliveTimeout(3, TimeUnit.MINUTES) // KeepAlive timeout + .keepAliveWithoutCalls(true) // Allow KeepAlive without active RPCs + .build(); + this.asyncStub = HelixGatewayServiceGrpc.newStub(channel); + } + + public void connect() { + _requestObserver = asyncStub.report(new StreamObserver<HelixGatewayServiceOuterClass.TransitionMessage>() { + @Override + public void onNext(HelixGatewayServiceOuterClass.TransitionMessage value) { + // Handle response from server + } + + @Override + public void onError(Throwable t) { + // Handle error + } + + @Override + public void onCompleted() { + // Handle stream completion + } + }); + + // Send initial ShardStateMessage + HelixGatewayServiceOuterClass.ShardStateMessage initialMessage = + HelixGatewayServiceOuterClass.ShardStateMessage.newBuilder() + .setShardState(HelixGatewayServiceOuterClass.ShardState.newBuilder() + .setInstanceName("instance1") + .setClusterName("TEST_CLUSTER") + .build()) + .build(); + _requestObserver.onNext(initialMessage); + + // Add more logic to send additional messages if needed + } + + public void shutdownGracefully() { + // graceful shutdown + _requestObserver.onCompleted(); + channel.shutdown().shutdownNow(); + } + + public void shutdownByClosingConnection() { + //ungraceful shutdown + channel.shutdown().shutdownNow(); + } + } + + class DummyGatewayServiceManager extends GatewayServiceManager { + + public DummyGatewayServiceManager() { + super(); + } + + @Override + public void newGatewayServiceEvent(GatewayServiceEvent event) { + if (event.getEventType().equals(GatewayServiceEventType.CONNECT)) { + connectLatch.countDown(); + } else if (event.getEventType().equals(GatewayServiceEventType.DISCONNECT)) { + disconnectLatch.countDown(); + } + System.out.println("Received event: " + event.getEventType()); + } + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java index a345f008e..788bf4472 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java @@ -4,7 +4,7 @@ import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; import org.testng.annotations.Test; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; -import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*;