This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this
push:
new f602ca21f Add getter for all target state - gateway service (#2943)
f602ca21f is described below
commit f602ca21f43004f7b4ebd4febfc20c7eca6bf0ee
Author: xyuanlu <[email protected]>
AuthorDate: Wed Oct 9 17:26:21 2024 -0700
Add getter for all target state - gateway service (#2943)
Add getter for all target state - gateway service
---
.../constant/gatewayServiceManagerConstant.java | 6 +++++
.../gateway/service/GatewayServiceManager.java | 19 +++++++++++---
.../gateway/util/GatewayCurrentStateCache.java | 11 +++++++-
.../apache/helix/gateway/util/PollChannelUtil.java | 16 ++++++++++++
.../integration/TestFilePullChannelE2E.java | 2 +-
.../gateway/service/TestGatewayServiceManager.java | 29 ++++++++++++++++++++++
6 files changed, 78 insertions(+), 5 deletions(-)
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java
new file mode 100644
index 000000000..c4fc420e1
--- /dev/null
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java
@@ -0,0 +1,6 @@
+package org.apache.helix.gateway.api.constant;
+
+public class gatewayServiceManagerConstant {
+ public static final String TARGET_STATE_ASSIGNMENT_KEY_NAME = "Assignment";
+ public static final String TIMESTAMP_KEY = "Timestamp";
+}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 5e53833dc..0809ac6e2 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.service;
* under the License.
*/
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableSet;
@@ -36,6 +37,11 @@ import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
+import org.apache.helix.gateway.util.PollChannelUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.gateway.api.constant.gatewayServiceManagerConstant.*;
/**
@@ -46,6 +52,7 @@ import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
* 4. For ST reply message, update the tracker
*/
public class GatewayServiceManager {
+ private static final Logger logger = LoggerFactory.getLogger(GatewayServiceManager.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10;
public static final ImmutableSet<String> SUPPORTED_MULTI_STATE_MODEL_TYPES =
@@ -128,12 +135,14 @@ public class GatewayServiceManager {
public synchronized String serializeTargetState() {
ObjectNode targetStateNode = new ObjectMapper().createObjectNode();
+ ObjectNode res = new ObjectMapper().createObjectNode();
for (String clusterName : _currentStateCacheMap.keySet()) {
// add the json node to the target state node
targetStateNode.set(clusterName,
getOrCreateCache(clusterName).serializeTargetAssignmentsToJSONNode());
}
- targetStateNode.set("timestamp", objectMapper.valueToTree(System.currentTimeMillis()));
- return targetStateNode.toString();
+ res.set(TARGET_STATE_ASSIGNMENT_KEY_NAME, targetStateNode);
+ res.set(TIMESTAMP_KEY, objectMapper.valueToTree(System.currentTimeMillis()));
+ return res.toString();
}
public void updateTargetState(String clusterName, String instanceName,
String resourceId, String shardId,
@@ -149,6 +158,10 @@ public class GatewayServiceManager {
return getOrCreateCache(clusterName).getTargetState(instanceName,
resourceId, shardId);
}
+ public Map<String, Map<String, Map<String, String>>> getAllTargetStates(String clusterName) {
+ return getOrCreateCache(clusterName).getAllTargetStates();
+ }
+
/**
* Update in memory shard state
*/
@@ -162,7 +175,7 @@ public class GatewayServiceManager {
@Override
public void run() {
- System.out.println("Processing state transition result " + _event.getInstanceName());
+ logger.info("Processing state transition result " + _event.getInstanceName());
HelixGatewayParticipant participant =
getHelixGatewayParticipant(_event.getClusterName(),
_event.getInstanceName());
if (participant == null) {
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
index bbec7f3e4..8d785aac0 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
@@ -21,6 +21,7 @@ package org.apache.helix.gateway.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -59,6 +60,14 @@ public class GatewayCurrentStateCache {
return shardStateMap == null ? null : shardStateMap.getState(resource,
shard);
}
+ public synchronized Map<String, Map<String, Map<String, String>>> getAllTargetStates() {
+ Map<String, Map<String, Map<String, String>>> result = new HashMap<>();
+ for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
+ result.put(entry.getKey(), new HashMap<>(entry.getValue()._stateMap));
+ }
+ return result;
+ }
+
/**
* Update the cached current state of instances in a cluster, and return the
diff of the change.
* @param userCurrentStateMap The new current state map of instances in the
cluster
@@ -141,8 +150,8 @@ public class GatewayCurrentStateCache {
}
public static class ShardStateMap {
+ // resource -> shard -> state
Map<String, Map<String, String>> _stateMap;
- ObjectNode root = mapper.createObjectNode();
public ShardStateMap(Map<String, Map<String, String>> stateMap) {
_stateMap = new HashMap<>(stateMap);
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
index 593bec258..68e8fb2aa 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
@@ -128,4 +128,20 @@ public class PollChannelUtil {
return _lastUpdatedTime;
}
}
+
+ /**
+ * Target assignments representation as JSON
+ */
+ public static class TargetAssignment {
+ // cluster -> instance -> resource -> shard -> state
+ @JsonProperty ("Assignment")
+ String _targetAssignment;
+ @JsonProperty ("Timestamp")
+ long _timestamp;
+
+ public TargetAssignment(String targetAssignment, long timestamp){
+ _targetAssignment = targetAssignment;
+ _timestamp = timestamp;
+ }
+ }
}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
index 0097f55f4..fa4c948fb 100644
--- a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
@@ -231,7 +231,7 @@ public class TestFilePullChannelE2E extends HelixGatewayTestBase {
int finalI = i;
Assert.assertTrue(TestHelper.verify(() -> {
String content = Files.readString(targetPaths.get(finalI));
- return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}}");
+ return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}");
}, TestHelper.WAIT_DURATION));
}
}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
index 41604b427..c8f441f09 100644
--- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -1,5 +1,7 @@
package org.apache.helix.gateway.service;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
import org.testng.annotations.Test;
@@ -7,6 +9,7 @@ import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
public class TestGatewayServiceManager {
@@ -47,4 +50,30 @@ public class TestGatewayServiceManager {
grpcService.stop();
}
+ @Test
+ public void testGetAllTargetStates() {
+ GatewayServiceManager gatewayServiceManager = new GatewayServiceManager("localhost:2181");
+ String clusterName = "TestCluster";
+ String instanceName = "instance1";
+ String resourceId = "resource1";
+ String shardId = "shard1";
+ String state = "ONLINE";
+
+ // Add target state
+ gatewayServiceManager.updateTargetState(clusterName, instanceName, resourceId, shardId, state);
+
+ // Expected target states
+ Map<String, Map<String, Map<String, String>>> expectedTargetStates = new HashMap<>();
+ Map<String, Map<String, String>> instanceMap = new HashMap<>();
+ Map<String, String> shardMap = new HashMap<>();
+ shardMap.put(shardId, state);
+ instanceMap.put(resourceId, shardMap);
+ expectedTargetStates.put(instanceName, instanceMap);
+
+ // Get all target states
+ Map<String, Map<String, Map<String, String>>> actualTargetStates = gatewayServiceManager.getAllTargetStates(clusterName);
+
+ // Verify the target states
+ assertEquals(actualTargetStates, expectedTargetStates);
+ }
}