This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 405595e0288eea9ef156d0e5d54b7ccc0c768e57 Author: Harry Zhang <[email protected]> AuthorDate: Mon Nov 19 15:39:04 2018 -0800 Refactor ClusterDataCache to PropertyCache --- .../helix/common/caches/AbstractDataCache.java | 30 +- .../helix/common/caches/CurrentStateCache.java | 2 +- .../helix/common/caches/ExternalViewCache.java | 4 +- .../helix/common/caches/IdealStateCache.java | 4 +- .../apache/helix/common/caches/PropertyCache.java | 220 ++++++++ .../common/caches/TargetExternalViewCache.java | 2 + .../helix/controller/stages/ClusterDataCache.java | 570 +++++++++++++-------- .../TestClusterDataCacheSelectiveUpdate.java | 10 +- .../paticipant/TestNodeOfflineTimeStamp.java | 6 +- .../mbeans/TestTopStateHandoffMetrics.java | 10 +- 10 files changed, 621 insertions(+), 237 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java index a4808b3..ae1dc37 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java @@ -20,20 +20,23 @@ package org.apache.helix.common.caches; */ import com.google.common.collect.Maps; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; +import org.apache.helix.controller.LogUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public abstract class AbstractDataCache { +public abstract class AbstractDataCache<T extends HelixProperty> { private static Logger LOG = LoggerFactory.getLogger(AbstractDataCache.class.getName()); - private String _eventId = "NO_ID"; + public static final String UNKNOWN_EVENT_ID = "NO_ID"; + private static final String UNKNOWN_PIPELINE = "UNKNOWN_PIPELINE"; + private String _eventId = UNKNOWN_EVENT_ID; + private String _pipelineName = UNKNOWN_PIPELINE; public String getEventId() { return _eventId; @@ -43,6 +46,14 @@ public abstract class AbstractDataCache { _eventId = eventId; } + public String getPipelineName() { + return _pipelineName; + } + + public void setPipelineName(String pipelineName) { + _pipelineName = pipelineName; + } + /** * Selectively fetch Helix Properties from ZK by comparing the version of local cached one with the one on ZK. * If version on ZK is newer, fetch it from zk and update local cache. @@ -50,10 +61,9 @@ public abstract class AbstractDataCache { * @param reloadKeys keys needs to be reload * @param cachedKeys keys already exists in the cache * @param cachedPropertyMap cached map of propertykey -> property object - * @param <T> the type of metadata * @return updated properties map */ - protected <T extends HelixProperty> Map<PropertyKey, T> refreshProperties( + protected Map<PropertyKey, T> refreshProperties( HelixDataAccessor accessor, List<PropertyKey> reloadKeys, List<PropertyKey> cachedKeys, Map<PropertyKey, T> cachedPropertyMap) { // All new entries from zk not cached locally yet should be read from ZK. @@ -77,8 +87,6 @@ public abstract class AbstractDataCache { } } - - List<T> reloadedProperty = accessor.getProperty(reloadKeys, true); Iterator<PropertyKey> csKeyIter = reloadKeys.iterator(); for (T property : reloadedProperty) { @@ -90,7 +98,7 @@ public abstract class AbstractDataCache { } } - LOG.info(reloadKeys.size() + " properties refreshed from zk."); + LogUtil.logInfo(LOG, getEventId(), String.format("%s properties refreshed from ZK.", reloadKeys.size())); if (LOG.isDebugEnabled()) { LOG.debug("refreshed keys: " + reloadKeys); } diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java index 40ba036..aef40c3 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java @@ -39,7 +39,7 @@ import java.util.Set; /** * Cache to hold all CurrentStates of a cluster. */ -public class CurrentStateCache extends AbstractDataCache { +public class CurrentStateCache extends AbstractDataCache<CurrentState> { private static final Logger LOG = LoggerFactory.getLogger(CurrentStateCache.class.getName()); private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap; diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java index 94c2b16..1b2ea2b 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java @@ -38,8 +38,10 @@ import org.slf4j.LoggerFactory; /** * Cache to hold all ExternalViews of a cluster. + * Deprecated - use {@link PropertyCache<ExternalView>} instead */ -public class ExternalViewCache extends AbstractDataCache { +@Deprecated +public class ExternalViewCache extends AbstractDataCache<ExternalView> { private static final Logger LOG = LoggerFactory.getLogger(ExternalViewCache.class.getName()); protected Map<String, ExternalView> _externalViewMap; diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java index 99de0bc..571afdc 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java @@ -37,8 +37,10 @@ import org.slf4j.LoggerFactory; /** * Cache to hold all IdealStates of a cluster. + * Deprecated - use {@link PropertyCache<IdealState>} instead */ -public class IdealStateCache extends AbstractDataCache { +@Deprecated +public class IdealStateCache extends AbstractDataCache<IdealState> { private static final Logger LOG = LoggerFactory.getLogger(IdealStateCache.class.getName()); private Map<String, IdealState> _idealStateMap; diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java new file mode 100644 index 0000000..1d24fad --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java @@ -0,0 +1,220 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.controller.LogUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A general cache for HelixProperty that supports LIST, GET, SET, DELETE methods of Helix property. + * All operation is in memory and is not persisted into the data store, but it provides a method to + * refresh cache content from data store using the given data accessor. + * + * Currently this class only supports helix properties that stores under same root, i.e. + * IdealState, InstanceConfig, ExternalView, LiveInstance, ResourceConfig, etc. + * + * WARNING: This class is not thread safe - caller should refresh and utilize the cache in a + * thread-safe manner + * @param <T> + */ +public class PropertyCache<T extends HelixProperty> extends AbstractDataCache<T> { + private static final Logger LOG = LoggerFactory.getLogger(PropertyCache.class); + + /** + * Interface to abstract retrieval of a type of HelixProperty and its instances + * @param <O> type of HelixProperty + */ + public interface PropertyCacheKeyFuncs<O extends HelixProperty> { + /** + * Get PropertyKey for the root of this type of object, used for LIST all objects + * @return property key to object root + */ + PropertyKey getRootKey(HelixDataAccessor accessor); + + /** + * Get PropertyKey for a single object of this type, used for GET single instance of the type + * @param objName object name + * @return property key to the object instance + */ + PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName); + + /** + * Get the string to identify the object when we actually use them. It's not necessarily the + * "id" field of HelixProperty, but could have more semantic meanings of that object type + * @param obj object instance + * @return object identifier + */ + String getObjName(O obj); + } + + // Used for serving user operations + private Map<String, T> _objMap; + + // Used for caching data from object store - this makes it possible to have async + // data refresh from object store + private Map<String, T> _objCache; + + private final String _clusterName; + private final String _propertyDescription; + private final boolean _useSelectiveUpdate; + private final PropertyCacheKeyFuncs<T> _keyFuncs; + + public PropertyCache(String clusterName, String propertyDescription, PropertyCacheKeyFuncs<T> keyFuncs, + boolean useSelectiveUpdate) { + _clusterName = clusterName; + _propertyDescription = propertyDescription; + _keyFuncs = keyFuncs; + _objMap = new HashMap<>(); + _objCache = new HashMap<>(); + _useSelectiveUpdate = useSelectiveUpdate; + } + + protected class SelectivePropertyRefreshInputs { + private final List<PropertyKey> reloadKeys; + private final List<PropertyKey> cachedKeys; + private final Map<PropertyKey, T> cachedPropertyMap; + + SelectivePropertyRefreshInputs(List<PropertyKey> keysToReload, + List<PropertyKey> currentlyCachedKeys, Map<PropertyKey, T> currentCache) { + reloadKeys = keysToReload; + cachedKeys = currentlyCachedKeys; + cachedPropertyMap = currentCache; + } + + List<PropertyKey> getCachedKeys() { + return cachedKeys; + } + + List<PropertyKey> getReloadKeys() { + return reloadKeys; + } + + Map<PropertyKey, T> getCachedPropertyMap() { + return cachedPropertyMap; + } + } + + @SuppressWarnings("unchecked") + private SelectivePropertyRefreshInputs genSelectiveUpdateInput( + HelixDataAccessor accessor, Map<String, T> currentCache, PropertyCache.PropertyCacheKeyFuncs<T> propertyKeyFuncs) { + // Generate keys for all current live instances + Set<PropertyKey> curObjKeys = new HashSet<>(); + for (String liveInstanceName : accessor.getChildNames(propertyKeyFuncs.getRootKey(accessor))) { + curObjKeys.add(propertyKeyFuncs.getObjPropertyKey(accessor, liveInstanceName)); + } + + // Generate cached objects + Set<PropertyKey> cachedKeys = new HashSet<>(); + Map<PropertyKey, T> cachedObjs = new HashMap<>(); + for (String objName : currentCache.keySet()) { + PropertyKey objKey = propertyKeyFuncs.getObjPropertyKey(accessor, objName); + cachedKeys.add(objKey); + cachedObjs.put(objKey, currentCache.get(objName)); + } + cachedKeys.retainAll(curObjKeys); + + // Generate keys to reload + Set<PropertyKey> reloadKeys = new HashSet<>(curObjKeys); + reloadKeys.removeAll(cachedKeys); + + return new SelectivePropertyRefreshInputs(new ArrayList<>(reloadKeys), + new ArrayList<>(cachedKeys), cachedObjs); + } + + /** + * Refresh the cache with the given data accessor + * @param accessor helix data accessor provided by caller + */ + public void refresh(final HelixDataAccessor accessor) { + long start = System.currentTimeMillis(); + if (_useSelectiveUpdate) { + doRefreshWithSelectiveUpdate(accessor); + } else { + doSimpleCacheRefresh(accessor); + } + LogUtil.logInfo(LOG, getEventId(), String.format( + "Refreshed %s property %s from cluster %s took %s ms for %s pipeline. Selective: %s", + _objMap.size(), _propertyDescription, _clusterName, + System.currentTimeMillis() - start, getPipelineName(), _useSelectiveUpdate)); + } + + private void doSimpleCacheRefresh(final HelixDataAccessor accessor) { + _objCache = accessor.getChildValuesMap(_keyFuncs.getRootKey(accessor), true); + _objMap = new HashMap<>(_objCache); + } + + private void doRefreshWithSelectiveUpdate(final HelixDataAccessor accessor) { + SelectivePropertyRefreshInputs input = + genSelectiveUpdateInput(accessor, _objCache, _keyFuncs); + Map<PropertyKey, T> updatedData = + refreshProperties(accessor, input.getReloadKeys(), input.getCachedKeys(), + input.getCachedPropertyMap()); + _objCache = propertyKeyMapToStringMap(updatedData, _keyFuncs); + + // need to separate keys so we can potentially update cache map asynchronously while + // keeping snapshot unchanged + _objMap = new HashMap<>(_objCache); + } + + private Map<String, T> propertyKeyMapToStringMap(Map<PropertyKey, T> propertyKeyMap, + PropertyCache.PropertyCacheKeyFuncs<T> objNameFunc) { + Map<String, T> stringMap = new HashMap<>(); + for (T obj : propertyKeyMap.values()) { + stringMap.put(objNameFunc.getObjName(obj), obj); + } + return stringMap; + } + + public Map<String, T> getPropertyMap() { + return Collections.unmodifiableMap(_objMap); + } + + public T getPropertyByName(String name) { + if (name == null) { + return null; + } + return _objMap.get(name); + } + + public void setPropertyMap(Map<String, T> objMap) { + // make a copy in case objMap is modified by the caller later on + // not updating the cache as cache is for data from data store + _objMap = new HashMap<>(objMap); + } + + public void setProperty(T obj) { + _objMap.put(_keyFuncs.getObjName(obj), obj); + } + + public void deletePropertyByName(String name) { + _objMap.remove(name); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java index fdff9b7..eddcdfb 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java @@ -22,7 +22,9 @@ import org.apache.helix.PropertyType; /** * Cache to hold all TargetExternalViews of a cluster. + * Deprecated - use {@link PropertyCache<org.apache.helix.model.ExternalView>} instead */ +@Deprecated public class TargetExternalViewCache extends ExternalViewCache { public TargetExternalViewCache(String clusterName) { super(clusterName, PropertyType.TARGETEXTERNALVIEW); diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index c240e23..f8fedcb 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -19,26 +19,26 @@ package org.apache.helix.controller.stages; * under the License. */ -import com.google.common.collect.Maps; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; import org.apache.helix.common.caches.AbstractDataCache; import org.apache.helix.common.caches.CurrentStateCache; -import org.apache.helix.common.caches.IdealStateCache; import org.apache.helix.common.caches.InstanceMessagesCache; +import org.apache.helix.common.caches.PropertyCache; import org.apache.helix.common.caches.TaskDataCache; import org.apache.helix.controller.LogUtil; import org.apache.helix.model.ClusterConfig; @@ -67,38 +67,50 @@ import org.apache.helix.task.WorkflowContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.helix.HelixConstants.ChangeType; +import static org.apache.helix.HelixConstants.*; /** * Reads the data from the cluster using data accessor. This output ClusterData which * provides useful methods to search/lookup properties */ -public class ClusterDataCache extends AbstractDataCache { +public class ClusterDataCache { private static final Logger LOG = LoggerFactory.getLogger(ClusterDataCache.class.getName()); + private static final List<ChangeType> _noFullRefreshProperty = + Arrays.asList(ChangeType.EXTERNAL_VIEW, ChangeType.TARGET_EXTERNAL_VIEW); + private final String _clusterName; + private String _eventId = AbstractDataCache.UNKNOWN_EVENT_ID; private ClusterConfig _clusterConfig; - private Map<String, LiveInstance> _liveInstanceMap; - private Map<String, LiveInstance> _liveInstanceCacheMap; - private Map<String, StateModelDefinition> _stateModelDefMap; - private Map<String, InstanceConfig> _instanceConfigMap; - private Map<String, InstanceConfig> _instanceConfigCacheMap; + private boolean _updateInstanceOfflineTime = true; + private boolean _isTaskCache; + private boolean _isMaintenanceModeEnabled; + private ExecutorService _asyncTasksThreadPool; + + // A map recording what data has changed + private Map<ChangeType, Boolean> _propertyDataChangedMap; + + // Property caches + private final PropertyCache<ExternalView> _externalViewCache; + private final PropertyCache<ExternalView> _targetExternalViewCache; + private final PropertyCache<ResourceConfig> _resourceConfigCache; + private final PropertyCache<InstanceConfig> _instanceConfigCache; + private final PropertyCache<LiveInstance> _liveInstanceCache; + private final PropertyCache<IdealState> _idealStateCache; + private final PropertyCache<ClusterConstraints> _clusterConstraintsCache; + private final PropertyCache<StateModelDefinition> _stateModelDefinitionCache; + + // Special caches + private CurrentStateCache _currentStateCache; + private TaskDataCache _taskDataCache; + private InstanceMessagesCache _instanceMessagesCache; + + // Other miscellaneous caches private Map<String, Long> _instanceOfflineTimeMap; - private Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>(); - private Map<String, ResourceConfig> _resourceConfigCacheMap; - private Map<String, ClusterConstraints> _constraintMap; private Map<String, Map<String, String>> _idealStateRuleMap; private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap = new HashMap<>(); private Map<String, Map<String, String>> _lastTopStateLocationMap = new HashMap<>(); - private Map<String, ExternalView> _targetExternalViewMap; - private Map<String, ExternalView> _externalViewMap; private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>(); private Set<String> _disabledInstanceSet = new HashSet<>(); - private String _eventId = "NO_ID"; - - private IdealStateCache _idealStateCache; - private CurrentStateCache _currentStateCache; - private TaskDataCache _taskDataCache; - private InstanceMessagesCache _instanceMessagesCache; // maintain a cache of bestPossible assignment across pipeline runs // TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache. @@ -107,18 +119,8 @@ public class ClusterDataCache extends AbstractDataCache { // maintain a cache of idealmapping (preference list) for full-auto resource across pipeline runs private Map<String, ZNRecord> _idealMappingCache = new HashMap<>(); - private Map<ChangeType, Boolean> _propertyDataChangedMap; - private Map<String, Integer> _participantActiveTaskCount = new HashMap<>(); - private ExecutorService _asyncTasksThreadPool; - - boolean _updateInstanceOfflineTime = true; - boolean _isTaskCache; - boolean _isMaintenanceModeEnabled; - - private String _clusterName; - // For detecting liveinstance and target resource partition state change in task assignment // Used in AbstractTaskDispatcher private boolean _existsLiveInstanceOrCurrentStateChange = false; @@ -134,122 +136,282 @@ public class ClusterDataCache extends AbstractDataCache { public ClusterDataCache(String clusterName) { _propertyDataChangedMap = new ConcurrentHashMap<>(); for (ChangeType type : ChangeType.values()) { + // refresh every type when it is initialized _propertyDataChangedMap.put(type, true); } _clusterName = clusterName; - _idealStateCache = new IdealStateCache(_clusterName); + + _externalViewCache = new PropertyCache<>(_clusterName, "ExternalView", new PropertyCache.PropertyCacheKeyFuncs<ExternalView>() { + @Override + public PropertyKey getRootKey(HelixDataAccessor accessor) { + return accessor.keyBuilder().externalViews(); + } + + @Override + public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) { + return accessor.keyBuilder().externalView(objName); + } + + @Override + public String getObjName(ExternalView obj) { + return obj.getResourceName(); + } + }, true); + _targetExternalViewCache = new PropertyCache<>(_clusterName, "TargetExternalView", new PropertyCache.PropertyCacheKeyFuncs<ExternalView>() { + @Override + public PropertyKey getRootKey(HelixDataAccessor accessor) { + return accessor.keyBuilder().targetExternalViews(); + } + + @Override + public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) { + return accessor.keyBuilder().targetExternalView(objName); + } + + @Override + public String getObjName(ExternalView obj) { + return obj.getResourceName(); + } + }, true); + _resourceConfigCache = new PropertyCache<>(_clusterName, "ResourceConfig", new PropertyCache.PropertyCacheKeyFuncs<ResourceConfig>() { + @Override + public PropertyKey getRootKey(HelixDataAccessor accessor) { + return accessor.keyBuilder().resourceConfigs(); + } + + @Override + public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) { + return accessor.keyBuilder().resourceConfig(objName); + } + + @Override + public String getObjName(ResourceConfig obj) { + return obj.getResourceName(); + } + }, true); + _liveInstanceCache = new PropertyCache<>(_clusterName, "LiveInstance", new PropertyCache.PropertyCacheKeyFuncs<LiveInstance>() { + @Override + public PropertyKey getRootKey(HelixDataAccessor accessor) { + return accessor.keyBuilder().liveInstances(); + } + + @Override + public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) { + return accessor.keyBuilder().liveInstance(objName); + } + + @Override + public String getObjName(LiveInstance obj) { + return obj.getInstanceName(); + } + }, true); + _instanceConfigCache = new PropertyCache<>(_clusterName, "InstanceConfig", new PropertyCache.PropertyCacheKeyFuncs<InstanceConfig>() { + @Override + public PropertyKey getRootKey(HelixDataAccessor accessor) { + return accessor.keyBuilder().instanceConfigs(); + } + + @Override + public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) { + return accessor.keyBuilder().instanceConfig(objName); + } + + @Override + public String getObjName(InstanceConfig obj) { + return obj.getInstanceName(); + } + }, true); + _idealStateCache = new PropertyCache<>(_clusterName, "IdealState", new PropertyCache.PropertyCacheKeyFuncs<IdealState>() { + @Override + public PropertyKey getRootKey(HelixDataAccessor accessor) { + return accessor.keyBuilder().idealStates(); + } + + @Override + public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) { + return accessor.keyBuilder().idealStates(objName); + } + + @Override + public String getObjName(IdealState obj) { + return obj.getResourceName(); + } + }, true); + _clusterConstraintsCache = new PropertyCache<>(_clusterName, "ClusterConstraint", new PropertyCache.PropertyCacheKeyFuncs<ClusterConstraints>() { + @Override + public PropertyKey getRootKey(HelixDataAccessor accessor) { + return accessor.keyBuilder().constraints(); + } + + @Override + public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) { + return accessor.keyBuilder().constraint(objName); + } + + @Override + public String getObjName(ClusterConstraints obj) { + // We set constraint type to the HelixProperty id + return obj.getId(); + } + }, false); + _stateModelDefinitionCache = new PropertyCache<>(_clusterName, "StateModelDefinition", new PropertyCache.PropertyCacheKeyFuncs<StateModelDefinition>() { + @Override + public PropertyKey getRootKey(HelixDataAccessor accessor) { + return accessor.keyBuilder().stateModelDefs(); + } + + @Override + public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) { + return accessor.keyBuilder().stateModelDef(objName); + } + + @Override + public String getObjName(StateModelDefinition obj) { + return obj.getId(); + } + }, false); + _currentStateCache = new CurrentStateCache(_clusterName); _taskDataCache = new TaskDataCache(_clusterName); _instanceMessagesCache = new InstanceMessagesCache(_clusterName); } - /** - * This refreshes the cluster data by re-fetching the data from zookeeper in - * an efficient way - * @param accessor - * @return - */ - public synchronized boolean refresh(HelixDataAccessor accessor) { - long startTime = System.currentTimeMillis(); - Builder keyBuilder = accessor.keyBuilder(); - - // Reset the LiveInstance/CurrentState change flag - _existsLiveInstanceOrCurrentStateChange = false; + private void updateCachePipelineNames() { + // TODO (harry): remove such update cache name after we have specific cache for + // different pipelines + String pipelineType = _isTaskCache ? "TASK" : "DEFAULT"; + _externalViewCache.setPipelineName(pipelineType); + _targetExternalViewCache.setPipelineName(pipelineType); + _resourceConfigCache.setPipelineName(pipelineType); + _liveInstanceCache.setPipelineName(pipelineType); + _instanceConfigCache.setPipelineName(pipelineType); + _idealStateCache.setPipelineName(pipelineType); + _clusterConstraintsCache.setPipelineName(pipelineType); + _stateModelDefinitionCache.setPipelineName(pipelineType); + } + private void refreshIdealState(final HelixDataAccessor accessor) { if (_propertyDataChangedMap.get(ChangeType.IDEAL_STATE)) { _propertyDataChangedMap.put(ChangeType.IDEAL_STATE, false); clearCachedResourceAssignments(); _idealStateCache.refresh(accessor); - LogUtil.logInfo(LOG, _eventId, - "Refresh IdealStates for cluster " + _clusterName + ", took " - + (System.currentTimeMillis() - startTime) + " ms for " - + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); + } else { + LogUtil.logInfo(LOG, getEventId(), String + .format("No ideal state change for %s cluster, %s pipeline", _clusterName, + _idealStateCache.getPipelineName())); } + } + private void refreshLiveInstances(final HelixDataAccessor accessor) { if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) { - startTime = System.currentTimeMillis(); _propertyDataChangedMap.put(ChangeType.LIVE_INSTANCE, false); clearCachedResourceAssignments(); - _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true); + _liveInstanceCache.refresh(accessor); _updateInstanceOfflineTime = true; - LogUtil.logInfo(LOG, _eventId, - "Refresh LiveInstances for cluster " + _clusterName + ", took " - + (System.currentTimeMillis() - startTime) + " ms for " - + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); + } else { + LogUtil.logInfo(LOG, getEventId(), String + .format("No live instance change for %s cluster, %s pipeline", _clusterName, + _liveInstanceCache.getPipelineName())); } + } + private void refreshInstanceConfigs(final HelixDataAccessor accessor) { if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) { _existsInstanceChange = true; _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, false); clearCachedResourceAssignments(); - _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true); - LogUtil.logInfo(LOG, _eventId, - "Reload InstanceConfig for cluster " + _clusterName + " : " + _instanceConfigCacheMap - .keySet() + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); + _instanceConfigCache.refresh(accessor); + LogUtil.logInfo(LOG, getEventId(), String + .format("Reloaded InstanceConfig for cluster %s, %s pipeline. Keys: %s", _clusterName, + _instanceConfigCache.getPipelineName(), + _instanceConfigCache.getPropertyMap().keySet())); + } else { + LogUtil.logInfo(LOG, getEventId(), String + .format("No instance config change for %s cluster, %s pipeline", _clusterName, + _liveInstanceCache.getPipelineName())); } + } + private void refreshResourceConfig(final HelixDataAccessor accessor) { if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) { _propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, false); clearCachedResourceAssignments(); - - _resourceConfigCacheMap = refreshResourceConfigs(accessor); - LogUtil.logInfo(LOG, _eventId, - "Reload ResourceConfigs for cluster " + _clusterName + " : " + _resourceConfigCacheMap - .keySet() + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); - } - - // This is for targeted jobs' task assignment. It needs to watch for current state changes for - // when targeted resources' state transitions complete - if (_propertyDataChangedMap.get(ChangeType.CURRENT_STATE)) { - _existsLiveInstanceOrCurrentStateChange = true; - _propertyDataChangedMap.put(ChangeType.CURRENT_STATE, false); + _resourceConfigCache.refresh(accessor); + LogUtil.logInfo(LOG, getEventId(), String + .format("Reloaded ResourceConfig for cluster %s, %s pipeline. Cnt: %s", _clusterName, + _resourceConfigCache.getPipelineName(), + _resourceConfigCache.getPropertyMap().keySet().size())); + } else { + LogUtil.logInfo(LOG, getEventId(), String + .format("No resource config change for %s cluster, %s pipeline", _clusterName, + _liveInstanceCache.getPipelineName())); } + } - // This is for AssignableInstances. Whenever there is a quota config change in ClusterConfig, we - // must trigger an update to AssignableInstanceManager - if (_propertyDataChangedMap.get(ChangeType.CLUSTER_CONFIG)) { - _existsClusterConfigChange = true; - _propertyDataChangedMap.put(ChangeType.CLUSTER_CONFIG, false); + private void refreshExternalViews(final HelixDataAccessor accessor) { + // As we are not listening on external view change, external view will be + // refreshed once during the cache's first refresh() call, or when full refresh is required + if (_propertyDataChangedMap.get(ChangeType.EXTERNAL_VIEW)) { + _externalViewCache.refresh(accessor); + _propertyDataChangedMap.put(ChangeType.EXTERNAL_VIEW, false); } + } - _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap); - _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap); - _resourceConfigMap = new HashMap<>(_resourceConfigCacheMap); + private void refreshTargetExternalViews(final HelixDataAccessor accessor) { + if (_propertyDataChangedMap.get(ChangeType.TARGET_EXTERNAL_VIEW)) { + if (_clusterConfig != null && _clusterConfig.isTargetExternalViewEnabled()) { + _targetExternalViewCache.refresh(accessor); - if (_updateInstanceOfflineTime) { - updateOfflineInstanceHistory(accessor); + // Only set the change type back once we get refreshed with data accessor for the + // first time + _propertyDataChangedMap.put(ChangeType.TARGET_EXTERNAL_VIEW, false); + } } + } - Map<String, StateModelDefinition> stateDefMap = - accessor.getChildValuesMap(keyBuilder.stateModelDefs(), true); - _stateModelDefMap = new ConcurrentHashMap<>(stateDefMap); - _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints(), true); - _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); - - if (_isTaskCache) { - // Refresh TaskCache - _taskDataCache.refresh(accessor, _resourceConfigMap); + private void updateMaintenanceInfo(final HelixDataAccessor accessor) { + MaintenanceSignal maintenanceSignal = accessor.getProperty(accessor.keyBuilder().maintenance()); + _isMaintenanceModeEnabled = maintenanceSignal != null; + } - // Refresh AssignableInstanceManager - AssignableInstanceManager assignableInstanceManager = - _taskDataCache.getAssignableInstanceManager(); - // Build from scratch every time - assignableInstanceManager - .buildAssignableInstances(_clusterConfig, _taskDataCache, _liveInstanceMap, - _instanceConfigMap); - // TODO: (Hunter) Consider this for optimization after fixing the problem of quotas not being + /** + * This refreshes the cluster data by re-fetching the data from zookeeper in + * an efficient way + * @param accessor + * @return + */ + public synchronized boolean refresh(HelixDataAccessor accessor) { + long startTime = System.currentTimeMillis(); - assignableInstanceManager.logQuotaProfileJSON(false); - } + // Reset the LiveInstance/CurrentState change flag + _existsLiveInstanceOrCurrentStateChange = false; - _instanceMessagesCache.refresh(accessor, _liveInstanceMap); - _currentStateCache.refresh(accessor, _liveInstanceMap); + // Refresh raw data + _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); + refreshIdealState(accessor); + refreshLiveInstances(accessor); + refreshInstanceConfigs(accessor); + refreshResourceConfig(accessor); + _stateModelDefinitionCache.refresh(accessor); + _clusterConstraintsCache.refresh(accessor); + refreshExternalViews(accessor); + refreshTargetExternalViews(accessor); + updateMaintenanceInfo(accessor); + + // Refresh derived data + _instanceMessagesCache.refresh(accessor, _liveInstanceCache.getPropertyMap()); + _currentStateCache.refresh(accessor, _liveInstanceCache.getPropertyMap()); // current state must be refreshed before refreshing relay messages // because we need to use current state to validate all relay messages. - _instanceMessagesCache.updateRelayMessages(_liveInstanceMap, + _instanceMessagesCache.updateRelayMessages(_liveInstanceCache.getPropertyMap(), _currentStateCache.getCurrentStatesMap()); + if (_updateInstanceOfflineTime) { + // TODO: make it async + updateOfflineInstanceHistory(accessor); + } + if (_clusterConfig != null) { _idealStateRuleMap = _clusterConfig.getIdealStateRules(); } else { @@ -258,18 +420,38 @@ public class ClusterDataCache extends AbstractDataCache { "Cluster config is null for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); } - MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance()); - _isMaintenanceModeEnabled = maintenanceSignal != null; - updateDisabledInstances(); - if (_externalViewMap == null) { - _externalViewMap = accessor.getChildValuesMap(accessor.keyBuilder().externalViews()); + + // TaskFramework related operations + + // This is for targeted jobs' task assignment. It needs to watch for current state changes for + // when targeted resources' state transitions complete + if (_propertyDataChangedMap.get(ChangeType.CURRENT_STATE)) { + _existsLiveInstanceOrCurrentStateChange = true; + _propertyDataChangedMap.put(ChangeType.CURRENT_STATE, false); + } + + // This is for AssignableInstances. Whenever there is a quota config change in ClusterConfig, we + // must trigger an update to AssignableInstanceManager + if (_propertyDataChangedMap.get(ChangeType.CLUSTER_CONFIG)) { + _existsClusterConfigChange = true; + _propertyDataChangedMap.put(ChangeType.CLUSTER_CONFIG, false); } - if (_clusterConfig.isTargetExternalViewEnabled() && _targetExternalViewMap == null) { - _targetExternalViewMap = - accessor.getChildValuesMap(accessor.keyBuilder().targetExternalViews()); + if (_isTaskCache) { + // Refresh TaskCache + _taskDataCache.refresh(accessor, _resourceConfigCache.getPropertyMap()); + + // Refresh AssignableInstanceManager + AssignableInstanceManager assignableInstanceManager = + _taskDataCache.getAssignableInstanceManager(); + // Build from scratch every time + assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache, + _liveInstanceCache.getPropertyMap(), _instanceConfigCache.getPropertyMap()); + // TODO: (Hunter) Consider this for optimization after fixing the problem of quotas not being + + assignableInstanceManager.logQuotaProfileJSON(false); } long endTime = System.currentTimeMillis(); @@ -278,19 +460,27 @@ public class ClusterDataCache extends AbstractDataCache { + (endTime - startTime) + " ms for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); + dumpDebugInfo(); + return true; + } + + private void dumpDebugInfo() { if (LOG.isDebugEnabled()) { LogUtil.logDebug(LOG, _eventId, - "# of StateModelDefinition read from zk: " + _stateModelDefMap.size()); - LogUtil.logDebug(LOG, _eventId, "# of ConstraintMap read from zk: " + _constraintMap.size()); - LogUtil.logDebug(LOG, _eventId, "LiveInstances: " + _liveInstanceMap.keySet()); - for (LiveInstance instance : _liveInstanceMap.values()) { + "# of StateModelDefinition read from zk: " + _stateModelDefinitionCache.getPropertyMap().size()); + LogUtil.logDebug(LOG, _eventId, "# of ConstraintMap read from zk: " + _clusterConstraintsCache.getPropertyMap().size()); + LogUtil.logDebug(LOG, _eventId, + "LiveInstances: " + _liveInstanceCache.getPropertyMap().keySet()); + for (LiveInstance instance : _liveInstanceCache.getPropertyMap().values()) { LogUtil.logDebug(LOG, _eventId, "live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); } + LogUtil + .logDebug(LOG, _eventId, "IdealStates: " + _idealStateCache.getPropertyMap().keySet()); + LogUtil.logDebug(LOG, _eventId, + "ResourceConfigs: " + _resourceConfigCache.getPropertyMap().keySet()); LogUtil.logDebug(LOG, _eventId, - "IdealStates: " + _idealStateCache.getIdealStateMap().keySet()); - LogUtil.logDebug(LOG, _eventId, "ResourceConfigs: " + _resourceConfigMap.keySet()); - LogUtil.logDebug(LOG, _eventId, "InstanceConfigs: " + _instanceConfigMap.keySet()); + "InstanceConfigs: " + _instanceConfigCache.getPropertyMap().keySet()); LogUtil.logDebug(LOG, _eventId, "ClusterConfigs: " + _clusterConfig); LogUtil.logDebug(LOG, _eventId, "JobContexts: " + _taskDataCache.getContexts().keySet()); } @@ -298,15 +488,13 @@ public class ClusterDataCache extends AbstractDataCache { if (LOG.isTraceEnabled()) { LOG.trace("Cache content: " + toString()); } - - return true; } private void updateDisabledInstances() { // Move the calculating disabled instances to refresh _disabledInstanceForPartitionMap.clear(); _disabledInstanceSet.clear(); - for (InstanceConfig config : _instanceConfigMap.values()) { + for (InstanceConfig config : _instanceConfigCache.getPropertyMap().values()) { Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap(); if (!config.getInstanceEnabled()) { _disabledInstanceSet.add(config.getInstanceName()); @@ -324,14 +512,15 @@ public class ClusterDataCache extends AbstractDataCache { } } } - if (_clusterConfig.getDisabledInstances() != null) { + if (_clusterConfig != null && _clusterConfig.getDisabledInstances() != null) { _disabledInstanceSet.addAll(_clusterConfig.getDisabledInstances().keySet()); } } private void updateOfflineInstanceHistory(HelixDataAccessor accessor) { - List<String> offlineNodes = new ArrayList<>(_instanceConfigMap.keySet()); - offlineNodes.removeAll(_liveInstanceMap.keySet()); + List<String> offlineNodes = + new ArrayList<>(_instanceConfigCache.getPropertyMap().keySet()); + offlineNodes.removeAll(_liveInstanceCache.getPropertyMap().keySet()); _instanceOfflineTimeMap = new HashMap<>(); for (String instance : offlineNodes) { @@ -379,11 +568,11 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public Map<String, IdealState> getIdealStates() { - return _idealStateCache.getIdealStateMap(); + return _idealStateCache.getPropertyMap(); } public synchronized void setIdealStates(List<IdealState> idealStates) { - _idealStateCache.setIdealStates(idealStates); + _idealStateCache.setPropertyMap(HelixProperty.convertListToMap(idealStates)); } public Map<String, Map<String, String>> getIdealStateRules() { @@ -395,14 +584,14 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public Map<String, LiveInstance> getLiveInstances() { - return _liveInstanceMap; + return _liveInstanceCache.getPropertyMap(); } /** * Return the set of all instances names. */ public Set<String> getAllInstances() { - return new HashSet<>(_instanceConfigMap.keySet()); + return _instanceConfigCache.getPropertyMap().keySet(); } /** @@ -421,7 +610,7 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public Set<String> getEnabledInstances() { - Set<String> enabledNodes = getAllInstances(); + Set<String> enabledNodes = new HashSet<>(getAllInstances()); enabledNodes.removeAll(getDisabledInstances()); return enabledNodes; @@ -448,8 +637,8 @@ public class ClusterDataCache extends AbstractDataCache { */ public Set<String> getInstancesWithTag(String instanceTag) { Set<String> taggedInstances = new HashSet<>(); - for (String instance : _instanceConfigMap.keySet()) { - InstanceConfig instanceConfig = _instanceConfigMap.get(instance); + for (String instance : _instanceConfigCache.getPropertyMap().keySet()) { + InstanceConfig instanceConfig = _instanceConfigCache.getPropertyByName(instance); if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) { taggedInstances.add(instance); } @@ -459,11 +648,7 @@ public class ClusterDataCache extends AbstractDataCache { } public synchronized void setLiveInstances(List<LiveInstance> liveInstances) { - Map<String, LiveInstance> liveInstanceMap = new HashMap<>(); - for (LiveInstance liveInstance : liveInstances) { - liveInstanceMap.put(liveInstance.getId(), liveInstance); - } - _liveInstanceCacheMap = liveInstanceMap; + _liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances)); _updateInstanceOfflineTime = true; // TODO: Move this when listener for LiveInstance is being refactored @@ -510,10 +695,7 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public StateModelDefinition getStateModelDef(String stateModelDefRef) { - if (stateModelDefRef == null) { - return null; - } - return _stateModelDefMap.get(stateModelDefRef); + return _stateModelDefinitionCache.getPropertyByName(stateModelDefRef); } /** @@ -521,7 +703,7 @@ public class ClusterDataCache extends AbstractDataCache { * @return state model definition map */ public Map<String, StateModelDefinition> getStateModelDefMap() { - return _stateModelDefMap; + return _stateModelDefinitionCache.getPropertyMap(); } /** @@ -530,7 +712,7 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public IdealState getIdealState(String resourceName) { - return _idealStateCache.getIdealStateMap().get(resourceName); + return _idealStateCache.getPropertyByName(resourceName); } /** @@ -538,7 +720,7 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public Map<String, InstanceConfig> getInstanceConfigMap() { - return _instanceConfigMap; + return _instanceConfigCache.getPropertyMap(); } /** @@ -546,7 +728,7 @@ public class ClusterDataCache extends AbstractDataCache { * @param instanceConfigMap */ public void setInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) { - _instanceConfigMap = instanceConfigMap; + _instanceConfigCache.setPropertyMap(instanceConfigMap); } /** @@ -554,7 +736,7 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public Map<String, ResourceConfig> getResourceConfigMap() { - return _resourceConfigMap; + return _resourceConfigCache.getPropertyMap(); } /** @@ -571,12 +753,8 @@ public class ClusterDataCache extends AbstractDataCache { notifyDataChange(changeType); } - /** - * Returns the instance config map - * @return - */ public ResourceConfig getResourceConfig(String resource) { - return _resourceConfigMap.get(resource); + return _resourceConfigCache.getPropertyByName(resource); } /** @@ -613,14 +791,6 @@ public class ClusterDataCache extends AbstractDataCache { return _taskDataCache.getWorkflowConfig(resource); } - public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) { - Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); - for (InstanceConfig instanceConfig : instanceConfigs) { - instanceConfigMap.put(instanceConfig.getId(), instanceConfig); - } - _instanceConfigCacheMap = instanceConfigMap; - } - /** * Some partitions might be disabled on specific nodes. * This method allows one to fetch the set of nodes where a given partition is disabled @@ -653,14 +823,14 @@ public class ClusterDataCache extends AbstractDataCache { */ public int getReplicas(String resourceName) { int replicas = -1; - Map<String, IdealState> idealStateMap = _idealStateCache.getIdealStateMap(); + Map<String, IdealState> idealStateMap = _idealStateCache.getPropertyMap(); if (idealStateMap.containsKey(resourceName)) { String replicasStr = idealStateMap.get(resourceName).getReplicas(); if (replicasStr != null) { if (replicasStr.equals(IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString())) { - replicas = _liveInstanceMap.size(); + replicas = _liveInstanceCache.getPropertyMap().size(); } else { try { replicas = Integer.parseInt(replicasStr); @@ -683,10 +853,7 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public ClusterConstraints getConstraint(ConstraintType type) { - if (_constraintMap != null) { - return _constraintMap.get(type.toString()); - } - return null; + return _clusterConstraintsCache.getPropertyByName(type.name()); } public Map<String, Map<String, MissingTopStateRecord>> getMissingTopStateMap() { @@ -793,11 +960,11 @@ public class ClusterDataCache extends AbstractDataCache { } public ExternalView getTargetExternalView(String resourceName) { - return _targetExternalViewMap == null ? null : _targetExternalViewMap.get(resourceName); + return _targetExternalViewCache.getPropertyByName(resourceName); } public void updateTargetExternalView(String resourceName, ExternalView targetExternalView) { - _targetExternalViewMap.put(resourceName, targetExternalView); + _targetExternalViewCache.setProperty(targetExternalView); } /** @@ -805,10 +972,7 @@ public class ClusterDataCache extends AbstractDataCache { * @return */ public Map<String, ExternalView> getExternalViews() { - if (_externalViewMap == null) { - return Collections.emptyMap(); - } - return Collections.unmodifiableMap(_externalViewMap); + return _externalViewCache.getPropertyMap(); } /** @@ -816,11 +980,8 @@ public class ClusterDataCache extends AbstractDataCache { * @param externalViews */ public void updateExternalViews(List<ExternalView> externalViews) { - if (_externalViewMap == null) { - _externalViewMap = new HashMap<>(); - } - for (ExternalView externalView : externalViews) { - _externalViewMap.put(externalView.getResourceName(), externalView); + for (ExternalView ev : externalViews) { + _externalViewCache.setProperty(ev); } } @@ -830,8 +991,8 @@ public class ClusterDataCache extends AbstractDataCache { */ public void removeExternalViews(List<String> resourceNames) { - for (String externalView : resourceNames) { - _externalViewMap.remove(externalView); + for (String resourceName : resourceNames) { + _externalViewCache.deletePropertyByName(resourceName); } } @@ -840,7 +1001,10 @@ public class ClusterDataCache extends AbstractDataCache { */ public synchronized void requireFullRefresh() { for (ChangeType type : ChangeType.values()) { - _propertyDataChangedMap.put(type, true); + // We only refresh EV and TEV the very first time the cluster data cache is initialized + if (!_noFullRefreshProperty.contains(type)) { + _propertyDataChangedMap.put(type, true); + } } } @@ -931,6 +1095,7 @@ public class ClusterDataCache extends AbstractDataCache { */ public void setTaskCache(boolean taskCache) { _isTaskCache = taskCache; + updateCachePipelineNames(); } /** @@ -959,6 +1124,11 @@ public class ClusterDataCache extends AbstractDataCache { _idealStateCache.setEventId(eventId); _currentStateCache.setEventId(eventId); _taskDataCache.setEventId(eventId); + _liveInstanceCache.setEventId(eventId); + _instanceConfigCache.setEventId(eventId); + _resourceConfigCache.setEventId(eventId); + _stateModelDefinitionCache.setEventId(eventId); + _clusterConstraintsCache.setEventId(eventId); } /** @@ -970,53 +1140,17 @@ public class ClusterDataCache extends AbstractDataCache { return _existsLiveInstanceOrCurrentStateChange; } - private Map<String, ResourceConfig> refreshResourceConfigs(HelixDataAccessor accessor) { - Map<String, ResourceConfig> refreshedResourceConfigs = Maps.newHashMap(); - - long startTime = System.currentTimeMillis(); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - Set<PropertyKey> currentResourceConfigKeys = new HashSet<>(); - for (String resourceConfig : accessor.getChildNames(keyBuilder.resourceConfigs())) { - currentResourceConfigKeys.add(keyBuilder.resourceConfig(resourceConfig)); - } - - Set<PropertyKey> cachedKeys = new HashSet<>(); - Map<PropertyKey, ResourceConfig> cachedResourceConfigMap = Maps.newHashMap(); - - for (String resourceConfig : _resourceConfigMap.keySet()) { - cachedKeys.add(keyBuilder.resourceConfig(resourceConfig)); - cachedResourceConfigMap.put(keyBuilder.resourceConfig(resourceConfig), - _resourceConfigMap.get(resourceConfig)); - } - cachedKeys.retainAll(currentResourceConfigKeys); - - Set<PropertyKey> reloadKeys = new HashSet<>(currentResourceConfigKeys); - reloadKeys.removeAll(cachedKeys); - - Map<PropertyKey, ResourceConfig> updatedMap = refreshProperties(accessor, - new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys), cachedResourceConfigMap); - for (ResourceConfig resourceConfig : updatedMap.values()) { - refreshedResourceConfigs.put(resourceConfig.getResourceName(), resourceConfig); - } - - long endTime = System.currentTimeMillis(); - LogUtil.logInfo(LOG, getEventId(), - "Refresh " + refreshedResourceConfigs.size() + " resource configs for cluster " - + _clusterName + ", took " + (endTime - startTime) + " ms"); - return refreshedResourceConfigs; - } - /** * toString method to print the entire cluster state */ @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n"); - sb.append("idealStateMap:" + _idealStateCache.getIdealStateMap()).append("\n"); - sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n"); - sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n"); - sb.append("resourceConfigMap:" + _resourceConfigMap).append("\n"); + sb.append("liveInstaceMap:" + _liveInstanceCache.getPropertyMap()).append("\n"); + sb.append("idealStateMap:" + _idealStateCache.getPropertyMap()).append("\n"); + sb.append("stateModelDefMap:" + _stateModelDefinitionCache.getPropertyMap()).append("\n"); + sb.append("instanceConfigMap:" + _instanceConfigCache.getPropertyMap()).append("\n"); + sb.append("resourceConfigMap:" + _resourceConfigCache.getPropertyMap()).append("\n"); sb.append("taskDataCache:" + _taskDataCache).append("\n"); sb.append("messageCache:" + _instanceMessagesCache).append("\n"); sb.append("currentStateCache:" + _currentStateCache).append("\n"); diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java index 11044b7..e348798 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java @@ -46,6 +46,7 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1); Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), NODE_NR); Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), NODE_NR); Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 1); @@ -55,6 +56,7 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase // refresh again should read nothing cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0); // cluster config always get reloaded @@ -65,6 +67,7 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE); cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1); @@ -73,7 +76,8 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE); cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0); - Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), NODE_NR); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1); } @@ -88,6 +92,7 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1); Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), NODE_NR); Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), NODE_NR); Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 1); @@ -97,6 +102,7 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase // refresh again should read nothing cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0); Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1); @@ -115,6 +121,7 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), NODE_NR); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); // Add more resources accessor.clearReadCounters(); @@ -128,6 +135,7 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE); cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 2); + Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0); // Test WorkflowConfig/JobConfigs TaskDriver driver = new TaskDriver(_manager); diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java index ceda6e5..7dbe6d5 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java +++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java @@ -41,11 +41,13 @@ public class TestNodeOfflineTimeStamp extends ZkStandAloneCMTestBase { _participants[0].syncStop(); ParticipantHistory history = getInstanceHistory(_participants[0].getInstanceName()); long recordTime = history.getLastOfflineTime(); - Assert.assertTrue(Math.abs(shutdownTime - recordTime) <= 500L); _participants[0].reset(); - Thread.sleep(50); + + // Make it long enough to reduce the potential racing condition that cluster data cache report + // instance offline is actually after instance comes back + Thread.sleep(500); _participants[0].syncStart(); Thread.sleep(50); diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java index 5550386..ce1e657 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java @@ -21,6 +21,7 @@ package org.apache.helix.monitoring.mbeans; import com.google.common.collect.Range; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -34,6 +35,7 @@ import org.apache.helix.controller.stages.ReadClusterDataStage; import org.apache.helix.controller.stages.TopStateHandoffReportStage; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.CurrentState; +import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.Resource; import org.codehaus.jackson.annotate.JsonCreator; @@ -176,7 +178,9 @@ public class TestTopStateHandoffMetrics extends BaseStageTest { new MissingStatesDataCacheInject() { @Override public void doInject(ClusterDataCache cache) { - cache.getLiveInstances().remove("localhost_1"); + Map<String, LiveInstance> liMap = new HashMap<>(cache.getLiveInstances()); + liMap.remove("localhost_1"); + cache.setLiveInstances(new ArrayList<>(liMap.values())); } }, 1, 0, expectedDuration, @@ -201,7 +205,9 @@ public class TestTopStateHandoffMetrics extends BaseStageTest { @Override public void doInject(ClusterDataCache cache) { accessor.removeProperty(accessor.keyBuilder().liveInstance(downInstance)); - cache.getLiveInstances().remove("localhost_0"); + Map<String, LiveInstance> liMap = new HashMap<>(cache.getLiveInstances()); + liMap.remove("localhost_0"); + cache.setLiveInstances(new ArrayList<>(liMap.values())); cache.getInstanceOfflineTimeMap().put("localhost_0", lastOfflineTime); cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE); }
