This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new b8baca989 Change assignable instance set caches to only be replaced
after each update instead of reusing same maps and sets (#2746)
b8baca989 is described below
commit b8baca9897e4b64e0395cde3618a6d45b4891ebe
Author: Zachary Pinto <[email protected]>
AuthorDate: Tue Jan 30 15:56:45 2024 -0800
Change assignable instance set caches to only be replaced after each update
instead of reusing same maps and sets (#2746)
Build a new DerivedInstanceCache object when updating the instance maps and sets
from the caches, then atomically swap it into _derivedInstanceCache, so that the
async global and partial rebalance stages no longer hit
ConcurrentModificationExceptions.
---
.../dataproviders/BaseControllerDataProvider.java | 92 ++++++++++++++--------
1 file changed, 60 insertions(+), 32 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index c6c799dd1..1e40bbb72 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -119,13 +119,32 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
private final Map<String, Map<String, Set<String>>>
_disabledInstanceForPartitionMap = new HashMap<>();
private final Set<String> _disabledInstanceSet = new HashSet<>();
- // Assignable instances are instances will contain at most one instance with
a given logicalId.
- // This is used for SWAP related operations where there can be two instances
with the same logicalId.
- private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new
HashMap<>();
- private final Map<String, LiveInstance> _assignableLiveInstancesMap = new
HashMap<>();
- private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName =
new HashMap<>();
- private final Set<String> _liveSwapInInstanceNames = new HashSet<>();
- private final Set<String> _enabledSwapInInstanceNames = new HashSet<>();
+ private static final class DerivedInstanceCache {
+ // Assignable instances are instances will contain at most one instance
with a given logicalId.
+ // This is used for SWAP related operations where there can be two
instances with the same logicalId.
+ private final Map<String, InstanceConfig> _assignableInstanceConfigMap;
+ private final Map<String, LiveInstance> _assignableLiveInstancesMap;
+ private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName;
+ private final Set<String> _liveSwapInInstanceNames;
+ private final Set<String> _enabledSwapInInstanceNames;
+
+ DerivedInstanceCache(Map<String, InstanceConfig>
assignableInstanceConfigMap,
+ Map<String, LiveInstance> assignableLiveInstancesMap,
+ Map<String, String> swapOutInstanceNameToSwapInInstanceName,
+ Set<String> liveSwapInInstanceNames, Set<String>
enabledSwapInInstanceNames) {
+ _assignableInstanceConfigMap = assignableInstanceConfigMap;
+ _assignableLiveInstancesMap = assignableLiveInstancesMap;
+ _swapOutInstanceNameToSwapInInstanceName =
swapOutInstanceNameToSwapInInstanceName;
+ _liveSwapInInstanceNames = liveSwapInInstanceNames;
+ _enabledSwapInInstanceNames = enabledSwapInInstanceNames;
+ }
+ }
+
+ // All maps and sets are encapsulated in DerivedInstanceCache to ensure that
they are updated together
+ // as a snapshot.
+ private DerivedInstanceCache _derivedInstanceCache =
+ new DerivedInstanceCache(new HashMap<>(), new HashMap<>(), new
HashMap<>(), new HashSet<>(),
+ new HashSet<>());
private final Map<String, MonitoredAbnormalResolver>
_abnormalStateResolverMap = new HashMap<>();
private final Set<String> _timedOutInstanceDuringMaintenance = new
HashSet<>();
private Map<String, LiveInstance>
_allLiveInstanceExcludeTimedOutForMaintenance = new HashMap<>();
@@ -363,12 +382,12 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
ClusterTopologyConfig clusterTopologyConfig =
ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
- // Clear all caches
- _assignableInstanceConfigMap.clear();
- _assignableLiveInstancesMap.clear();
- _swapOutInstanceNameToSwapInInstanceName.clear();
- _liveSwapInInstanceNames.clear();
- _enabledSwapInInstanceNames.clear();
+ // Create new caches to be populated.
+ Map<String, InstanceConfig> newAssignableInstanceConfigMap = new
HashMap<>();
+ Map<String, LiveInstance> newAssignableLiveInstancesMap = new HashMap<>();
+ Map<String, String> newSwapOutInstanceNameToSwapInInstanceName = new
HashMap<>();
+ Set<String> newLiveSwapInInstanceNames = new HashSet<>();
+ Set<String> newEnabledSwapInInstanceNames = new HashSet<>();
Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
@@ -403,14 +422,14 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
// instance with this instance. If this instance has no
InstanceOperation, then replace the filtered instance
// with this instance. This is the case where the SWAP_IN node has
been marked as complete or SWAP_IN exists and
// SWAP_OUT does not. There can never be a case where both have no
InstanceOperation set.
- _assignableInstanceConfigMap.remove(filteredNode);
- _assignableInstanceConfigMap.put(node, currentInstanceConfig);
+ newAssignableInstanceConfigMap.remove(filteredNode);
+ newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
}
} else if (!currentInstanceConfig.getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.EVACUATE.name())) {
// EVACUATE instances are not considered to be assignable.
- _assignableInstanceConfigMap.put(node, currentInstanceConfig);
+ newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
}
@@ -429,24 +448,30 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
}
liveInstancesMap.forEach((instanceName, liveInstance) -> {
- if (_assignableInstanceConfigMap.containsKey(instanceName)) {
- _assignableLiveInstancesMap.put(instanceName, liveInstance);
+ if (newAssignableInstanceConfigMap.containsKey(instanceName)) {
+ newAssignableLiveInstancesMap.put(instanceName, liveInstance);
}
});
swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
String swapInInstanceName = swapInInstancesByLogicalId.get(value);
if (swapInInstanceName != null) {
- _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName,
swapInInstanceName);
+ newSwapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName,
swapInInstanceName);
if (liveInstancesMap.containsKey(swapInInstanceName)) {
- _liveSwapInInstanceNames.add(swapInInstanceName);
+ newLiveSwapInInstanceNames.add(swapInInstanceName);
}
if
(InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
clusterConfig)) {
- _enabledSwapInInstanceNames.add(swapInInstanceName);
+ newEnabledSwapInInstanceNames.add(swapInInstanceName);
}
}
});
+
+ // Replace caches with up-to-date instance sets.
+ _derivedInstanceCache =
+ new DerivedInstanceCache(newAssignableInstanceConfigMap,
newAssignableLiveInstancesMap,
+ newSwapOutInstanceNameToSwapInInstanceName,
newLiveSwapInInstanceNames,
+ newEnabledSwapInInstanceNames);
}
private void refreshResourceConfig(final HelixDataAccessor accessor,
@@ -506,7 +531,7 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
.filter(e ->
!_timedOutInstanceDuringMaintenance.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
_assignableLiveInstanceExcludeTimedOutForMaintenance =
- _assignableLiveInstancesMap.entrySet().stream()
+ _derivedInstanceCache._assignableLiveInstancesMap.entrySet().stream()
.filter(e ->
!_timedOutInstanceDuringMaintenance.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
@@ -661,7 +686,7 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
return
Collections.unmodifiableMap(_assignableLiveInstanceExcludeTimedOutForMaintenance);
}
- return Collections.unmodifiableMap(_assignableLiveInstancesMap);
+ return
Collections.unmodifiableMap(_derivedInstanceCache._assignableLiveInstancesMap);
}
/**
@@ -685,7 +710,7 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
* @return A new set contains instance name
*/
public Set<String> getAssignableInstances() {
- return _assignableInstanceConfigMap.keySet();
+ return _derivedInstanceCache._assignableInstanceConfigMap.keySet();
}
/**
@@ -780,7 +805,7 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
*/
public Set<String> getAssignableInstancesWithTag(String instanceTag) {
Set<String> taggedInstances = new HashSet<>();
- for (String instance : _assignableInstanceConfigMap.keySet()) {
+ for (String instance :
_derivedInstanceCache._assignableInstanceConfigMap.keySet()) {
InstanceConfig instanceConfig =
_allInstanceConfigCache.getPropertyByName(instance);
if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
taggedInstances.add(instance);
@@ -796,7 +821,7 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
*/
public Set<String> getInstancesWithTag(String instanceTag) {
Set<String> taggedInstances = new HashSet<>();
- for (String instance : _assignableInstanceConfigMap.keySet()) {
+ for (String instance :
_derivedInstanceCache._assignableInstanceConfigMap.keySet()) {
InstanceConfig instanceConfig =
_allInstanceConfigCache.getPropertyByName(instance);
if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
taggedInstances.add(instance);
@@ -838,7 +863,8 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
* @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN
instanceNames.
*/
public Map<String, String> getSwapOutToSwapInInstancePairs() {
- return
Collections.unmodifiableMap(_swapOutInstanceNameToSwapInInstanceName);
+ return Collections.unmodifiableMap(
+ _derivedInstanceCache._swapOutInstanceNameToSwapInInstanceName);
}
/**
@@ -847,7 +873,7 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
* @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT
instance.
*/
public Set<String> getLiveSwapInInstanceNames() {
- return Collections.unmodifiableSet(_liveSwapInInstanceNames);
+ return
Collections.unmodifiableSet(_derivedInstanceCache._liveSwapInInstanceNames);
}
/**
@@ -856,7 +882,7 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
* @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT
instance.
*/
public Set<String> getEnabledSwapInInstanceNames() {
- return Collections.unmodifiableSet(_enabledSwapInInstanceNames);
+ return
Collections.unmodifiableSet(_derivedInstanceCache._enabledSwapInInstanceNames);
}
public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
@@ -985,7 +1011,7 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
* @return a map of instance name to instance config
*/
public Map<String, InstanceConfig> getAssignableInstanceConfigMap() {
- return Collections.unmodifiableMap(_assignableInstanceConfigMap);
+ return
Collections.unmodifiableMap(_derivedInstanceCache._assignableInstanceConfigMap);
}
/**
@@ -1288,13 +1314,15 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
StringBuilder sb = new StringBuilder();
sb.append(String.format("liveInstaceMap: %s",
_allLiveInstanceCache.getPropertyMap()))
.append("\n");
- sb.append(String.format("assignableLiveInstaceMap: %s",
_assignableLiveInstancesMap))
+ sb.append(String.format("assignableLiveInstaceMap: %s",
+ _derivedInstanceCache._assignableLiveInstancesMap))
.append("\n");
sb.append(String.format("idealStateMap: %s",
_idealStateCache.getPropertyMap())).append("\n");
sb.append(String.format("stateModelDefMap: %s",
_stateModelDefinitionCache.getPropertyMap())).append("\n");
sb.append(String.format("instanceConfigMap: %s",
_allInstanceConfigCache.getPropertyMap()))
.append("\n");
- sb.append(String.format("assignableInstanceConfigMap: %s",
_assignableInstanceConfigMap))
+ sb.append(String.format("assignableInstanceConfigMap: %s",
+ _derivedInstanceCache._assignableInstanceConfigMap))
.append("\n");
sb.append(String.format("resourceConfigMap: %s",
_resourceConfigCache.getPropertyMap())).append("\n");
sb.append(String.format("messageCache: %s",
_instanceMessagesCache)).append("\n");