Change RoutingDataCache to use zk version based selective update when reading the ExternalViews and TargetExternalView.
RB=1317410 BUG=HELIX-929 G=helix-reviewers A=jxue Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d02083e6 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d02083e6 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d02083e6 Branch: refs/heads/master Commit: d02083e652797673e4826a45fc19896f610803c2 Parents: 8527a5a Author: Lei Xia <[email protected]> Authored: Fri May 11 12:11:09 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Wed Jul 11 15:28:06 2018 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/helix/GroupCommit.java | 2 + .../helix/common/caches/AbstractDataCache.java | 83 ++++++++++ .../common/caches/BasicClusterDataCache.java | 67 +------- .../helix/common/caches/CurrentStateCache.java | 6 +- .../helix/common/caches/ExternalViewCache.java | 151 +++++++++++++++++++ .../helix/common/caches/IdealStateCache.java | 127 ++++++++++++++++ .../common/caches/InstanceMessagesCache.java | 1 - .../common/caches/TargetExternalViewCache.java | 30 ++++ .../controller/stages/ClusterDataCache.java | 70 ++------- .../helix/spectator/RoutingDataCache.java | 21 ++- .../org/apache/helix/DummyProcessThread.java | 1 - .../java/org/apache/helix/TestRoutingTable.java | 3 +- .../TestClusterDataCacheSelectiveUpdate.java | 80 +--------- .../java/org/apache/helix/mock/MockManager.java | 2 +- .../helix/mock/MockZkHelixDataAccessor.java | 80 ++++++++++ .../helix/spectator/TestRoutingDataCache.java | 123 +++++++++++++++ 16 files changed, 633 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/GroupCommit.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java index b6c0ff1..8165f93 100644 --- a/helix-core/src/main/java/org/apache/helix/GroupCommit.java +++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java @@ -138,6 +138,8 @@ public class GroupCommit { success = accessor.remove(mergedKey, options); if (!success) { LOG.error("Fails to remove " + mergedKey + " from ZK, retry it!"); + } else { + LOG.info("Removed " + mergedKey); } } else { success = accessor.set(mergedKey, merged, options); http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java new file mode 100644 index 0000000..56b4aa4 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java @@ -0,0 +1,83 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Maps; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractDataCache { + private static Logger LOG = LoggerFactory.getLogger(AbstractDataCache.class.getName()); + + + /** + * Selectively fetch Helix Properties from ZK by comparing the version of local cached one with the one on ZK. + * If version on ZK is newer, fetch it from zk and update local cache. + * @param accessor the HelixDataAccessor + * @param reloadKeys keys needs to be reload + * @param cachedKeys keys already exists in the cache + * @param cachedPropertyMap cached map of propertykey -> property object + * @param <T> the type of metadata + * @return updated properties map + */ + protected <T extends HelixProperty> Map<PropertyKey, T> refreshProperties( + HelixDataAccessor accessor, List<PropertyKey> reloadKeys, List<PropertyKey> cachedKeys, + Map<PropertyKey, T> cachedPropertyMap) { + // All new entries from zk not cached locally yet should be read from ZK. + Map<PropertyKey, T> refreshedPropertyMap = Maps.newHashMap(); + List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys); + for (int i = 0; i < cachedKeys.size(); i++) { + PropertyKey key = cachedKeys.get(i); + HelixProperty.Stat stat = stats.get(i); + if (stat != null) { + T property = cachedPropertyMap.get(key); + if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) { + refreshedPropertyMap.put(key, property); + } else { + // need update from zk + reloadKeys.add(key); + } + } else { + LOG.warn("stat is null for key: " + key); + reloadKeys.add(key); + } + } + + List<T> reloadedProperty = accessor.getProperty(reloadKeys, true); + Iterator<PropertyKey> csKeyIter = reloadKeys.iterator(); + for (T property : reloadedProperty) { + PropertyKey key = csKeyIter.next(); + if (property != null) { + refreshedPropertyMap.put(key, property); + } else { + LOG.warn("znode is null for key: " + key); + } + } + + return refreshedPropertyMap; + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java index 06fcaf6..d3a3d20 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java @@ -19,16 +19,12 @@ package org.apache.helix.common.caches; * under the License. */ -import com.google.common.collect.Maps; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; @@ -44,7 +40,7 @@ public class BasicClusterDataCache { protected Map<String, LiveInstance> _liveInstanceMap; protected Map<String, InstanceConfig> _instanceConfigMap; - protected Map<String, ExternalView> _externalViewMap; + protected ExternalViewCache _externalViewCache; protected String _clusterName; @@ -54,7 +50,7 @@ public class BasicClusterDataCache { _propertyDataChangedMap = new ConcurrentHashMap<>(); _liveInstanceMap = new HashMap<>(); _instanceConfigMap = new HashMap<>(); - _externalViewMap = new HashMap<>(); + _externalViewCache = new ExternalViewCache(clusterName); _clusterName = clusterName; requireFullRefresh(); } @@ -72,11 +68,8 @@ public class BasicClusterDataCache { PropertyKey.Builder keyBuilder = accessor.keyBuilder(); if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) { - long start = System.currentTimeMillis(); _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false)); - _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews(), true); - LOG.info("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + ( - System.currentTimeMillis() - start) + " ms"); + _externalViewCache.refresh(accessor); } if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) { @@ -103,64 +96,18 @@ public class BasicClusterDataCache { if (LOG.isDebugEnabled()) { LOG.debug("LiveInstances: " + _liveInstanceMap); - LOG.debug("ExternalViews: " + _externalViewMap); + LOG.debug("ExternalViews: " + _externalViewCache.getExternalViewMap().keySet()); LOG.debug("InstanceConfigs: " + _instanceConfigMap); } } /** - * Selective update Helix Cache by version - * @param accessor the HelixDataAccessor - * @param reloadKeys keys needs to be reload - * @param cachedKeys keys already exists in the cache - * @param cachedPropertyMap cached map of propertykey -> property object - * @param <T> the type of metadata - * @return - */ - public static <T extends HelixProperty> Map<PropertyKey, T> updateReloadProperties( - HelixDataAccessor accessor, List<PropertyKey> reloadKeys, List<PropertyKey> cachedKeys, - Map<PropertyKey, T> cachedPropertyMap) { - // All new entries from zk not cached locally yet should be read from ZK. - Map<PropertyKey, T> refreshedPropertyMap = Maps.newHashMap(); - List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys); - for (int i = 0; i < cachedKeys.size(); i++) { - PropertyKey key = cachedKeys.get(i); - HelixProperty.Stat stat = stats.get(i); - if (stat != null) { - T property = cachedPropertyMap.get(key); - if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) { - refreshedPropertyMap.put(key, property); - } else { - // need update from zk - reloadKeys.add(key); - } - } else { - LOG.warn("Stat is null for key: " + key); - reloadKeys.add(key); - } - } - - List<T> reloadedProperty = accessor.getProperty(reloadKeys, true); - Iterator<PropertyKey> reloadKeyIter = reloadKeys.iterator(); - for (T property : reloadedProperty) { - PropertyKey key = reloadKeyIter.next(); - if (property != null) { - refreshedPropertyMap.put(key, property); - } else { - LOG.warn("Reload property is null for key: " + key); - } - } - - return refreshedPropertyMap; - } - - /** * Retrieves the ExternalView for all resources * * @return */ public Map<String, ExternalView> getExternalViews() { - return Collections.unmodifiableMap(_externalViewMap); + return _externalViewCache.getExternalViewMap(); } /** @@ -213,7 +160,7 @@ public class BasicClusterDataCache { _instanceConfigMap.clear(); break; case EXTERNAL_VIEW: - _externalViewMap.clear(); + _externalViewCache.clear(); break; default: break; @@ -236,7 +183,7 @@ public class BasicClusterDataCache { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n"); - sb.append("externalViewMap:" + _externalViewMap).append("\n"); + sb.append("externalViewMap:" + _externalViewCache.getExternalViewMap()).append("\n"); sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n"); return sb.toString(); http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java index d28a859..35a7179 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java @@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory; /** * Cache to hold all CurrentStates of a cluster. */ -public class CurrentStateCache { +public class CurrentStateCache extends AbstractDataCache { private static final Logger LOG = LoggerFactory.getLogger(CurrentStateCache.class.getName()); private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap; @@ -121,8 +121,8 @@ public class CurrentStateCache { Set<PropertyKey> cachedKeys = new HashSet<>(_currentStateCache.keySet()); cachedKeys.retainAll(currentStateKeys); - _currentStateCache = Collections.unmodifiableMap(BasicClusterDataCache - .updateReloadProperties(accessor, new ArrayList<>(reloadKeys), new ArrayList<>(cachedKeys), + _currentStateCache = Collections.unmodifiableMap( + refreshProperties(accessor, new ArrayList<>(reloadKeys), new ArrayList<>(cachedKeys), _currentStateCache)); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java new file mode 100644 index 0000000..94c2b16 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java @@ -0,0 +1,151 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyType; +import org.apache.helix.model.ExternalView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cache to hold all ExternalViews of a cluster. + */ +public class ExternalViewCache extends AbstractDataCache { + private static final Logger LOG = LoggerFactory.getLogger(ExternalViewCache.class.getName()); + + protected Map<String, ExternalView> _externalViewMap; + protected Map<String, ExternalView> _externalViewCache; + + protected String _clusterName; + + private PropertyType _type; + + public ExternalViewCache(String clusterName) { + this(clusterName, PropertyType.EXTERNALVIEW); + } + + protected ExternalViewCache(String clusterName, PropertyType type) { + _clusterName = clusterName; + _externalViewMap = Collections.emptyMap(); + _externalViewCache = Collections.emptyMap(); + _type = type; + } + + + /** + * This refreshes the ExternalView data by re-fetching the data from zookeeper in an efficient + * way + * + * @param accessor + * + * @return + */ + public void refresh(HelixDataAccessor accessor) { + long startTime = System.currentTimeMillis(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + Set<PropertyKey> currentPropertyKeys = new HashSet<>(); + + List<String> resources = accessor.getChildNames(externalViewsKey(keyBuilder)); + for (String resource : resources) { + currentPropertyKeys.add(externalViewKey(keyBuilder, resource)); + } + + Set<PropertyKey> cachedKeys = new HashSet<>(); + Map<PropertyKey, ExternalView> cachedExternalViewMap = Maps.newHashMap(); + for (String resource : _externalViewCache.keySet()) { + PropertyKey key = externalViewKey(keyBuilder, resource); + cachedKeys.add(key); + cachedExternalViewMap.put(key, _externalViewCache.get(resource)); + } + cachedKeys.retainAll(currentPropertyKeys); + + Set<PropertyKey> reloadKeys = new HashSet<>(currentPropertyKeys); + reloadKeys.removeAll(cachedKeys); + + Map<PropertyKey, ExternalView> updatedMap = + refreshProperties(accessor, new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys), + cachedExternalViewMap); + Map<String, ExternalView> newExternalViewMap = Maps.newHashMap(); + for (ExternalView externalView : updatedMap.values()) { + newExternalViewMap.put(externalView.getResourceName(), externalView); + } + + _externalViewCache = new HashMap<>(newExternalViewMap); + _externalViewMap = new HashMap<>(newExternalViewMap); + + long endTime = System.currentTimeMillis(); + LOG.info("Refresh " + _externalViewMap.size() + " ExternalViews for cluster " + _clusterName + + ", took " + (endTime - startTime) + " ms"); + } + + private PropertyKey externalViewsKey(PropertyKey.Builder keyBuilder) { + PropertyKey evPropertyKey; + if (_type.equals(PropertyType.EXTERNALVIEW)) { + evPropertyKey = keyBuilder.externalViews(); + } else if (_type.equals(PropertyType.TARGETEXTERNALVIEW)) { + evPropertyKey = keyBuilder.targetExternalViews(); + } else { + throw new HelixException( + "Failed to refresh ExternalViewCache, Wrong property type " + _type + "!"); + } + + return evPropertyKey; + } + + private PropertyKey externalViewKey(PropertyKey.Builder keyBuilder, String resource) { + PropertyKey evPropertyKey; + if (_type.equals(PropertyType.EXTERNALVIEW)) { + evPropertyKey = keyBuilder.externalView(resource); + } else if (_type.equals(PropertyType.TARGETEXTERNALVIEW)) { + evPropertyKey = keyBuilder.targetExternalView(resource); + } else { + throw new HelixException( + "Failed to refresh ExternalViewCache, Wrong property type " + _type + "!"); + } + + return evPropertyKey; + } + + /** + * Return ExternalView map for all resources. + * + * @return + */ + public Map<String, ExternalView> getExternalViewMap() { + return Collections.unmodifiableMap(_externalViewMap); + } + + public void clear() { + _externalViewCache.clear(); + _externalViewMap.clear(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java new file mode 100644 index 0000000..12056e4 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java @@ -0,0 +1,127 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cache to hold all IdealStates of a cluster. + */ +public class IdealStateCache extends AbstractDataCache { + private static final Logger LOG = LoggerFactory.getLogger(IdealStateCache.class.getName()); + + private Map<String, IdealState> _idealStateMap; + private Map<String, IdealState> _idealStateCache; + + private String _clusterName; + + public IdealStateCache(String clusterName) { + _clusterName = clusterName; + _idealStateMap = Collections.emptyMap(); + _idealStateCache = Collections.emptyMap(); + } + + + /** + * This refreshes the IdealState data by re-fetching the data from zookeeper in an efficient + * way + * + * @param accessor + * + * @return + */ + public void refresh(HelixDataAccessor accessor) { + long startTime = System.currentTimeMillis(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + Set<PropertyKey> currentIdealStateKeys = new HashSet<>(); + for (String idealState : accessor.getChildNames(keyBuilder.idealStates())) { + currentIdealStateKeys.add(keyBuilder.idealStates(idealState)); + } + + Set<PropertyKey> cachedKeys = new HashSet<>(); + Map<PropertyKey, IdealState> cachedIdealStateMap = Maps.newHashMap(); + for (String idealState : _idealStateCache.keySet()) { + cachedKeys.add(keyBuilder.idealStates(idealState)); + cachedIdealStateMap + .put(keyBuilder.idealStates(idealState), _idealStateCache.get(idealState)); + } + cachedKeys.retainAll(currentIdealStateKeys); + + Set<PropertyKey> reloadKeys = new HashSet<>(currentIdealStateKeys); + reloadKeys.removeAll(cachedKeys); + + Map<PropertyKey, IdealState> updatedMap = + refreshProperties(accessor, new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys), + cachedIdealStateMap); + Map<String, IdealState> newIdealStateMap = Maps.newHashMap(); + for (IdealState idealState : updatedMap.values()) { + newIdealStateMap.put(idealState.getResourceName(), idealState); + } + + _idealStateCache = new HashMap<>(newIdealStateMap); + _idealStateMap = new HashMap<>(newIdealStateMap); + + long endTime = System.currentTimeMillis(); + LOG.info( + "Refresh " + _idealStateMap.size() + " idealStates for cluster " + _clusterName + ", took " + + (endTime - startTime) + " ms"); + } + + /** + * Return IdealState map for all resources. + * + * @return + */ + public Map<String, IdealState> getIdealStateMap() { + return Collections.unmodifiableMap(_idealStateMap); + } + + public void clear() { + _idealStateMap.clear(); + _idealStateCache.clear(); + } + + /** + * Set the IdealStates. + * CAUTION: After refresh() call, the previously set idealstates will be updated. + * + * @param idealStates + */ + public void setIdealStates(List<IdealState> idealStates) { + Map<String, IdealState> idealStateMap = new HashMap(); + for (IdealState idealState : idealStates) { + idealStateMap.put(idealState.getId(), idealState); + } + _idealStateMap = idealStateMap; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java index 1929776..f8001da 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java @@ -23,7 +23,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java new file mode 100644 index 0000000..fdff9b7 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java @@ -0,0 +1,30 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.helix.PropertyType; + +/** + * Cache to hold all TargetExternalViews of a cluster. + */ +public class TargetExternalViewCache extends ExternalViewCache { + public TargetExternalViewCache(String clusterName) { + super(clusterName, PropertyType.TARGETEXTERNALVIEW); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 4354818..f3f9e0b 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -19,12 +19,10 @@ package org.apache.helix.controller.stages; * under the License. */ -import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,8 +32,8 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; -import org.apache.helix.common.caches.BasicClusterDataCache; import org.apache.helix.common.caches.CurrentStateCache; +import org.apache.helix.common.caches.IdealStateCache; import org.apache.helix.common.caches.InstanceMessagesCache; import org.apache.helix.common.caches.TaskDataCache; import org.apache.helix.model.ClusterConfig; @@ -73,8 +71,6 @@ public class ClusterDataCache { private ClusterConfig _clusterConfig; private Map<String, LiveInstance> _liveInstanceMap; private Map<String, LiveInstance> _liveInstanceCacheMap; - private Map<String, IdealState> _idealStateMap; - private Map<String, IdealState> _idealStateCacheMap = Maps.newHashMap(); private Map<String, StateModelDefinition> _stateModelDefMap; private Map<String, InstanceConfig> _instanceConfigMap; private Map<String, InstanceConfig> _instanceConfigCacheMap; @@ -90,6 +86,7 @@ public class ClusterDataCache { private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>(); private Set<String> _disabledInstanceSet = new HashSet<>(); + private IdealStateCache _idealStateCache; private CurrentStateCache _currentStateCache; private TaskDataCache _taskDataCache; private InstanceMessagesCache _instanceMessagesCache; @@ -125,6 +122,7 @@ public class ClusterDataCache { _propertyDataChangedMap.put(type, true); } _clusterName = clusterName; + _idealStateCache = new IdealStateCache(_clusterName); _currentStateCache = new CurrentStateCache(_clusterName); _taskDataCache = new TaskDataCache(_clusterName); _instanceMessagesCache = new InstanceMessagesCache(_clusterName); @@ -143,7 +141,7 @@ public class ClusterDataCache { if (_propertyDataChangedMap.get(ChangeType.IDEAL_STATE)) { _propertyDataChangedMap.put(ChangeType.IDEAL_STATE, false); clearCachedResourceAssignments(); - _idealStateCacheMap = refreshIdealStates(accessor); + _idealStateCache.refresh(accessor); } if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) { @@ -175,8 +173,8 @@ public class ClusterDataCache { } } - _idealStateMap = new HashMap<>(_idealStateCacheMap); _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap); + _liveInstanceMap = new HashMap(_liveInstanceCacheMap); _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap); _resourceConfigMap = new HashMap<>(_resourceConfigCacheMap); @@ -228,7 +226,7 @@ public class ClusterDataCache { for (LiveInstance instance : _liveInstanceMap.values()) { LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); } - LOG.debug("IdealStates: " + _idealStateMap.keySet()); + LOG.debug("IdealStates: " + _idealStateCache.getIdealStateMap().keySet()); LOG.debug("ResourceConfigs: " + _resourceConfigMap.keySet()); LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet()); LOG.debug("ClusterConfigs: " + _clusterConfig); @@ -319,15 +317,11 @@ public class ClusterDataCache { * @return */ public Map<String, IdealState> getIdealStates() { - return _idealStateMap; + return _idealStateCache.getIdealStateMap(); } public synchronized void setIdealStates(List<IdealState> idealStates) { - Map<String, IdealState> idealStateMap = new HashMap<>(); - for (IdealState idealState : idealStates) { - idealStateMap.put(idealState.getId(), idealState); - } - _idealStateCacheMap = idealStateMap; + _idealStateCache.setIdealStates(idealStates); } public Map<String, Map<String, String>> getIdealStateRules() { @@ -469,7 +463,7 @@ public class ClusterDataCache { * @return */ public IdealState getIdealState(String resourceName) { - return _idealStateMap.get(resourceName); + return _idealStateCache.getIdealStateMap().get(resourceName); } /** @@ -596,9 +590,10 @@ public class ClusterDataCache { */ public int getReplicas(String resourceName) { int replicas = -1; + Map<String, IdealState> idealStateMap = _idealStateCache.getIdealStateMap(); - if (_idealStateMap.containsKey(resourceName)) { - String replicasStr = _idealStateMap.get(resourceName).getReplicas(); + if (idealStateMap.containsKey(resourceName)) { + String replicasStr = idealStateMap.get(resourceName).getReplicas(); if (replicasStr != null) { if (replicasStr.equals(IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString())) { @@ -672,43 +667,6 @@ public class ClusterDataCache { } } - private Map<String, IdealState> refreshIdealStates(HelixDataAccessor accessor) { - long startTime = System.currentTimeMillis(); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - Set<PropertyKey> currentIdealStateKeys = new HashSet<>(); - for (String idealState : accessor.getChildNames(keyBuilder.idealStates())) { - currentIdealStateKeys.add(keyBuilder.idealStates(idealState)); - } - - Set<PropertyKey> cachedKeys = new HashSet<>(); - Map<PropertyKey, IdealState> cachedIdealStateMap = Maps.newHashMap(); - for (String idealState : _idealStateCacheMap.keySet()) { - cachedKeys.add(keyBuilder.idealStates(idealState)); - cachedIdealStateMap - .put(keyBuilder.idealStates(idealState), _idealStateCacheMap.get(idealState)); - } - cachedKeys.retainAll(currentIdealStateKeys); - - Set<PropertyKey> reloadKeys = new HashSet<>(currentIdealStateKeys); - reloadKeys.removeAll(cachedKeys); - - Map<PropertyKey, IdealState> updatedMap = BasicClusterDataCache - .updateReloadProperties(accessor, new LinkedList<>(reloadKeys), - new ArrayList<>(cachedKeys), cachedIdealStateMap); - Map<String, IdealState> newIdealStateMap = Maps.newHashMap(); - for (IdealState idealState : updatedMap.values()) { - newIdealStateMap.put(idealState.getResourceName(), idealState); - } - - long endTime = System.currentTimeMillis(); - LOG.info("Refresh idealStates for cluster " + _clusterName + ", took " + (endTime - - startTime) + " ms"); - - return Collections.unmodifiableMap(newIdealStateMap); - } - - - /** * Return the JobContext by resource name * @param resourceName @@ -917,7 +875,7 @@ public class ClusterDataCache { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n"); - sb.append("idealStateMap:" + _idealStateMap).append("\n"); + sb.append("idealStateMap:" + _idealStateCache.getIdealStateMap()).append("\n"); sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n"); sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n"); sb.append("resourceConfigMap:" + _resourceConfigMap).append("\n"); @@ -928,4 +886,4 @@ public class ClusterDataCache { return sb.toString(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java index 17c83b3..9b95d54 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java @@ -19,14 +19,13 @@ package org.apache.helix.spectator; * under the License. */ -import java.util.Collections; import java.util.Map; import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.PropertyKey; import org.apache.helix.PropertyType; import org.apache.helix.common.caches.BasicClusterDataCache; import org.apache.helix.common.caches.CurrentStateCache; +import org.apache.helix.common.caches.TargetExternalViewCache; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.LiveInstance; @@ -41,13 +40,13 @@ class RoutingDataCache extends BasicClusterDataCache { private final PropertyType _sourceDataType; private CurrentStateCache _currentStateCache; - private Map<String, ExternalView> _targetExternalViewMap; + private TargetExternalViewCache _targetExternalViewCache; public RoutingDataCache(String clusterName, PropertyType sourceDataType) { super(clusterName); _sourceDataType = sourceDataType; _currentStateCache = new CurrentStateCache(clusterName); - _targetExternalViewMap = Collections.emptyMap(); + _targetExternalViewCache = new TargetExternalViewCache(clusterName); requireFullRefresh(); } @@ -68,12 +67,11 @@ class RoutingDataCache extends BasicClusterDataCache { if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW) && _propertyDataChangedMap .get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) { long start = System.currentTimeMillis(); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); _propertyDataChangedMap .put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, Boolean.valueOf(false)); - _targetExternalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews()); - LOG.info("Reload TargetExternalViews: " + _targetExternalViewMap.keySet() + ". Takes " + ( - System.currentTimeMillis() - start) + " ms"); + _targetExternalViewCache.refresh(accessor); + LOG.info("Reload " + _targetExternalViewCache.getExternalViewMap().keySet().size() + + " TargetExternalViews. Takes " + (System.currentTimeMillis() - start) + " ms"); } if (_sourceDataType.equals(PropertyType.CURRENTSTATES) && _propertyDataChangedMap @@ -81,8 +79,7 @@ class RoutingDataCache extends BasicClusterDataCache { long start = System.currentTimeMillis(); Map<String, LiveInstance> liveInstanceMap = getLiveInstances(); _currentStateCache.refresh(accessor, liveInstanceMap); - LOG.info("Reload CurrentStates: " + _targetExternalViewMap.keySet() + ". Takes " + ( - System.currentTimeMillis() - start) + " ms"); + LOG.info("Reload CurrentStates. Takes " + (System.currentTimeMillis() - start) + " ms"); } long endTime = System.currentTimeMillis(); @@ -91,7 +88,7 @@ class RoutingDataCache extends BasicClusterDataCache { if (LOG.isDebugEnabled()) { LOG.debug("CurrentStates: " + _currentStateCache); - LOG.debug("TargetExternalViews: " + _targetExternalViewMap); + LOG.debug("TargetExternalViews: " + _targetExternalViewCache.getExternalViewMap()); } } @@ -101,7 +98,7 @@ class RoutingDataCache extends BasicClusterDataCache { * @return */ public Map<String, ExternalView> getTargetExternalViews() { - return Collections.unmodifiableMap(_targetExternalViewMap); + return _targetExternalViewCache.getExternalViewMap(); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java index 84bcae4..0cbd824 100644 --- a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java +++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java @@ -19,7 +19,6 @@ package org.apache.helix; * under the License. */ -import org.apache.helix.mock.participant.DummyProcess; import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory; import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory; import org.apache.helix.mock.participant.DummyProcess.DummyMasterSlaveStateModelFactory; http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java index d6144f3..fdd5e5c 100644 --- a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java +++ b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java @@ -185,12 +185,13 @@ public class TestRoutingTable { // one master add(record, "TESTDB_0", "localhost_8900", "MASTER"); externalViewList.add(new ExternalView(record)); + externalViewList.add(new ExternalView(new ZNRecord("fake"))); routingTable.onExternalViewChange(externalViewList, changeContext); instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER"); AssertJUnit.assertNotNull(instances); AssertJUnit.assertEquals(instances.size(), 1); - externalViewList.clear(); + externalViewList.remove(0); routingTable.onExternalViewChange(externalViewList, changeContext); Thread.sleep(100); instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER"); http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java index 3271a71..b032513 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java @@ -19,20 +19,14 @@ package org.apache.helix.integration.controller; * under the License. */ -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixConstants; -import org.apache.helix.HelixProperty; -import org.apache.helix.PropertyKey; import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.integration.common.ZkStandAloneCMTestBase; -import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.mock.MockZkHelixDataAccessor; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; import org.testng.Assert; @@ -135,76 +129,4 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 2); } - - - public class MockZkHelixDataAccessor extends ZKHelixDataAccessor { - Map<PropertyType, Integer> _readPathCounters = new HashMap<>(); - - public MockZkHelixDataAccessor(String clusterName, BaseDataAccessor<ZNRecord> baseDataAccessor) { - super(clusterName, null, baseDataAccessor); - } - - - @Override - public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys) { - for (PropertyKey key : keys) { - addCount(key); - } - return super.getProperty(keys); - } - - @Override - public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys, boolean throwException) { - for (PropertyKey key : keys) { - addCount(key); - } - return super.getProperty(keys, throwException); - } - - @Override - public <T extends HelixProperty> T getProperty(PropertyKey key) { - addCount(key); - return super.getProperty(key); - } - - @Override - public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key) { - Map<String, T> map = super.getChildValuesMap(key); - addCount(key, map.keySet().size()); - return map; - } - - @Override - public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key, boolean throwException) { - Map<String, T> map = super.getChildValuesMap(key, throwException); - addCount(key, map.keySet().size()); - return map; - } - - private void addCount(PropertyKey key) { - addCount(key, 1); - } - - private void addCount(PropertyKey key, int count) { - PropertyType type = key.getType(); - if (!_readPathCounters.containsKey(type)) { - _readPathCounters.put(type, 0); - } - _readPathCounters.put(type, _readPathCounters.get(type) + count); - } - - public int getReadCount(PropertyType type) { - if (_readPathCounters.containsKey(type)) { - return _readPathCounters.get(type); - } - - return 0; - } - - public void clearReadCounters() { - _readPathCounters.clear(); - } - } - - } http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/test/java/org/apache/helix/mock/MockManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java index 349712f..bcbe47a 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java @@ -20,6 +20,7 @@ package org.apache.helix.mock; */ import java.util.UUID; +import org.apache.helix.MockAccessor; import org.apache.helix.api.listeners.ClusterConfigChangeListener; import org.apache.helix.api.listeners.ConfigChangeListener; import org.apache.helix.api.listeners.ControllerChangeListener; @@ -39,7 +40,6 @@ import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerProperties; import org.apache.helix.InstanceType; import org.apache.helix.LiveInstanceInfoProvider; -import org.apache.helix.MockAccessor; import org.apache.helix.PreConnectCallback; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/test/java/org/apache/helix/mock/MockZkHelixDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockZkHelixDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockZkHelixDataAccessor.java new file mode 100644 index 0000000..0ea33c7 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/mock/MockZkHelixDataAccessor.java @@ -0,0 +1,80 @@ +package org.apache.helix.mock; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyType; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; + +public class MockZkHelixDataAccessor extends ZKHelixDataAccessor { + Map<PropertyType, Integer> _readPathCounters = new HashMap<>(); + + public MockZkHelixDataAccessor(String clusterName, BaseDataAccessor<ZNRecord> baseDataAccessor) { + super(clusterName, null, baseDataAccessor); + } + + + @Override + public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys) { + for (PropertyKey key : keys) { + addCount(key); + } + return super.getProperty(keys); + } + + @Override + public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys, boolean throwException) { + for (PropertyKey key : keys) { + addCount(key); + } + return super.getProperty(keys, throwException); + } + + @Override + public <T extends HelixProperty> T getProperty(PropertyKey key) { + addCount(key); + return super.getProperty(key); + } + + @Override + public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key) { + Map<String, T> map = super.getChildValuesMap(key); + addCount(key, map.keySet().size()); + return map; + } + + @Override + public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key, boolean throwException) { + Map<String, T> map = super.getChildValuesMap(key, throwException); + addCount(key, map.keySet().size()); + return map; + } + + private void addCount(PropertyKey key) { + addCount(key, 1); + } + + private void addCount(PropertyKey key, int count) { + PropertyType type = key.getType(); + if (!_readPathCounters.containsKey(type)) { + _readPathCounters.put(type, 0); + } + _readPathCounters.put(type, _readPathCounters.get(type) + count); + } + + public int getReadCount(PropertyType type) { + if (_readPathCounters.containsKey(type)) { + return _readPathCounters.get(type); + } + + return 0; + } + + public void clearReadCounters() { + _readPathCounters.clear(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/d02083e6/helix-core/src/test/java/org/apache/helix/spectator/TestRoutingDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/spectator/TestRoutingDataCache.java b/helix-core/src/test/java/org/apache/helix/spectator/TestRoutingDataCache.java new file mode 100644 index 0000000..7ff3f67 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/spectator/TestRoutingDataCache.java @@ -0,0 +1,123 @@ +package org.apache.helix.spectator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixConstants; +import org.apache.helix.PropertyType; +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.integration.common.ZkStandAloneCMTestBase; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.mock.MockZkHelixDataAccessor; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestRoutingDataCache extends ZkStandAloneCMTestBase { + + @Test() public void testUpdateOnNotification() throws Exception { + MockZkHelixDataAccessor accessor = + new MockZkHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + + RoutingDataCache cache = + new RoutingDataCache("CLUSTER_" + TestHelper.getTestClassName(), PropertyType.EXTERNALVIEW); + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1); + + accessor.clearReadCounters(); + + // refresh again should read nothing + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); + + accessor.clearReadCounters(); + // refresh again should read nothing as ideal state is same + cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW); + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); + } + + @Test(dependsOnMethods = { "testUpdateOnNotification" }) public void testSelectiveUpdates() + throws Exception { + MockZkHelixDataAccessor accessor = + new MockZkHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + + RoutingDataCache cache = + new RoutingDataCache("CLUSTER_" + TestHelper.getTestClassName(), PropertyType.EXTERNALVIEW); + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1); + + accessor.clearReadCounters(); + + // refresh again should read nothing + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); + + // refresh again should read nothing + cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW); + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); + + // add new resources + _setupTool.addResourceToCluster(CLUSTER_NAME, "TestDB_1", 1, STATE_MODEL); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB_1", _replica); + + Thread.sleep(100); + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(_clusterVerifier.verify()); + + accessor.clearReadCounters(); + + // refresh again should read only new current states and new idealstate + cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW); + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1); + + // Add more resources + accessor.clearReadCounters(); + + _setupTool.addResourceToCluster(CLUSTER_NAME, "TestDB_2", 1, STATE_MODEL); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB_2", _replica); + _setupTool.addResourceToCluster(CLUSTER_NAME, "TestDB_3", 1, STATE_MODEL); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB_3", _replica); + + Thread.sleep(100); + Assert.assertTrue(_clusterVerifier.verify()); + + // Totally four resources. Two of them are newly added. + cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW); + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 2); + + // update one resource + accessor.clearReadCounters(); + + _setupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, "TestDB_2", false); + + Thread.sleep(100); + Assert.assertTrue(_clusterVerifier.verify()); + + cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW); + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1); + } +}
