This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit 957901c4c0b504c5855776bc46b952bfc72aea55 Author: xyuanlu <xyua...@gmail.com> AuthorDate: Sun Sep 8 20:05:15 2024 -0700 Gateway - Implementing poll-mode channel (#2900) This pull request introduces significant enhancements to the Helix Gateway service, primarily focusing on implementing a poll-mode channel for communication. The main changes include: Addition of a new HelixGatewayServicePollModeChannel class that implements the poll-mode communication channel. Updates to GatewayServiceChannelConfig to support new configuration options for poll-mode. --- .github/workflows/Helix-PR-CI.yml | 2 +- helix-gateway/pom.xml | 2 +- .../api/constant/GatewayServiceConfigConstant.java | 1 + .../api/service/HelixGatewayServiceChannel.java | 4 +- .../channel/GatewayServiceChannelConfig.java | 139 +++++++++++++--- .../channel/HelixGatewayServiceChannelFactory.java | 2 +- .../channel/HelixGatewayServiceGrpcService.java | 7 +- .../HelixGatewayServicePollModeChannel.java | 181 ++++++++++++++++++++- .../participant/HelixGatewayParticipant.java | 4 +- .../gateway/service/GatewayServiceManager.java | 28 +++- .../gateway/util/GatewayCurrentStateCache.java | 14 +- .../apache/helix/gateway/util/PollChannelUtil.java | 131 +++++++++++++++ .../util/StateTransitionMessageTranslateUtil.java | 37 +++++ .../TestGatewayServiceChannelConfig.java} | 23 ++- .../TestHelixGatewayServicePollModeChannel.java | 90 ++++++++++ .../participant/TestHelixGatewayParticipant.java | 4 +- .../TestGatewayCurrentStateCache.java | 5 +- .../TestStateTransitionMessageTranslateUtil.java | 5 +- 18 files changed, 624 insertions(+), 55 deletions(-) diff --git a/.github/workflows/Helix-PR-CI.yml b/.github/workflows/Helix-PR-CI.yml index d86c8d8af..8ed1fb9ad 100644 --- a/.github/workflows/Helix-PR-CI.yml +++ b/.github/workflows/Helix-PR-CI.yml @@ -1,7 +1,7 @@ name: Helix PR CI on: pull_request: - branches: [ master, metaclient, ApplicationClusterManager] # TODO: remove side branch + branches: [ master, metaclient, 
ApplicationClusterManager, helix-gateway-service] # TODO: remove side branch paths-ignore: - '.github/**' - 'helix-front/**' diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml index 62e927d11..52c21d599 100644 --- a/helix-gateway/pom.xml +++ b/helix-gateway/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.helix</groupId> <artifactId>helix</artifactId> - <version>1.4.1-SNAPSHOT</version> + <version>1.4.2-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java index 70e4d73e3..be5fa1a11 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/GatewayServiceConfigConstant.java @@ -24,4 +24,5 @@ public class GatewayServiceConfigConstant { public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60; public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60; public static final int DEFAULT_POLL_INTERVAL_SEC = 60; + public static final int DEFAULT_HEALTH_TIMEOUT_SEC = 60; } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java index 48e3e437d..10055834e 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java @@ -73,12 +73,12 @@ public interface HelixGatewayServiceChannel { * @param instanceName instance name * @param reason reason for closing connection */ - public void closeConnectionWithError(String instanceName, String reason); + public void closeConnectionWithError(String clusterName, String 
instanceName, String reason); /** * Gateway service close client connection with success. This function is called when manager wants to close client * connection gracefully, e.g., when gateway service is shutting down. * @param instanceName instance name */ - public void completeConnection(String instanceName); + public void completeConnection(String clusterName, String instanceName); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java index 31c7af05b..acdc405cc 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/GatewayServiceChannelConfig.java @@ -19,8 +19,11 @@ package org.apache.helix.gateway.channel; * under the License. */ +import java.util.Map; +import java.util.Properties; import static org.apache.helix.gateway.api.constant.GatewayServiceConfigConstant.*; +import static org.apache.helix.gateway.channel.GatewayServiceChannelConfig.FileBasedConfigType.*; public class GatewayServiceChannelConfig { @@ -46,11 +49,11 @@ public class GatewayServiceChannelConfig { // service configs // service mode for inbound information. 
- private ChannelMode _channelMode; + private final ChannelMode _channelMode; // channel type for participant liveness detection - private ChannelType _participantConnectionChannelType; + private final ChannelType _participantConnectionChannelType; // channel for sending and receiving shard state transition request and shard state response - private ChannelType _shardStateChannelType; + private final ChannelType _shardStateChannelType; // grpc server configs private final int _grpcServerPort; @@ -61,15 +64,22 @@ public class GatewayServiceChannelConfig { // poll mode config private final int _pollIntervalSec; - // TODO: configs for pull mode grpc client - - // TODO: configs for pull mode with file + private final int _pollStartDelaySec; + private final int _pollHealthCheckTimeoutSec; + private final int _targetFileUpdateIntervalSec; + private final Map<String, Map<String, String>> _participantLivenessEndpointMap; + private final Properties _pollModeConfigs; + + public enum FileBasedConfigType { + PARTICIPANT_CURRENT_STATE_PATH, + SHARD_TARGET_STATE_PATH + } // getters - public ChannelMode getChannelMode() { return _channelMode; } + public ChannelType getParticipantConnectionChannelType() { return _participantConnectionChannelType; } @@ -102,9 +112,31 @@ public class GatewayServiceChannelConfig { return _pollIntervalSec; } - private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode channelMode, ChannelType participantConnectionChannelType, - ChannelType shardStateChannelType, int serverHeartBeatInterval, int maxAllowedClientHeartBeatInterval, - int clientTimeout, boolean enableReflectionService, int pollIntervalSec) { + public Map<String, Map<String, String>> getParticipantLivenessEndpointMap() { + return _participantLivenessEndpointMap; + } + + public int getPollStartDelaySec() { + return _pollStartDelaySec; + } + + public int getPollHealthCheckTimeoutSec() { + return _pollHealthCheckTimeoutSec; + } + + public int getTargetFileUpdateIntervalSec() { + 
return _targetFileUpdateIntervalSec; + } + + public String getPollModeConfig(FileBasedConfigType type) { + return _pollModeConfigs.getProperty(type.toString()); + } + + private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode channelMode, + ChannelType participantConnectionChannelType, ChannelType shardStateChannelType, int serverHeartBeatInterval, + int maxAllowedClientHeartBeatInterval, int clientTimeout, boolean enableReflectionService, int pollIntervalSec, + int pollStartDelaySec, int pollHealthCheckTimeoutSec, int targetFileUpdateIntervalSec, + Properties pollModeConfigs, Map<String, Map<String, String>> participantLivenessEndpointMap) { _grpcServerPort = grpcServerPort; _channelMode = channelMode; _participantConnectionChannelType = participantConnectionChannelType; @@ -114,6 +146,11 @@ public class GatewayServiceChannelConfig { _clientTimeout = clientTimeout; _enableReflectionService = enableReflectionService; _pollIntervalSec = pollIntervalSec; + _pollStartDelaySec = pollStartDelaySec; + _pollHealthCheckTimeoutSec = pollHealthCheckTimeoutSec; + _targetFileUpdateIntervalSec = targetFileUpdateIntervalSec; + _pollModeConfigs = pollModeConfigs; + _participantLivenessEndpointMap = participantLivenessEndpointMap; } public static class GatewayServiceProcessorConfigBuilder { @@ -132,10 +169,12 @@ public class GatewayServiceChannelConfig { // poll mode config private int _pollIntervalSec = DEFAULT_POLL_INTERVAL_SEC; - // poll mode grpc client configs - - // poll mode file configs - + // poll mode config + private Properties _pollModeConfigs; + private int _pollStartDelaySec = DEFAULT_POLL_INTERVAL_SEC; + private int _pollHealthCheckTimeoutSec = DEFAULT_HEALTH_TIMEOUT_SEC; + private int _targetFileUpdateIntervalSec = DEFAULT_POLL_INTERVAL_SEC; + private Map<String, Map<String, String>> _healthCheckEndpointMap; public GatewayServiceProcessorConfigBuilder setChannelMode(ChannelMode channelMode) { _channelMode = channelMode; @@ -183,16 +222,69 @@ public class 
GatewayServiceChannelConfig { return this; } + public GatewayServiceProcessorConfigBuilder addPollModeConfig(FileBasedConfigType type, String value) { + if (_pollModeConfigs == null) { + _pollModeConfigs = new Properties(); + } + _pollModeConfigs.put(type.toString(), value); + return this; + } + + public GatewayServiceProcessorConfigBuilder setPollStartDelaySec(int pollStartDelaySec) { + _pollStartDelaySec = pollStartDelaySec; + return this; + } + + public GatewayServiceProcessorConfigBuilder setPollHealthCheckTimeout(int pollHealthCheckTimeout) { + _pollHealthCheckTimeoutSec = pollHealthCheckTimeout; + return this; + } + + public GatewayServiceProcessorConfigBuilder setTargetFileUpdateIntervalSec(int targetFileUpdateIntervalSec) { + _targetFileUpdateIntervalSec = targetFileUpdateIntervalSec; + return this; + } + + public GatewayServiceProcessorConfigBuilder setHealthCheckEndpointMap(Map<String, Map<String, String>> healthCheckEndpointMap) { + _healthCheckEndpointMap = healthCheckEndpointMap; + return this; + } + public void validate() { - if ((_participantConnectionChannelType == ChannelType.GRPC_SERVER - && _shardStatenChannelType != ChannelType.GRPC_SERVER) || ( - _participantConnectionChannelType != ChannelType.GRPC_SERVER - && _shardStatenChannelType == ChannelType.GRPC_SERVER)) { - throw new IllegalArgumentException( - "In caas of GRPC server, Participant connection channel type and shard state channel type must be the same"); + switch (_participantConnectionChannelType) { + case GRPC_SERVER: + if (_grpcServerPort == 0) { + throw new IllegalArgumentException("Grpc server port must be set for grpc server channel type"); + } + if (_shardStatenChannelType != ChannelType.GRPC_SERVER) { + throw new IllegalArgumentException( + "In case of GRPC server, Participant connection channel type and shard state channel type must be the same"); + } + break; + case FILE: + if (_healthCheckEndpointMap == null || _healthCheckEndpointMap.isEmpty()) { + throw new 
IllegalArgumentException("Health check endpoint map must be set for file channel type"); + } + break; + default: + break; } - if (_participantConnectionChannelType == ChannelType.GRPC_SERVER && _grpcServerPort == 0) { - throw new IllegalArgumentException("Grpc server port must be set for grpc server channel type"); + + switch (_shardStatenChannelType) { + case GRPC_SERVER: + if (_participantConnectionChannelType != ChannelType.GRPC_SERVER) { + throw new IllegalArgumentException( + "In case of GRPC server, Participant connection channel type and shard state channel type must be the same"); + } + break; + case FILE: + if (_pollModeConfigs == null || _pollModeConfigs.getProperty(SHARD_TARGET_STATE_PATH.name()) == null + || _pollModeConfigs.getProperty(SHARD_TARGET_STATE_PATH.name()).isEmpty()) { + throw new IllegalArgumentException("Current state and target state path must be set for file channel type"); + } + break; + default: + break; } } @@ -200,7 +292,8 @@ public class GatewayServiceChannelConfig { validate(); return new GatewayServiceChannelConfig(_grpcServerPort, _channelMode, _participantConnectionChannelType, _shardStatenChannelType, _serverHeartBeatInterval, _maxAllowedClientHeartBeatInterval, _clientTimeout, - _enableReflectionService, _pollIntervalSec); + _enableReflectionService, _pollIntervalSec, _pollStartDelaySec, _pollHealthCheckTimeoutSec, + _targetFileUpdateIntervalSec, _pollModeConfigs, _healthCheckEndpointMap); } } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java index fa665a5c8..796e84488 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java @@ -33,7 +33,7 @@ public class HelixGatewayServiceChannelFactory { return new 
HelixGatewayServiceGrpcService(manager, config); } } else { - return new HelixGatewayServicePollModeChannel(config); + return new HelixGatewayServicePollModeChannel(manager, config); } throw new IllegalArgumentException( "Unsupported channel mode and type combination: " + config.getChannelMode() + " , " diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java index 8e9b0882b..9ed06ca23 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java @@ -49,7 +49,6 @@ import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMe */ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase implements HelixGatewayServiceChannel { - // create LOGGER private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class); // Map to store the observer for each instance @@ -118,7 +117,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli * The instance must already have established a connection to the gateway service. 
* * @param instanceName the instance name to send the message to - * @param message the message to convert to the transition message + * @param requests the state transition request to send */ @Override public void sendStateChangeRequests(String instanceName, ShardChangeRequests requests) { @@ -137,7 +136,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli * @param errorReason error reason for close */ @Override - public void closeConnectionWithError(String instanceName, String errorReason) { + public void closeConnectionWithError(String clusterName, String instanceName, String errorReason) { logger.info("Close connection for instance: {} with error reason: {}", instanceName, errorReason); closeConnectionHelper(instanceName, errorReason, true); } @@ -147,7 +146,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli * @param instanceName instance name */ @Override - public void completeConnection(String instanceName) { + public void completeConnection(String clusterName, String instanceName) { logger.info("Complete connection for instance: {}", instanceName); closeConnectionHelper(instanceName, null, false); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java index 7623b8d2a..3acd9e83b 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java @@ -20,38 +20,207 @@ package org.apache.helix.gateway.channel; */ import java.io.IOException; +import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; +import java.util.HashMap; +import java.util.Map; 
+import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; -// TODO: implement this class +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.gateway.channel.GatewayServiceChannelConfig.FileBasedConfigType.*; +import static org.apache.helix.gateway.util.PollChannelUtil.*; + + +/** + * Helix Gateway Service Poll mode implementation. + * It periodically polls the current state of the participants and the liveness of the participants. + */ public class HelixGatewayServicePollModeChannel implements HelixGatewayServiceChannel { + private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServicePollModeChannel.class); + final GatewayServiceManager _manager; + final GatewayServiceChannelConfig _config; + + // cluster -> file for user to report shards' current states + final String _userCurrentStateFilePath; + // cluster -> file path to store the shards' target states + final String _targetStateFilePath; + final GatewayServiceChannelConfig.ChannelType _participantConnectionStatusChannelType; + final GatewayServiceChannelConfig.ChannelType _shardStateChannelType; + + // cluster -> host -> liveness result + final Map<String, Map<String, Boolean>> _livenessResults; + // cluster -> host -> endpoint for query liveness + // It is the file pass if _participantConnectionStatusChannelType is FILE, grpc endpoint if it is GRPC_CLIENT + final Map<String, Map<String, String>> _livenessCheckEndpointMap; + + ScheduledExecutorService _scheduler; + + public HelixGatewayServicePollModeChannel(GatewayServiceManager manager, GatewayServiceChannelConfig config) { + _manager = manager; + _config = config; + _scheduler = Executors.newSingleThreadScheduledExecutor(); + _participantConnectionStatusChannelType = 
_config.getParticipantConnectionChannelType(); + _shardStateChannelType = _config.getShardStateChannelType(); + _livenessCheckEndpointMap = _config.getParticipantLivenessEndpointMap(); + _userCurrentStateFilePath = _config.getPollModeConfig(PARTICIPANT_CURRENT_STATE_PATH); + _targetStateFilePath = _config.getPollModeConfig(SHARD_TARGET_STATE_PATH); + _livenessResults = new HashMap<>(); + } - public HelixGatewayServicePollModeChannel(GatewayServiceChannelConfig config) { + /** + * Fetch the updates from the participants. + * 1. Get the diff of previous and current shard states, and send the state change event to the gateway manager. + * 2. Compare previous liveness and current liveness, and send the connection event to the gateway manager. + */ + protected void fetchUpdates() { + // 1. get the shard state change + Map<String, Map<String, Map<String, Map<String, String>>>> currentShardStates = + getChangedParticipantsCurrentState(_userCurrentStateFilePath); + + Map<String, Map<String, Map<String, Map<String, String>>>> currentStateDiff = new HashMap<>(); + for (String clusterName : currentShardStates.keySet()) { + Map<String, Map<String, Map<String, String>>> clusterDiffMap = + _manager.updateCacheWithNewCurrentStateAndGetDiff(clusterName, currentShardStates.get(clusterName)); + if (clusterDiffMap == null || clusterDiffMap.isEmpty()) { + continue; + } + for (String instanceName : clusterDiffMap.keySet()) { + // if the instance is previously connected, send state change event + if (_livenessResults.get(clusterName) != null && _livenessResults.get(clusterName).get(instanceName)) { + logger.info("Host {} has state change, sending event to gateway manager", instanceName); + pushClientEventToGatewayManager(_manager, + StateTransitionMessageTranslateUtil.translateCurrentStateChangeToEvent(clusterName, instanceName, + clusterDiffMap.get(instanceName))); + } + } + currentStateDiff.put(clusterName, clusterDiffMap); + } + + // 2. 
fetch host health + for (String clusterName : _livenessCheckEndpointMap.keySet()) { + for (String instanceName : _livenessCheckEndpointMap.get(clusterName).keySet()) { + boolean prevLiveness = + _livenessResults.get(clusterName) != null && _livenessResults.get(clusterName).get(instanceName); + boolean liveness = fetchInstanceLivenessStatus(clusterName, instanceName); + + if (prevLiveness && !liveness) { // previously connected, now disconnected + logger.warn("Host {} is not healthy, sending event to gateway manager", instanceName); + pushClientEventToGatewayManager(_manager, + StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, instanceName)); + } else if (!prevLiveness && liveness) { // new connection. + logger.info("Host {} is newly connected, sending init connection event to gateway manager", instanceName); + pushClientEventToGatewayManager(_manager, + StateTransitionMessageTranslateUtil.translateCurrentStateDiffToInitConnectEvent(clusterName, instanceName, + currentStateDiff.containsKey(clusterName) ? currentStateDiff.get(clusterName).get(instanceName) + : new HashMap<>())); + } + _livenessResults.computeIfAbsent(clusterName, k -> new HashMap<>()).put(instanceName, liveness); + } + } } @Override public void sendStateChangeRequests(String instanceName, HelixGatewayServiceOuterClass.ShardChangeRequests shardChangeRequests) { + switch (_shardStateChannelType) { + case FILE: + // we are periodically writing to the file, so no need to write here. 
+ break; + default: + throw new NotImplementedException("Only support file based channel for now"); + } } @Override public void start() throws IOException { + logger.info("Starting Helix Gateway Service Poll Mode Channel..."); + final Runnable fetchUpdatesTask = new Runnable() { + @Override + public void run() { + fetchUpdates(); + } + }; + _scheduler.scheduleAtFixedRate(fetchUpdatesTask, _config.getPollStartDelaySec(), // init delay + _config.getPollIntervalSec(), // poll interval + TimeUnit.SECONDS); + scheduleTargetStateUpdateTask(); + } + void scheduleTargetStateUpdateTask() { + if (_shardStateChannelType == GatewayServiceChannelConfig.ChannelType.FILE) { + final Runnable writeTargetStateTask = new Runnable() { + @Override + public void run() { + flushAssignmentToFile(_manager.serializeTargetState(), _targetStateFilePath); + } + }; + _scheduler.scheduleAtFixedRate(writeTargetStateTask, _config.getPollStartDelaySec(), // init delay + _config.getTargetFileUpdateIntervalSec(), // poll interval + TimeUnit.SECONDS); + } } @Override public void stop() { - + logger.info("Stopping Helix Gateway Service Poll Mode Channel..."); + // Shutdown the scheduler gracefully when done (e.g., on app termination) + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + _scheduler.shutdown(); + try { + if (!_scheduler.awaitTermination(1, TimeUnit.MINUTES)) { + _scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + _scheduler.shutdownNow(); + } + })); } @Override - public void closeConnectionWithError(String instanceName, String reason) { - + public void closeConnectionWithError(String clusterName, String instanceName, String reason) { + // nothing needed for filed based poll mode } @Override - public void completeConnection(String instanceName) { + public void completeConnection(String clusterName, String instanceName) { + // nothing needed for filed based poll mode + } + + /** + * Get current state of the participants. 
+ * Now we only support file based, we will add GRPC based in the future. + */ + protected Map<String, Map<String, Map<String, Map<String, String>>>> getChangedParticipantsCurrentState( + String userCurrentStateFilePath) { + Map<String, Map<String, Map<String, Map<String, String>>>> currentShardStates; + switch (_shardStateChannelType) { + case FILE: + currentShardStates = readCurrentStateFromFile(userCurrentStateFilePath); + return currentShardStates; + default: + throw new NotImplementedException("Only support file based channel shard state for now"); + } + } + /** + * Fetch the liveness status of the instance. + * Now we only support file based, we will add GRPC based in the future. + */ + protected boolean fetchInstanceLivenessStatus(String clusterName, String instanceName) { + String endpoint = _livenessCheckEndpointMap.get(clusterName).get(instanceName); + switch (_participantConnectionStatusChannelType) { + case FILE: + return readInstanceLivenessStatusFromFile(endpoint, _config.getPollHealthCheckTimeoutSec()); + default: + throw new NotImplementedException("Only support grpc based channel for now"); + } } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java index 15e080bb8..b17d897e1 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java @@ -184,7 +184,7 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { @Override public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception { _onDisconnectedCallback.run(); - _gatewayServiceChannel.closeConnectionWithError(_helixManager.getInstanceName(), + _gatewayServiceChannel.closeConnectionWithError(_helixManager.getClusterName(), _helixManager.getInstanceName(), 
error.getMessage()); } @@ -192,7 +192,7 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { if (_helixManager.isConnected()) { _helixManager.disconnect(); } - _gatewayServiceChannel.completeConnection(_helixManager.getInstanceName()); + _gatewayServiceChannel.completeConnection(_helixManager.getClusterName(), _helixManager.getInstanceName()); } public static class Builder { diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 3edd4ee9d..221c2aeac 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -19,6 +19,8 @@ package org.apache.helix.gateway.service; * under the License. */ +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.Collections; @@ -27,7 +29,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.apache.helix.common.caches.CurrentStateCache; import org.apache.helix.gateway.api.constant.GatewayServiceEventType; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; @@ -45,6 +46,7 @@ import org.apache.helix.gateway.util.PerKeyBlockingExecutor; * 4. 
For ST reply message, update the tracker */ public class GatewayServiceManager { + private static final ObjectMapper objectMapper = new ObjectMapper(); public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10; public static final ImmutableSet<String> SUPPORTED_MULTI_STATE_MODEL_TYPES = ImmutableSet.of("OnlineOffline"); @@ -89,6 +91,29 @@ public class GatewayServiceManager { } } + private GatewayCurrentStateCache getCache(String clusterName) { + return _currentStateCacheMap.computeIfAbsent(clusterName, k -> new GatewayCurrentStateCache(clusterName)); + } + + public void resetTargetStateCache(String clusterName, String instanceName) { + getCache(clusterName).resetTargetStateCache(instanceName); + } + + public Map<String, Map<String, Map<String, String>>> updateCacheWithNewCurrentStateAndGetDiff(String clusterName, + Map<String, Map<String, Map<String, String>>> newCurrentStateMap) { + return getCache(clusterName).updateCacheWithNewCurrentStateAndGetDiff(newCurrentStateMap); + } + + public String serializeTargetState() { + ObjectNode targetStateNode = new ObjectMapper().createObjectNode(); + for (String clusterName : _currentStateCacheMap.keySet()) { + // add the json node to the target state node + targetStateNode.set(clusterName, getCache(clusterName).serializeTargetAssignmentsToJSONNode()); + } + targetStateNode.set("timestamp", objectMapper.valueToTree(System.currentTimeMillis())); + return targetStateNode.toString(); + } + /** * Update in memory shard state */ @@ -150,6 +175,7 @@ public class GatewayServiceManager { private void createHelixGatewayParticipant(String clusterName, String instanceName, Map<String, Map<String, String>> initialShardStateMap) { + resetTargetStateCache(clusterName, instanceName); // Create and add the participant to the participant map HelixGatewayParticipant.Builder participantBuilder = new HelixGatewayParticipant.Builder(_gatewayServiceChannel, instanceName, clusterName, diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java index 503909d4a..a3ba7fbe7 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -64,8 +64,12 @@ public class GatewayCurrentStateCache { Map<String, Map<String, Map<String, String>>> diff = new HashMap<>(); for (String instance : newCurrentStateMap.keySet()) { Map<String, Map<String, String>> newCurrentState = newCurrentStateMap.get(instance); - diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) - .updateAndGetDiff(newCurrentState)); + Map<String, Map<String, String>> resourceStateDiff = + _currentStateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())) + .updateAndGetDiff(newCurrentState); + if (resourceStateDiff != null && !resourceStateDiff.isEmpty()) { + diff.put(instance, resourceStateDiff); + } } return diff; } @@ -95,7 +99,7 @@ public class GatewayCurrentStateCache { * Serialize the target state assignments to a JSON Node. 
* example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}} */ - public ObjectNode serializeTargetAssignmentsToJSON() { + public ObjectNode serializeTargetAssignmentsToJSONNode() { ObjectNode root = mapper.createObjectNode(); for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) { root.set(entry.getKey(), entry.getValue().toJSONNode()); @@ -111,6 +115,10 @@ public class GatewayCurrentStateCache { stateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())).updateWithDiff(diffMap); } + public void resetTargetStateCache(String instance) { + _targetStateMap.put(instance, new ShardStateMap(new HashMap<>())); + } + public static class ShardStateMap { Map<String, Map<String, String>> _stateMap; diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java new file mode 100644 index 000000000..69c9c219d --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java @@ -0,0 +1,131 @@ +package org.apache.helix.gateway.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PollChannelUtil { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final Logger logger = LoggerFactory.getLogger(PollChannelUtil.class); + + // return <grpc health stub, ManagedChannel> pair + // ManagedChannel needs to be shut down when the connection is no longer needed + public static Pair<HealthGrpc.HealthBlockingStub, ManagedChannel> createGrpcChannel(String endpointPortString) { + String[] endpointPort = endpointPortString.split(":"); + ManagedChannel channel = + ManagedChannelBuilder.forAddress(endpointPort[0], Integer.parseInt(endpointPort[1])).usePlaintext().build(); + + return new ImmutablePair<>(HealthGrpc.newBlockingStub(channel), channel); + } + + /** + * Send Unary RPC to the gRPC service to check the health of the container. Could be liveness or readiness depending on input.
+ * @param service one of "readiness" or "liveness" + * https://github.com/kubernetes/kubernetes/issues/115651 + * @return + */ + public static boolean fetchLivenessStatusFromGrpcService(String service, HealthGrpc.HealthBlockingStub healthStub) { + HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(service).build(); + HealthCheckResponse response = healthStub.check(request); + return response.getStatus() == HealthCheckResponse.ServingStatus.SERVING; + } + + /** + * Flush the current assignment map to a file. The whole file is re-written every time. + */ + public static void flushAssignmentToFile(String targetAssignment, String filePath) { + try (FileWriter fileWriter = new FileWriter(filePath)) { + fileWriter.write(targetAssignment); + } catch (IOException e) { + logger.warn("Failed to write to file: " + filePath, e); + } + } + + /** + * read current state from a file, compare with in memory current state, update the in memory current state and return diff. + * Current state file format: {"cluster1" : { "instance_1" : { "resource1" : {"shard1" : "online" }}}} + */ + public static Map<String, Map<String, Map<String, Map<String, String>>>> readCurrentStateFromFile(String filePath) { + try { + // read from file path + File file = new File(filePath); + return objectMapper.readValue(file, + new TypeReference<Map<String, Map<String, Map<String, Map<String, String>>>>>() { + }); + } catch (IOException e) { + logger.warn("Failed to read from file: " + filePath); + return new HashMap<>(); + } + } + + /** + * Read instance liveness status from a file, return true if the instance is healthy and the last update time is within timeout.
+ * File format: {"IsAlive": true, "LastUpdateTime": 1629300000} + * @param filePath + * @param timeoutInSec + * @return + */ + public static boolean readInstanceLivenessStatusFromFile(String filePath, int timeoutInSec) { + try { + // read from file path + File file = new File(filePath); + HostLivenessState status = objectMapper.readValue(file, new TypeReference<HostLivenessState>() { + }); + return status.isHealthy() && (System.currentTimeMillis()/1000 - status.getLastUpdatedTime()) < timeoutInSec; + } catch (IOException e) { + logger.warn("Failed to read from file: " + filePath); + return false; + } + } + + /** + * Instance health status representation as JSON + */ + public static class HostLivenessState { + @JsonProperty ("IsAlive") + Boolean _isAlive; + @JsonProperty ("LastUpdateTime") + long _lastUpdatedTime; // in epoch second + + public Boolean isHealthy(){ + return _isAlive; + } + public long getLastUpdatedTime(){ + return _lastUpdatedTime; + } + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java index c869c9c55..f9f2b4c3d 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java @@ -126,4 +126,41 @@ public final class StateTransitionMessageTranslateUtil { clusterName).setParticipantName(instanceName); return builder.build(); } + + /** + * Translate from current state change to Helix Gateway Service event. 
+ * @param instanceName + * @param clusterName + * @param shardStateMap + * @return + */ + public static GatewayServiceEvent translateCurrentStateChangeToEvent(String instanceName, String clusterName, + Map<String, Map<String, String>> shardStateMap) { + List<GatewayServiceEvent.StateTransitionResult> stResult = new ArrayList<>(); + shardStateMap.forEach((resourceName, value) -> value.forEach((key, value1) -> { + GatewayServiceEvent.StateTransitionResult result = + new GatewayServiceEvent.StateTransitionResult(resourceName, key, value1); + stResult.add(result); + })); + GatewayServiceEvent.GateWayServiceEventBuilder builder = + new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE).setClusterName( + clusterName).setParticipantName(instanceName).setStateTransitionStatusMap(stResult); + return builder.build(); + } + + /** + * Create a GatewayServiceEvent to notify the GatewayServiceManager to create a new HelixGatewayParticipant. + * @param instanceName the instance name of the newly connected participant + * @param clusterName the cluster name + * @param shardStateMap the initial state of shards on the participant.
Could be empty map + * @return + */ + public static GatewayServiceEvent translateCurrentStateDiffToInitConnectEvent(String instanceName, String clusterName, + Map<String, Map<String, String>> shardStateMap) { + GatewayServiceEvent.GateWayServiceEventBuilder builder = + new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName( + clusterName).setParticipantName(instanceName).setShardStateMap(shardStateMap); + return builder.build(); + } + } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java b/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestGatewayServiceChannelConfig.java similarity index 81% rename from helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java rename to helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestGatewayServiceChannelConfig.java index 30f6c9692..b0d2f0dc6 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/chanel/GatewayServiceChannelConfigTest.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestGatewayServiceChannelConfig.java @@ -1,4 +1,4 @@ -package org.apache.helix.gateway.chanel; +package org.apache.helix.gateway.channel; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -19,12 +19,11 @@ package org.apache.helix.gateway.chanel; * under the License. 
*/ -import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; import org.testng.Assert; import org.testng.annotations.Test; -public class GatewayServiceChannelConfigTest { +public class TestGatewayServiceChannelConfig { @Test public void testGatewayServiceChannelConfigBuilder() { @@ -56,7 +55,7 @@ public class GatewayServiceChannelConfigTest { } @Test - public void testInvalidConfig() { + public void testInvalidConfigForGrpc() { GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder = new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder(); @@ -70,4 +69,20 @@ public class GatewayServiceChannelConfigTest { // expected } } + + @Test + public void testInvalidConfigForFile() { + GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder = + new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder(); + + builder.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.FILE); + + // assert we get an exception + try { + builder.build(); + Assert.fail("Should have thrown an exception"); + } catch (IllegalArgumentException e) { + // expected + } + } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestHelixGatewayServicePollModeChannel.java b/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestHelixGatewayServicePollModeChannel.java new file mode 100644 index 000000000..98941e94d --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/channel/TestHelixGatewayServicePollModeChannel.java @@ -0,0 +1,90 @@ +package org.apache.helix.gateway.channel; + +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.helix.gateway.api.constant.GatewayServiceEventType; +import org.apache.helix.gateway.service.GatewayServiceEvent; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.junit.Assert; +import org.testng.annotations.Test; + +import static
org.apache.helix.gateway.channel.GatewayServiceChannelConfig.ChannelMode.*; +import static org.mockito.Mockito.*; + + +public class TestHelixGatewayServicePollModeChannel { + private HelixGatewayServicePollModeChannel pollModeChannel; + private GatewayServiceManager manager; + private ScheduledExecutorService scheduler; + + int connectEventCount = 0; + int disconnectEventCount = 0; + int updateEventCount = 0; + + @Test + public void testFetchUpdates() { + scheduler = mock(ScheduledExecutorService.class); + GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder = + new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setChannelMode(POLL_MODE) + .setHealthCheckEndpointMap(Map.of("cluster1", Map.of("instance1", "endpoint1"))) + .setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.FILE) + .setShardStateProcessorType(GatewayServiceChannelConfig.ChannelType.FILE) + .addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.PARTICIPANT_CURRENT_STATE_PATH, + "CurrentStatePath") + .addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.SHARD_TARGET_STATE_PATH, + "shardTargetStatePath") + .setPollIntervalSec(60 * 1000) // set a larger number to avoid recurrent polling + .setPollStartDelaySec(60 * 1000) + .setTargetFileUpdateIntervalSec(60 * 1000); + + manager = new DummyGatewayServiceManager(builder.build()); + pollModeChannel = spy(new HelixGatewayServicePollModeChannel(manager, builder.build())); + pollModeChannel._scheduler = scheduler; // Inject the mocked scheduler + + // Mock the necessary methods and data + doReturn(true).when(pollModeChannel).fetchInstanceLivenessStatus("cluster1", "instance1"); + Map<String, Map<String, Map<String, Map<String, String>>>> currentStateMap = + Map.of("cluster1", Map.of("instance1", Map.of("resource1", Map.of("shard", "ONLINE")))); + doReturn(currentStateMap).when(pollModeChannel).getChangedParticipantsCurrentState(any()); + + // 1. 
Call fetch update for first time, verify we got an init connect event + pollModeChannel.fetchUpdates(); + + Assert.assertEquals(1, connectEventCount); + + // 2. Change currentStateMap, Call fetch update for second time, verify we got an update event + Map<String, Map<String, Map<String, Map<String, String>>>> currentStateMap2 = + Map.of("cluster1", Map.of("instance1", Map.of("resource1", Map.of("shard", "OFFLINE")))); + doReturn(currentStateMap2).when(pollModeChannel).getChangedParticipantsCurrentState(any()); + pollModeChannel.fetchUpdates(); + Assert.assertEquals(1, updateEventCount); + + // call one more time with same shard state, verify no new event + pollModeChannel.fetchUpdates(); + Assert.assertEquals(1, updateEventCount); + + // 3. Change health check result, Call fetch update for third time, verify we got a disconnect event + doReturn(false).when(pollModeChannel).fetchInstanceLivenessStatus("cluster1", "instance1"); + pollModeChannel.fetchUpdates(); + Assert.assertEquals(1, disconnectEventCount); + } + + class DummyGatewayServiceManager extends GatewayServiceManager { + + public DummyGatewayServiceManager(GatewayServiceChannelConfig gatewayServiceChannelConfig) { + super("dummyZkAddress", gatewayServiceChannelConfig); + } + + @Override + public void onGatewayServiceEvent(GatewayServiceEvent event) { + if (event.getEventType().equals(GatewayServiceEventType.CONNECT)) { + connectEventCount++; + } else if (event.getEventType().equals(GatewayServiceEventType.DISCONNECT)) { + disconnectEventCount++; + } else if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) { + updateEventCount++; + } + System.out.println("Received event: " + event.getEventType()); + } + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java index 69882fc66..75e9fc581 100644 ---
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java @@ -372,11 +372,11 @@ public class TestHelixGatewayParticipant extends ZkTestBase { } @Override - public void closeConnectionWithError(String instanceName, String reason) { + public void closeConnectionWithError(String clusterName, String instanceName, String reason) { _errorDisconnectCount.incrementAndGet(); } @Override - public void completeConnection(String instanceName) { + public void completeConnection(String clusterName, String instanceName) { _gracefulDisconnectCount.incrementAndGet(); } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java similarity index 92% rename from helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java rename to helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java index 448e0b3e9..99fec4a25 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestGatewayCurrentStateCache.java @@ -1,4 +1,4 @@ -package org.apache.helix.gateway.utils; +package org.apache.helix.gateway.util; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -21,7 +21,6 @@ package org.apache.helix.gateway.utils; import java.util.HashMap; import java.util.Map; -import org.apache.helix.gateway.util.GatewayCurrentStateCache; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -77,6 +76,6 @@ public class TestGatewayCurrentStateCache { cache.updateTargetStateWithDiff("instance1", targetStateChange); Assert.assertEquals(cache.getTargetState("instance1", "resource1", 
"shard1"), "OFFLINE"); - Assert.assertEquals(cache.serializeTargetAssignmentsToJSON().toString(), "{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}"); + Assert.assertEquals(cache.serializeTargetAssignmentsToJSONNode().toString(), "{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}"); } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestStateTransitionMessageTranslateUtil.java similarity index 96% rename from helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java rename to helix-gateway/src/test/java/org/apache/helix/gateway/util/TestStateTransitionMessageTranslateUtil.java index 65c299122..d65031a5f 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/util/TestStateTransitionMessageTranslateUtil.java @@ -1,4 +1,6 @@ -package org.apache.helix.gateway.utils;/* +package org.apache.helix.gateway.util; + +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,7 +22,6 @@ package org.apache.helix.gateway.utils;/* import org.apache.helix.HelixDefinedState; import org.apache.helix.gateway.participant.HelixGatewayParticipant; -import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; import org.testng.Assert; import org.testng.annotations.Test; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;