This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this push: new f602ca21f Add getter for all target state - gateway service (#2943) f602ca21f is described below commit f602ca21f43004f7b4ebd4febfc20c7eca6bf0ee Author: xyuanlu <xyua...@gmail.com> AuthorDate: Wed Oct 9 17:26:21 2024 -0700 Add getter for all target state - gateway service (#2943) Add getter for all target state - gateway service --- .../constant/gatewayServiceManagerConstant.java | 6 +++++ .../gateway/service/GatewayServiceManager.java | 19 +++++++++++--- .../gateway/util/GatewayCurrentStateCache.java | 11 +++++++- .../apache/helix/gateway/util/PollChannelUtil.java | 16 ++++++++++++ .../integration/TestFilePullChannelE2E.java | 2 +- .../gateway/service/TestGatewayServiceManager.java | 29 ++++++++++++++++++++++ 6 files changed, 78 insertions(+), 5 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java new file mode 100644 index 000000000..c4fc420e1 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java @@ -0,0 +1,6 @@ +package org.apache.helix.gateway.api.constant; + +public class gatewayServiceManagerConstant { + public static final String TARGET_STATE_ASSIGNMENT_KEY_NAME = "Assignment"; + public static final String TIMESTAMP_KEY = "Timestamp"; +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 5e53833dc..0809ac6e2 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -19,6 +19,7 @@ package org.apache.helix.gateway.service; * under the License. 
*/ +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableSet; @@ -36,6 +37,11 @@ import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory; import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.gateway.util.GatewayCurrentStateCache; import org.apache.helix.gateway.util.PerKeyBlockingExecutor; +import org.apache.helix.gateway.util.PollChannelUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.gateway.api.constant.gatewayServiceManagerConstant.*; /** @@ -46,6 +52,7 @@ import org.apache.helix.gateway.util.PerKeyBlockingExecutor; * 4. For ST reply message, update the tracker */ public class GatewayServiceManager { + private static final Logger logger = LoggerFactory.getLogger(GatewayServiceManager.class); private static final ObjectMapper objectMapper = new ObjectMapper(); public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10; public static final ImmutableSet<String> SUPPORTED_MULTI_STATE_MODEL_TYPES = @@ -128,12 +135,14 @@ public class GatewayServiceManager { public synchronized String serializeTargetState() { ObjectNode targetStateNode = new ObjectMapper().createObjectNode(); + ObjectNode res = new ObjectMapper().createObjectNode(); for (String clusterName : _currentStateCacheMap.keySet()) { // add the json node to the target state node targetStateNode.set(clusterName, getOrCreateCache(clusterName).serializeTargetAssignmentsToJSONNode()); } - targetStateNode.set("timestamp", objectMapper.valueToTree(System.currentTimeMillis())); - return targetStateNode.toString(); + res.set(TARGET_STATE_ASSIGNMENT_KEY_NAME, targetStateNode); + res.set(TIMESTAMP_KEY, objectMapper.valueToTree(System.currentTimeMillis())); + return res.toString(); } public void updateTargetState(String clusterName, String instanceName, 
String resourceId, String shardId, @@ -149,6 +158,10 @@ public class GatewayServiceManager { return getOrCreateCache(clusterName).getTargetState(instanceName, resourceId, shardId); } + public Map<String, Map<String, Map<String, String>>> getAllTargetStates(String clusterName) { + return getOrCreateCache(clusterName).getAllTargetStates(); + } + /** * Update in memory shard state */ @@ -162,7 +175,7 @@ public class GatewayServiceManager { @Override public void run() { - System.out.println("Processing state transition result " + _event.getInstanceName()); + logger.info("Processing state transition result " + _event.getInstanceName()); HelixGatewayParticipant participant = getHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName()); if (participant == null) { diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java index bbec7f3e4..8d785aac0 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -21,6 +21,7 @@ package org.apache.helix.gateway.util; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -59,6 +60,14 @@ public class GatewayCurrentStateCache { return shardStateMap == null ? 
null : shardStateMap.getState(resource, shard); } + public synchronized Map<String, Map<String, Map<String, String>>> getAllTargetStates() { + Map<String, Map<String, Map<String, String>>> result = new HashMap<>(); + for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) { + result.put(entry.getKey(), new HashMap<>(entry.getValue()._stateMap)); + } + return result; + } + /** * Update the cached current state of instances in a cluster, and return the diff of the change. * @param userCurrentStateMap The new current state map of instances in the cluster @@ -141,8 +150,8 @@ public class GatewayCurrentStateCache { } public static class ShardStateMap { + // resource -> shard -> state Map<String, Map<String, String>> _stateMap; - ObjectNode root = mapper.createObjectNode(); public ShardStateMap(Map<String, Map<String, String>> stateMap) { _stateMap = new HashMap<>(stateMap); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java index 593bec258..68e8fb2aa 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java @@ -128,4 +128,20 @@ public class PollChannelUtil { return _lastUpdatedTime; } } + + /** + * Target assignments representation as JSON + */ + public static class TargetAssignment { + // cluster -> instance -> resource -> shard -> state + @JsonProperty ("Assignment") + String _targetAssignment; + @JsonProperty ("Timestamp") + long _timestamp; + + public TargetAssignment(String targetAssignment, long timestamp){ + _targetAssignment = targetAssignment; + _timestamp = timestamp; + } + } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java index 0097f55f4..fa4c948fb 
100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java @@ -231,7 +231,7 @@ public class TestFilePullChannelE2E extends HelixGatewayTestBase { int finalI = i; Assert.assertTrue(TestHelper.verify(() -> { String content = Files.readString(targetPaths.get(finalI)); - return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}}"); + return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}"); }, TestHelper.WAIT_DURATION)); } } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java index 41604b427..c8f441f09 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java @@ -1,5 +1,7 @@ package org.apache.helix.gateway.service; +import java.util.HashMap; +import java.util.Map; import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService; import org.testng.annotations.Test; @@ -7,6 +9,7 @@ import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import static org.testng.Assert.*; public class TestGatewayServiceManager { @@ -47,4 +50,30 @@ public class TestGatewayServiceManager { grpcService.stop(); } + @Test + public void testGetAllTargetStates() { + GatewayServiceManager gatewayServiceManager = new GatewayServiceManager("localhost:2181"); + String clusterName = "TestCluster"; + String instanceName = "instance1"; + String resourceId = "resource1"; + String shardId = "shard1"; + String state = "ONLINE"; + + // Add target state + 
gatewayServiceManager.updateTargetState(clusterName, instanceName, resourceId, shardId, state); + + // Expected target states + Map<String, Map<String, Map<String, String>>> expectedTargetStates = new HashMap<>(); + Map<String, Map<String, String>> instanceMap = new HashMap<>(); + Map<String, String> shardMap = new HashMap<>(); + shardMap.put(shardId, state); + instanceMap.put(resourceId, shardMap); + expectedTargetStates.put(instanceName, instanceMap); + + // Get all target states + Map<String, Map<String, Map<String, String>>> actualTargetStates = gatewayServiceManager.getAllTargetStates(clusterName); + + // Verify the target states + assertEquals(actualTargetStates, expectedTargetStates); + } }