This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-gateway-service by this 
push:
     new f602ca21f Add getter for all target state - gateway service (#2943)
f602ca21f is described below

commit f602ca21f43004f7b4ebd4febfc20c7eca6bf0ee
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Wed Oct 9 17:26:21 2024 -0700

    Add getter for all target state - gateway service (#2943)
    
    Add getter for all target state - gateway service
---
 .../constant/gatewayServiceManagerConstant.java    |  6 +++++
 .../gateway/service/GatewayServiceManager.java     | 19 +++++++++++---
 .../gateway/util/GatewayCurrentStateCache.java     | 11 +++++++-
 .../apache/helix/gateway/util/PollChannelUtil.java | 16 ++++++++++++
 .../integration/TestFilePullChannelE2E.java        |  2 +-
 .../gateway/service/TestGatewayServiceManager.java | 29 ++++++++++++++++++++++
 6 files changed, 78 insertions(+), 5 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java
new file mode 100644
index 000000000..c4fc420e1
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/constant/gatewayServiceManagerConstant.java
@@ -0,0 +1,6 @@
+package org.apache.helix.gateway.api.constant;
+
+public class gatewayServiceManagerConstant {
+  public static final String TARGET_STATE_ASSIGNMENT_KEY_NAME = "Assignment";
+  public static final String TIMESTAMP_KEY = "Timestamp";
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 5e53833dc..0809ac6e2 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.gateway.service;
  * under the License.
  */
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableSet;
@@ -36,6 +37,11 @@ import 
org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
 import org.apache.helix.gateway.participant.HelixGatewayParticipant;
 import org.apache.helix.gateway.util.GatewayCurrentStateCache;
 import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
+import org.apache.helix.gateway.util.PollChannelUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.helix.gateway.api.constant.gatewayServiceManagerConstant.*;
 
 
 /**
@@ -46,6 +52,7 @@ import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
  *  4. For ST reply message, update the tracker
  */
 public class GatewayServiceManager {
+  private static final Logger logger = 
LoggerFactory.getLogger(GatewayServiceManager.class);
   private static final ObjectMapper objectMapper = new ObjectMapper();
   public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10;
   public static final ImmutableSet<String> SUPPORTED_MULTI_STATE_MODEL_TYPES =
@@ -128,12 +135,14 @@ public class GatewayServiceManager {
 
   public synchronized String serializeTargetState() {
     ObjectNode targetStateNode = new ObjectMapper().createObjectNode();
+    ObjectNode res = new ObjectMapper().createObjectNode();
     for (String clusterName : _currentStateCacheMap.keySet()) {
       // add the json node to the target state node
       targetStateNode.set(clusterName, 
getOrCreateCache(clusterName).serializeTargetAssignmentsToJSONNode());
     }
-    targetStateNode.set("timestamp", 
objectMapper.valueToTree(System.currentTimeMillis()));
-    return targetStateNode.toString();
+    res.set(TARGET_STATE_ASSIGNMENT_KEY_NAME, targetStateNode);
+    res.set(TIMESTAMP_KEY, 
objectMapper.valueToTree(System.currentTimeMillis()));
+    return res.toString();
   }
 
   public void updateTargetState(String clusterName, String instanceName, 
String resourceId, String shardId,
@@ -149,6 +158,10 @@ public class GatewayServiceManager {
     return getOrCreateCache(clusterName).getTargetState(instanceName, 
resourceId, shardId);
   }
 
+  public Map<String, Map<String, Map<String, String>>> 
getAllTargetStates(String clusterName) {
+    return getOrCreateCache(clusterName).getAllTargetStates();
+  }
+
   /**
    * Update in memory shard state
    */
@@ -162,7 +175,7 @@ public class GatewayServiceManager {
 
     @Override
     public void run() {
-      System.out.println("Processing state transition result " + 
_event.getInstanceName());
+      logger.info("Processing state transition result " + 
_event.getInstanceName());
       HelixGatewayParticipant participant =
           getHelixGatewayParticipant(_event.getClusterName(), 
_event.getInstanceName());
       if (participant == null) {
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
index bbec7f3e4..8d785aac0 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
@@ -21,6 +21,7 @@ package org.apache.helix.gateway.util;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -59,6 +60,14 @@ public class GatewayCurrentStateCache {
     return shardStateMap == null ? null : shardStateMap.getState(resource, 
shard);
   }
 
+  public synchronized Map<String, Map<String, Map<String, String>>> 
getAllTargetStates() {
+    Map<String, Map<String, Map<String, String>>> result = new HashMap<>();
+    for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
+      result.put(entry.getKey(), new HashMap<>(entry.getValue()._stateMap));
+    }
+    return result;
+  }
+
   /**
    * Update the cached current state of instances in a cluster, and return the 
diff of the change.
    * @param userCurrentStateMap The new current state map of instances in the 
cluster
@@ -141,8 +150,8 @@ public class GatewayCurrentStateCache {
   }
 
   public static class ShardStateMap {
+    // resource -> shard -> state
     Map<String, Map<String, String>> _stateMap;
-    ObjectNode root = mapper.createObjectNode();
 
     public ShardStateMap(Map<String, Map<String, String>> stateMap) {
       _stateMap = new HashMap<>(stateMap);
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
index 593bec258..68e8fb2aa 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
@@ -128,4 +128,20 @@ public class PollChannelUtil {
       return _lastUpdatedTime;
     }
   }
+
+  /**
+   * Target assignments representation as JSON
+   */
+  public static class TargetAssignment {
+    // cluster -> instance -> resource -> shard -> state
+    @JsonProperty ("Assignment")
+    String _targetAssignment;
+    @JsonProperty ("Timestamp")
+    long _timestamp;
+
+    public TargetAssignment(String targetAssignment, long timestamp){
+      _targetAssignment = targetAssignment;
+      _timestamp = timestamp;
+    }
+  }
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
index 0097f55f4..fa4c948fb 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
@@ -231,7 +231,7 @@ public class TestFilePullChannelE2E extends 
HelixGatewayTestBase {
       int finalI = i;
       Assert.assertTrue(TestHelper.verify(() -> {
         String content = Files.readString(targetPaths.get(finalI));
-        return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}}");
+        return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}");
       }, TestHelper.WAIT_DURATION));
     }
   }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
index 41604b427..c8f441f09 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -1,5 +1,7 @@
 package org.apache.helix.gateway.service;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
 import org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService;
 import org.testng.annotations.Test;
@@ -7,6 +9,7 @@ import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
 
 import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
 
 
 public class TestGatewayServiceManager {
@@ -47,4 +50,30 @@ public class TestGatewayServiceManager {
 
     grpcService.stop();
   }
+  @Test
+  public void testGetAllTargetStates() {
+    GatewayServiceManager gatewayServiceManager = new 
GatewayServiceManager("localhost:2181");
+    String clusterName = "TestCluster";
+    String instanceName = "instance1";
+    String resourceId = "resource1";
+    String shardId = "shard1";
+    String state = "ONLINE";
+
+    // Add target state
+    gatewayServiceManager.updateTargetState(clusterName, instanceName, 
resourceId, shardId, state);
+
+    // Expected target states
+    Map<String, Map<String, Map<String, String>>> expectedTargetStates = new 
HashMap<>();
+    Map<String, Map<String, String>> instanceMap = new HashMap<>();
+    Map<String, String> shardMap = new HashMap<>();
+    shardMap.put(shardId, state);
+    instanceMap.put(resourceId, shardMap);
+    expectedTargetStates.put(instanceName, instanceMap);
+
+    // Get all target states
+    Map<String, Map<String, Map<String, String>>> actualTargetStates = 
gatewayServiceManager.getAllTargetStates(clusterName);
+
+    // Verify the target states
+    assertEquals(actualTargetStates, expectedTargetStates);
+  }
 }

Reply via email to