This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this
push:
new 85d93a037 Gateway - Add GatewayCurrentStateCache for gateway service
(#2895)
85d93a037 is described below
commit 85d93a0370a4161e6794ca8e4c8cd0f70dc1903e
Author: xyuanlu <[email protected]>
AuthorDate: Wed Sep 4 13:31:19 2024 -0700
Gateway - Add GatewayCurrentStateCache for gateway service (#2895)
This pull request introduces a caching mechanism for the Helix Gateway
service. The main changes include:
Addition of a new GatewayCurrentStateCache class in
GatewayCurrentStateCache.java, which manages caching of current and target
states for instances in a cluster.
---
.../gateway/service/GatewayServiceManager.java | 7 +-
.../gateway/util/GatewayCurrentStateCache.java | 185 +++++++++++++++++++++
.../utils/TestGatewayCurrentStateCache.java | 82 +++++++++
3 files changed, 273 insertions(+), 1 deletion(-)
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index cceabc887..3edd4ee9d 100644
---
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -22,15 +22,18 @@ package org.apache.helix.gateway.service;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.helix.common.caches.CurrentStateCache;
import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
+import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
@@ -60,6 +63,8 @@ public class GatewayServiceManager {
private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;
+ private final Map<String, GatewayCurrentStateCache> _currentStateCacheMap;
+
public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig
gatewayServiceChannelConfig) {
_helixGatewayParticipantMap = new ConcurrentHashMap<>();
_zkAddress = zkAddress;
@@ -68,6 +73,7 @@ public class GatewayServiceManager {
_connectionEventProcessor =
new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); //
todo: make it configurable
_gatewayServiceChannelConfig = gatewayServiceChannelConfig;
+ _currentStateCacheMap = new HashMap<>();
}
/**
@@ -131,7 +137,6 @@ public class GatewayServiceManager {
}
}
-
public void startService() throws IOException {
_gatewayServiceChannel.start();
}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
new file mode 100644
index 000000000..503909d4a
--- /dev/null
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
@@ -0,0 +1,185 @@
+package org.apache.helix.gateway.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A per-cluster cache of the target assignment and of the current state reported by the instances.
+ *
+ * <p>Both views are kept as {@code instance -> resource -> shard -> state}. Lookups for an unknown
+ * instance, resource or shard return {@code null}. The cache is backed by plain {@link HashMap}s
+ * and performs no locking of its own.
+ */
+public class GatewayCurrentStateCache {
+  static final ObjectMapper mapper = new ObjectMapper();
+  final String _clusterName;
+
+  // A cache of current state. It should be updated by the HelixGatewayServiceChannel
+  // instance -> resource state (resource -> shard -> current state)
+  final Map<String, ShardStateMap> _currentStateMap;
+
+  // A cache of target state.
+  // instance -> resource state (resource -> shard -> target state)
+  final Map<String, ShardStateMap> _targetStateMap;
+
+  /**
+   * Creates an empty cache.
+   * @param clusterName name of the cluster this cache belongs to
+   */
+  public GatewayCurrentStateCache(String clusterName) {
+    _clusterName = clusterName;
+    _currentStateMap = new HashMap<>();
+    _targetStateMap = new HashMap<>();
+  }
+
+  /**
+   * Look up the cached current state of a shard.
+   * @return the cached state, or {@code null} if the instance, resource or shard is not cached
+   */
+  public String getCurrentState(String instance, String resource, String shard) {
+    ShardStateMap instanceState = _currentStateMap.get(instance);
+    return instanceState == null ? null : instanceState.getState(resource, shard);
+  }
+
+  /**
+   * Look up the cached target state of a shard.
+   * @return the cached state, or {@code null} if the instance, resource or shard is not cached
+   */
+  public String getTargetState(String instance, String resource, String shard) {
+    ShardStateMap instanceState = _targetStateMap.get(instance);
+    return instanceState == null ? null : instanceState.getState(resource, shard);
+  }
+
+  /**
+   * Replace the cached current state of every instance present in {@code newCurrentStateMap} and
+   * return the diff of the change. Instances that are absent from the argument keep their cached
+   * state.
+   *
+   * <p>The diff only contains shards that were added or whose state changed; shards and resources
+   * that disappeared are dropped from the cache but are not reported.
+   *
+   * @param newCurrentStateMap The new current state map of instances in the cluster
+   * @return instance -> resource -> shard -> new state; every instance of the argument has an
+   *         entry, which is empty when nothing changed
+   */
+  public Map<String, Map<String, Map<String, String>>> updateCacheWithNewCurrentStateAndGetDiff(
+      Map<String, Map<String, Map<String, String>>> newCurrentStateMap) {
+    Map<String, Map<String, Map<String, String>>> diff = new HashMap<>();
+    for (Map.Entry<String, Map<String, Map<String, String>>> entry : newCurrentStateMap.entrySet()) {
+      ShardStateMap cached =
+          _currentStateMap.computeIfAbsent(entry.getKey(), k -> new ShardStateMap(new HashMap<>()));
+      diff.put(entry.getKey(), cached.updateAndGetDiff(entry.getValue()));
+    }
+    return diff;
+  }
+
+  /**
+   * Update the cache with the current state diff.
+   * Shards that are not mentioned in the diff keep their cached current state.
+   * @param currentStateDiff instance -> resource -> shard -> new current state
+   */
+  public void updateCacheWithCurrentStateDiff(Map<String, Map<String, Map<String, String>>> currentStateDiff) {
+    for (Map.Entry<String, Map<String, Map<String, String>>> entry : currentStateDiff.entrySet()) {
+      updateShardStateMapWithDiff(_currentStateMap, entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Update the target state with the changed target state maps.
+   * Shards that are not mentioned in the change map keep their cached target state.
+   * @param instance the instance whose target state changed
+   * @param targetStateChangeMap resource -> shard -> new target state
+   */
+  public void updateTargetStateWithDiff(String instance, Map<String, Map<String, String>> targetStateChangeMap) {
+    updateShardStateMapWithDiff(_targetStateMap, instance, targetStateChangeMap);
+  }
+
+  /**
+   * Serialize the target state assignments to a JSON Node.
+   * example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}
+   */
+  public ObjectNode serializeTargetAssignmentsToJSON() {
+    ObjectNode root = mapper.createObjectNode();
+    for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
+      root.set(entry.getKey(), entry.getValue().toJSONNode());
+    }
+    return root;
+  }
+
+  // Applies diffMap to the given instance's entry in stateMap; a null or empty diff is a no-op
+  // and does not create an entry for the instance.
+  private void updateShardStateMapWithDiff(Map<String, ShardStateMap> stateMap, String instance,
+      Map<String, Map<String, String>> diffMap) {
+    if (diffMap == null || diffMap.isEmpty()) {
+      return;
+    }
+    stateMap.computeIfAbsent(instance, k -> new ShardStateMap(new HashMap<>())).updateWithDiff(diffMap);
+  }
+
+  /**
+   * The states of one instance, kept as {@code resource -> shard -> state}.
+   */
+  public static class ShardStateMap {
+    Map<String, Map<String, String>> _stateMap;
+
+    /**
+     * @param stateMap resource -> shard -> state; stored by reference, not copied
+     */
+    public ShardStateMap(Map<String, Map<String, String>> stateMap) {
+      _stateMap = stateMap;
+    }
+
+    /**
+     * @return the state of the shard, or {@code null} if the resource or shard is not present
+     */
+    public String getState(String resource, String shard) {
+      Map<String, String> shardStates = _stateMap.get(resource);
+      return shardStates == null ? null : shardStates.get(shard);
+    }
+
+    private Map<String, Map<String, String>> getShardStateMap() {
+      return _stateMap;
+    }
+
+    /**
+     * Merge a diff into this map. Shards named in the diff are added or overwritten, including
+     * shards that are new to an already-known resource; everything else is left untouched.
+     * Null per-resource maps and null states in the diff are ignored.
+     */
+    private void updateWithDiff(Map<String, Map<String, String>> diffMap) {
+      for (Map.Entry<String, Map<String, String>> diffEntry : diffMap.entrySet()) {
+        Map<String, String> diffShardStates = diffEntry.getValue();
+        if (diffShardStates == null) {
+          continue;
+        }
+        // Always merge into a map owned by the cache so the caller's map is never stored or mutated.
+        Map<String, String> cachedShardStates =
+            _stateMap.computeIfAbsent(diffEntry.getKey(), k -> new HashMap<>());
+        for (Map.Entry<String, String> shardEntry : diffShardStates.entrySet()) {
+          if (shardEntry.getValue() != null) {
+            cachedShardStates.put(shardEntry.getKey(), shardEntry.getValue());
+          }
+        }
+      }
+    }
+
+    /**
+     * Replace the whole map with a copy of {@code newCurrentStateMap} and return the shards that
+     * were added or changed. A null argument or null per-resource map is treated as empty.
+     */
+    private Map<String, Map<String, String>> updateAndGetDiff(Map<String, Map<String, String>> newCurrentStateMap) {
+      Map<String, Map<String, String>> diff = new HashMap<>();
+      // Copy of the new state; becomes the cached map so later edits by the caller cannot leak in.
+      Map<String, Map<String, String>> snapshot = new HashMap<>();
+      if (newCurrentStateMap != null) {
+        for (Map.Entry<String, Map<String, String>> entry : newCurrentStateMap.entrySet()) {
+          String resource = entry.getKey();
+          Map<String, String> newShardStates =
+              entry.getValue() == null ? new HashMap<>() : new HashMap<>(entry.getValue());
+          snapshot.put(resource, newShardStates);
+
+          Map<String, String> oldShardStates = _stateMap.get(resource);
+          if (oldShardStates == null) {
+            // Resource is new: every shard is part of the diff.
+            diff.put(resource, new HashMap<>(newShardStates));
+            continue;
+          }
+          if (!oldShardStates.equals(newShardStates)) {
+            for (Map.Entry<String, String> shardEntry : newShardStates.entrySet()) {
+              String oldState = oldShardStates.get(shardEntry.getKey());
+              if (oldState == null || !oldState.equals(shardEntry.getValue())) {
+                diff.computeIfAbsent(resource, k -> new HashMap<>())
+                    .put(shardEntry.getKey(), shardEntry.getValue());
+              }
+            }
+          }
+        }
+      }
+      _stateMap = snapshot;
+      return diff;
+    }
+
+    /**
+     * Serialize the shard state map to a JSON object.
+     * @return a JSON object representing the shard state map. Example:
+     *         {"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}
+     */
+    public ObjectNode toJSONNode() {
+      ObjectNode root = mapper.createObjectNode();
+      for (Map.Entry<String, Map<String, String>> entry : _stateMap.entrySet()) {
+        String resource = entry.getKey();
+        ObjectNode resourceNode = mapper.createObjectNode();
+        for (Map.Entry<String, String> shardEntry : entry.getValue().entrySet()) {
+          resourceNode.put(shardEntry.getKey(), shardEntry.getValue());
+        }
+        root.set(resource, resourceNode);
+      }
+      return root;
+    }
+  }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java
new file mode 100644
index 000000000..448e0b3e9
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestGatewayCurrentStateCache.java
@@ -0,0 +1,82 @@
+package org.apache.helix.gateway.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.gateway.util.GatewayCurrentStateCache;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link GatewayCurrentStateCache}: refreshing the full current state, applying a
+ * current-state diff, and updating the target state together with its JSON serialization.
+ */
+public class TestGatewayCurrentStateCache {
+  private static final String INSTANCE = "instance1";
+  private static final String RESOURCE = "resource1";
+
+  private GatewayCurrentStateCache _cacheUnderTest;
+
+  // Every test starts from an empty cache.
+  @BeforeMethod
+  public void setUp() {
+    _cacheUnderTest = new GatewayCurrentStateCache("TestCluster");
+  }
+
+  // A full refresh of an empty cache reports the whole new state as the diff.
+  @Test
+  public void testUpdateCacheWithNewCurrentStateAndGetDiff() {
+    Map<String, String> shardStates = new HashMap<>();
+    shardStates.put("shard1", "ONLINE");
+    Map<String, Map<String, Map<String, String>>> fullState = new HashMap<>();
+    fullState.put(INSTANCE, resourceStates(shardStates));
+
+    Map<String, Map<String, Map<String, String>>> reportedDiff =
+        _cacheUnderTest.updateCacheWithNewCurrentStateAndGetDiff(fullState);
+
+    Assert.assertNotNull(reportedDiff);
+    Assert.assertEquals(reportedDiff.size(), 1);
+    Assert.assertEquals(reportedDiff.get(INSTANCE).get(RESOURCE).get("shard1"), "ONLINE");
+  }
+
+  // A diff applied to an empty cache is readable back shard by shard.
+  @Test
+  public void testUpdateCacheWithCurrentStateDiff() {
+    Map<String, String> shardStates = new HashMap<>();
+    shardStates.put("shard2", "ONLINE");
+    shardStates.put("shard1", "ONLINE");
+    Map<String, Map<String, Map<String, String>>> stateDiff = new HashMap<>();
+    stateDiff.put(INSTANCE, resourceStates(shardStates));
+
+    _cacheUnderTest.updateCacheWithCurrentStateDiff(stateDiff);
+
+    Assert.assertEquals(_cacheUnderTest.getCurrentState(INSTANCE, RESOURCE, "shard1"), "ONLINE");
+    Assert.assertEquals(_cacheUnderTest.getCurrentState(INSTANCE, RESOURCE, "shard2"), "ONLINE");
+  }
+
+  // A target-state change is visible through the getter and in the serialized JSON.
+  @Test
+  public void testUpdateTargetStateWithDiff() {
+    Map<String, String> shardStates = new HashMap<>();
+    shardStates.put("shard1", "OFFLINE");
+
+    _cacheUnderTest.updateTargetStateWithDiff(INSTANCE, resourceStates(shardStates));
+
+    Assert.assertEquals(_cacheUnderTest.getTargetState(INSTANCE, RESOURCE, "shard1"), "OFFLINE");
+    Assert.assertEquals(_cacheUnderTest.serializeTargetAssignmentsToJSON().toString(),
+        "{\"instance1\":{\"resource1\":{\"shard1\":\"OFFLINE\"}}}");
+  }
+
+  // Wraps the given shard states as the state of the single test resource.
+  private static Map<String, Map<String, String>> resourceStates(Map<String, String> shardStates) {
+    Map<String, Map<String, String>> byResource = new HashMap<>();
+    byResource.put(RESOURCE, shardStates);
+    return byResource;
+  }
+}