This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit bbb031a4a5c07023f4e7de23553853ccc5997d6d
Author: xyuanlu <[email protected]>
AuthorDate: Wed Sep 4 13:31:19 2024 -0700

    Gateway - Add GatewayCurrentStateCache for gateway service (#2895)
    
    This pull request introduces a caching mechanism for the Helix Gateway 
service. The main changes include:
    
    Addition of a new GatewayCurrentStateCache class in 
GatewayCurrentStateCache.java, which manages caching of current and target 
states for instances in a cluster.
---
 .../gateway/service/GatewayServiceManager.java     |   7 +-
 .../gateway/util/GatewayCurrentStateCache.java     | 185 +++++++++++++++++++++
 .../utils/TestGatewayCurrentStateCache.java        |  82 +++++++++
 3 files changed, 273 insertions(+), 1 deletion(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index cceabc887..3edd4ee9d 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -22,15 +22,18 @@ package org.apache.helix.gateway.service;
 import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
 import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
 import org.apache.helix.gateway.participant.HelixGatewayParticipant;
+import org.apache.helix.gateway.util.GatewayCurrentStateCache;
 import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
 
 
@@ -60,6 +63,8 @@ public class GatewayServiceManager {
 
   private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;
 
+  private final Map<String, GatewayCurrentStateCache> _currentStateCacheMap;
+
   public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig 
gatewayServiceChannelConfig) {
     _helixGatewayParticipantMap = new ConcurrentHashMap<>();
     _zkAddress = zkAddress;
@@ -68,6 +73,7 @@ public class GatewayServiceManager {
     _connectionEventProcessor =
         new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // 
todo: make it configurable
     _gatewayServiceChannelConfig = gatewayServiceChannelConfig;
+    _currentStateCacheMap = new HashMap<>();
   }
 
   /**
@@ -131,7 +137,6 @@ public class GatewayServiceManager {
     }
   }
 
-
   public void startService() throws IOException {
     _gatewayServiceChannel.start();
   }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
new file mode 100644
index 000000000..503909d4a
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
@@ -0,0 +1,185 @@
+package org.apache.helix.gateway.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A cache to store the current target assignment, and the reported current 
state of the instances in a cluster.
+ */
+public class GatewayCurrentStateCache {
+  static ObjectMapper mapper = new ObjectMapper();
+  String _clusterName;
+
+  // A cache of current state. It should be updated by the 
HelixGatewayServiceChannel
+  // instance -> resource state (resource -> shard -> target state)
+  Map<String, ShardStateMap> _currentStateMap;
+
+  // A cache of target state.
+  // instance -> resource state (resource -> shard -> target state)
+  Map<String, ShardStateMap> _targetStateMap;
+
+  public GatewayCurrentStateCache(String clusterName) {
+    _clusterName = clusterName;
+    _currentStateMap = new HashMap<>();
+    _targetStateMap = new HashMap<>();
+  }
+
+  public String getCurrentState(String instance, String resource, String 
shard) {
+    return _currentStateMap.get(instance).getState(resource, shard);
+  }
+
+  public String getTargetState(String instance, String resource, String shard) 
{
+    return _targetStateMap.get(instance).getState(resource, shard);
+  }
+
+  /**
+   * Update the cached current state of instances in a cluster, and return the 
diff of the change.
+   * @param newCurrentStateMap The new current state map of instances in the 
cluster
+   * @return
+   */
+  public Map<String, Map<String, Map<String, String>>> 
updateCacheWithNewCurrentStateAndGetDiff(
+      Map<String, Map<String, Map<String, String>>> newCurrentStateMap) {
+    Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
+    for (String instance : newCurrentStateMap.keySet()) {
+      Map<String, Map<String, String>> newCurrentState = 
newCurrentStateMap.get(instance);
+      diff.put(instance, _currentStateMap.computeIfAbsent(instance, k -> new 
ShardStateMap(new HashMap<>()))
+          .updateAndGetDiff(newCurrentState));
+    }
+    return diff;
+  }
+
+  /**
+   * Update the cache with the current state diff.
+   * All existing target states remains the same
+   * @param currentStateDiff
+   */
+  public void updateCacheWithCurrentStateDiff(Map<String, Map<String, 
Map<String, String>>> currentStateDiff) {
+    for (String instance : currentStateDiff.keySet()) {
+      Map<String, Map<String, String>> currentStateDiffMap = 
currentStateDiff.get(instance);
+      updateShardStateMapWithDiff(_currentStateMap, instance, 
currentStateDiffMap);
+    }
+  }
+
+  /**
+   * Update the target state with the changed target state maps.
+   * All existing target states remains the same
+   * @param targetStateChangeMap
+   */
+  public void updateTargetStateWithDiff(String instance, Map<String, 
Map<String, String>> targetStateChangeMap) {
+    updateShardStateMapWithDiff(_targetStateMap, instance, 
targetStateChangeMap);
+  }
+
+  /**
+   * Serialize the target state assignments to a JSON Node.
+   * example : 
{"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}}
+   */
+  public ObjectNode serializeTargetAssignmentsToJSON() {
+    ObjectNode root = mapper.createObjectNode();
+    for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
+      root.set(entry.getKey(), entry.getValue().toJSONNode());
+    }
+    return root;
+  }
+
+  private void updateShardStateMapWithDiff(Map<String, ShardStateMap> 
stateMap, String instance,
+      Map<String, Map<String, String>> diffMap) {
+    if (diffMap == null || diffMap.isEmpty()) {
+      return;
+    }
+    stateMap.computeIfAbsent(instance, k -> new ShardStateMap(new 
HashMap<>())).updateWithDiff(diffMap);
+  }
+
+  public static class ShardStateMap {
+    Map<String, Map<String, String>> _stateMap;
+
+    public ShardStateMap(Map<String, Map<String, String>> stateMap) {
+      _stateMap = stateMap;
+    }
+
+    public String getState(String instance, String shard) {
+      return _stateMap.get(instance).get(shard);
+    }
+
+    private Map<String, Map<String, String>> getShardStateMap() {
+      return _stateMap;
+    }
+
+    private void updateWithDiff(Map<String, Map<String, String>> diffMap) {
+      for (Map.Entry<String, Map<String, String>> diffEntry : 
diffMap.entrySet()) {
+        String resource = diffEntry.getKey();
+        Map<String, String> diffCurrentState = diffEntry.getValue();
+        if (_stateMap.get(resource) != null) {
+          _stateMap.get(resource).entrySet().forEach(currentMapEntry -> {
+            String shard = currentMapEntry.getKey();
+            if (diffCurrentState.get(shard) != null) {
+              currentMapEntry.setValue(diffCurrentState.get(shard));
+            }
+          });
+        } else {
+          _stateMap.put(resource, diffCurrentState);
+        }
+      }
+    }
+
+    private Map<String, Map<String, String>> updateAndGetDiff(Map<String, 
Map<String, String>> newCurrentStateMap) {
+      Map<String, Map<String, String>> diff = new HashMap<>();
+      for (Map.Entry<String, Map<String, String>> entry : 
newCurrentStateMap.entrySet()) {
+        String resource = entry.getKey();
+        Map<String, String> newCurrentState = entry.getValue();
+        Map<String, String> oldCurrentState = _stateMap.get(resource);
+        if (oldCurrentState == null) {
+          diff.put(resource, newCurrentState);
+          continue;
+        }
+        if (!oldCurrentState.equals(newCurrentState)) {
+          for (String shard : newCurrentState.keySet()) {
+            if (oldCurrentState.get(shard) == null || 
!oldCurrentState.get(shard).equals(newCurrentState.get(shard))) {
+              diff.computeIfAbsent(resource, k -> new HashMap<>()).put(shard, 
newCurrentState.get(shard));
+            }
+          }
+        }
+      }
+      _stateMap = newCurrentStateMap;
+      return diff;
+    }
+
+    /**
+     * Serialize the shard state map to a JSON object.
+     * @return a JSON object representing the shard state map. Example: 
{"shard1":"ONLINE","shard2":"OFFLINE"}
+     */
+    public ObjectNode toJSONNode() {
+      ObjectNode root = mapper.createObjectNode();
+      for (Map.Entry<String, Map<String, String>> entry : 
_stateMap.entrySet()) {
+        String resource = entry.getKey();
+        ObjectNode resourceNode = mapper.createObjectNode();
+        for (Map.Entry<String, String> shardEntry : 
entry.getValue().entrySet()) {
+          resourceNode.put(shardEntry.getKey(), shardEntry.getValue());
+        }
+        root.set(resource, resourceNode);
+      }
+      return root;
+    }
+  }
+}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java
new file mode 100644
index 000000000..448e0b3e9
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java
@@ -0,0 +1,82 @@
+package org.apache.helix.gateway.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.gateway.util.GatewayCurrentStateCache;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestGatewayCurrentStateCache {
+  private GatewayCurrentStateCache cache;
+
+  @BeforeMethod
+  public void setUp() {
+    cache = new GatewayCurrentStateCache("TestCluster");
+  }
+
+  @Test
+  public void testUpdateCacheWithNewCurrentStateAndGetDiff() {
+    Map<String, Map<String, Map<String, String>>> newState = new HashMap<>();
+    Map<String, Map<String, String>> instanceState = new HashMap<>();
+    Map<String, String> shardState = new HashMap<>();
+    shardState.put("shard1", "ONLINE");
+    instanceState.put("resource1", shardState);
+    newState.put("instance1", instanceState);
+
+    Map<String, Map<String, Map<String, String>>> diff = 
cache.updateCacheWithNewCurrentStateAndGetDiff(newState);
+
+    Assert.assertNotNull(diff);
+    Assert.assertEquals(diff.size(), 1);
+    Assert.assertEquals(diff.get("instance1").get("resource1").get("shard1"), 
"ONLINE");
+  }
+
+  @Test
+  public void testUpdateCacheWithCurrentStateDiff() {
+    Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
+    Map<String, Map<String, String>> instanceState = new HashMap<>();
+    Map<String, String> shardState = new HashMap<>();
+    shardState.put("shard2", "ONLINE");
+    shardState.put("shard1", "ONLINE");
+    instanceState.put("resource1", shardState);
+    diff.put("instance1", instanceState);
+
+    cache.updateCacheWithCurrentStateDiff(diff);
+
+    Assert.assertEquals(cache.getCurrentState("instance1", "resource1", 
"shard1"), "ONLINE");
+    Assert.assertEquals(cache.getCurrentState("instance1", "resource1", 
"shard2"), "ONLINE");
+  }
+
+  @Test
+  public void testUpdateTargetStateWithDiff() {
+    Map<String, Map<String, String>> targetStateChange = new HashMap<>();
+    Map<String, String> shardState = new HashMap<>();
+    shardState.put("shard1", "OFFLINE");
+    targetStateChange.put("resource1", shardState);
+
+    cache.updateTargetStateWithDiff("instance1", targetStateChange);
+
+    Assert.assertEquals(cache.getTargetState("instance1", "resource1", 
"shard1"), "OFFLINE");
+    Assert.assertEquals(cache.serializeTargetAssignmentsToJSON().toString(), 
"{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}");
+  }
+}

Reply via email to