This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 405595e0288eea9ef156d0e5d54b7ccc0c768e57
Author: Harry Zhang <[email protected]>
AuthorDate: Mon Nov 19 15:39:04 2018 -0800

    Refactor ClusterDataCache to PropertyCache
---
 .../helix/common/caches/AbstractDataCache.java     |  30 +-
 .../helix/common/caches/CurrentStateCache.java     |   2 +-
 .../helix/common/caches/ExternalViewCache.java     |   4 +-
 .../helix/common/caches/IdealStateCache.java       |   4 +-
 .../apache/helix/common/caches/PropertyCache.java  | 220 ++++++++
 .../common/caches/TargetExternalViewCache.java     |   2 +
 .../helix/controller/stages/ClusterDataCache.java  | 570 +++++++++++++--------
 .../TestClusterDataCacheSelectiveUpdate.java       |  10 +-
 .../paticipant/TestNodeOfflineTimeStamp.java       |   6 +-
 .../mbeans/TestTopStateHandoffMetrics.java         |  10 +-
 10 files changed, 621 insertions(+), 237 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
index a4808b3..ae1dc37 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
@@ -20,20 +20,23 @@ package org.apache.helix.common.caches;
  */
 
 import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.LogUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public abstract class AbstractDataCache {
+public abstract class AbstractDataCache<T extends HelixProperty> {
   private static Logger LOG = 
LoggerFactory.getLogger(AbstractDataCache.class.getName());
-  private String _eventId = "NO_ID";
+  public static final String UNKNOWN_EVENT_ID = "NO_ID";
+  private static final String UNKNOWN_PIPELINE = "UNKNOWN_PIPELINE";
+  private String _eventId = UNKNOWN_EVENT_ID;
+  private String _pipelineName = UNKNOWN_PIPELINE;
 
   public String getEventId() {
     return _eventId;
@@ -43,6 +46,14 @@ public abstract class AbstractDataCache {
     _eventId = eventId;
   }
 
+  public String getPipelineName() {
+    return _pipelineName;
+  }
+
+  public void setPipelineName(String pipelineName) {
+    _pipelineName = pipelineName;
+  }
+
   /**
    * Selectively fetch Helix Properties from ZK by comparing the version of 
local cached one with the one on ZK.
    * If version on ZK is newer, fetch it from zk and update local cache.
@@ -50,10 +61,9 @@ public abstract class AbstractDataCache {
    * @param reloadKeys keys needs to be reload
    * @param cachedKeys keys already exists in the cache
    * @param cachedPropertyMap cached map of propertykey -> property object
-   * @param <T> the type of metadata
    * @return updated properties map
    */
-  protected <T extends HelixProperty> Map<PropertyKey, T> refreshProperties(
+  protected Map<PropertyKey, T> refreshProperties(
       HelixDataAccessor accessor, List<PropertyKey> reloadKeys, 
List<PropertyKey> cachedKeys,
       Map<PropertyKey, T> cachedPropertyMap) {
     // All new entries from zk not cached locally yet should be read from ZK.
@@ -77,8 +87,6 @@ public abstract class AbstractDataCache {
       }
     }
 
-
-
     List<T> reloadedProperty = accessor.getProperty(reloadKeys, true);
     Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
     for (T property : reloadedProperty) {
@@ -90,7 +98,7 @@ public abstract class AbstractDataCache {
       }
     }
 
-    LOG.info(reloadKeys.size() + " properties refreshed from zk.");
+    LogUtil.logInfo(LOG, getEventId(), String.format("%s properties refreshed 
from ZK.", reloadKeys.size()));
     if (LOG.isDebugEnabled()) {
       LOG.debug("refreshed keys: " + reloadKeys);
     }
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index 40ba036..aef40c3 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -39,7 +39,7 @@ import java.util.Set;
 /**
  * Cache to hold all CurrentStates of a cluster.
  */
-public class CurrentStateCache extends AbstractDataCache {
+public class CurrentStateCache extends AbstractDataCache<CurrentState> {
   private static final Logger LOG = 
LoggerFactory.getLogger(CurrentStateCache.class.getName());
 
   private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
index 94c2b16..1b2ea2b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
@@ -38,8 +38,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Cache to hold all ExternalViews of a cluster.
+ * Deprecated - use {@link PropertyCache<ExternalView>} instead
  */
-public class ExternalViewCache extends AbstractDataCache {
+@Deprecated
+public class ExternalViewCache extends AbstractDataCache<ExternalView> {
   private static final Logger LOG = 
LoggerFactory.getLogger(ExternalViewCache.class.getName());
 
   protected Map<String, ExternalView> _externalViewMap;
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java 
b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
index 99de0bc..571afdc 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
@@ -37,8 +37,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Cache to hold all IdealStates of a cluster.
+ * Deprecated - use {@link PropertyCache<IdealState>} instead
  */
-public class IdealStateCache extends AbstractDataCache {
+@Deprecated
+public class IdealStateCache extends AbstractDataCache<IdealState> {
   private static final Logger LOG = 
LoggerFactory.getLogger(IdealStateCache.class.getName());
 
   private Map<String, IdealState> _idealStateMap;
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java 
b/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java
new file mode 100644
index 0000000..1d24fad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java
@@ -0,0 +1,220 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.LogUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A general cache for HelixProperty that supports LIST, GET, SET, DELETE 
methods of Helix property.
+ * All operation is in memory and is not persisted into the data store, but it 
provides a method to
+ * refresh cache content from data store using the given data accessor.
+ *
+ * Currently this class only supports helix properties that stores under same 
root, i.e.
+ * IdealState, InstanceConfig, ExternalView, LiveInstance, ResourceConfig, etc.
+ *
+ * WARNING: This class is not thread safe - caller should refresh and utilize 
the cache in a
+ * thread-safe manner
+ * @param <T>
+ */
+public class PropertyCache<T extends HelixProperty> extends 
AbstractDataCache<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PropertyCache.class);
+
+  /**
+   * Interface to abstract retrieval of a type of HelixProperty and its 
instances
+   * @param <O> type of HelixProperty
+   */
+  public interface PropertyCacheKeyFuncs<O extends HelixProperty> {
+    /**
+     * Get PropertyKey for the root of this type of object, used for LIST all 
objects
+     * @return property key to object root
+     */
+    PropertyKey getRootKey(HelixDataAccessor accessor);
+
+    /**
+     * Get PropertyKey for a single object of this type, used for GET single 
instance of the type
+     * @param objName object name
+     * @return property key to the object instance
+     */
+    PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName);
+
+    /**
+     * Get the string to identify the object when we actually use them. It's 
not necessarily the
+     * "id" field of HelixProperty, but could have more semantic meanings of 
that object type
+     * @param obj object instance
+     * @return object identifier
+     */
+    String getObjName(O obj);
+  }
+
+  // Used for serving user operations
+  private Map<String, T> _objMap;
+
+  // Used for caching data from object store - this makes it possible to have 
async
+  // data refresh from object store
+  private Map<String, T> _objCache;
+
+  private final String _clusterName;
+  private final String _propertyDescription;
+  private final boolean _useSelectiveUpdate;
+  private final PropertyCacheKeyFuncs<T> _keyFuncs;
+
+  public PropertyCache(String clusterName, String propertyDescription, 
PropertyCacheKeyFuncs<T> keyFuncs,
+      boolean useSelectiveUpdate) {
+    _clusterName = clusterName;
+    _propertyDescription = propertyDescription;
+    _keyFuncs = keyFuncs;
+    _objMap = new HashMap<>();
+    _objCache = new HashMap<>();
+    _useSelectiveUpdate = useSelectiveUpdate;
+  }
+
+  protected class SelectivePropertyRefreshInputs {
+    private final List<PropertyKey> reloadKeys;
+    private final List<PropertyKey> cachedKeys;
+    private final Map<PropertyKey, T> cachedPropertyMap;
+
+    SelectivePropertyRefreshInputs(List<PropertyKey> keysToReload,
+        List<PropertyKey> currentlyCachedKeys, Map<PropertyKey, T> 
currentCache) {
+      reloadKeys = keysToReload;
+      cachedKeys = currentlyCachedKeys;
+      cachedPropertyMap = currentCache;
+    }
+
+    List<PropertyKey> getCachedKeys() {
+      return cachedKeys;
+    }
+
+    List<PropertyKey> getReloadKeys() {
+      return reloadKeys;
+    }
+
+    Map<PropertyKey, T> getCachedPropertyMap() {
+      return cachedPropertyMap;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private SelectivePropertyRefreshInputs genSelectiveUpdateInput(
+      HelixDataAccessor accessor, Map<String, T> currentCache, 
PropertyCache.PropertyCacheKeyFuncs<T> propertyKeyFuncs) {
+    // Generate keys for all current live instances
+    Set<PropertyKey> curObjKeys = new HashSet<>();
+    for (String liveInstanceName : 
accessor.getChildNames(propertyKeyFuncs.getRootKey(accessor))) {
+      curObjKeys.add(propertyKeyFuncs.getObjPropertyKey(accessor, 
liveInstanceName));
+    }
+
+    // Generate cached objects
+    Set<PropertyKey> cachedKeys = new HashSet<>();
+    Map<PropertyKey, T> cachedObjs = new HashMap<>();
+    for (String objName : currentCache.keySet()) {
+      PropertyKey objKey = propertyKeyFuncs.getObjPropertyKey(accessor, 
objName);
+      cachedKeys.add(objKey);
+      cachedObjs.put(objKey, currentCache.get(objName));
+    }
+    cachedKeys.retainAll(curObjKeys);
+
+    // Generate keys to reload
+    Set<PropertyKey> reloadKeys = new HashSet<>(curObjKeys);
+    reloadKeys.removeAll(cachedKeys);
+
+    return new SelectivePropertyRefreshInputs(new ArrayList<>(reloadKeys),
+        new ArrayList<>(cachedKeys), cachedObjs);
+  }
+
+  /**
+   * Refresh the cache with the given data accessor
+   * @param accessor helix data accessor provided by caller
+   */
+  public void refresh(final HelixDataAccessor accessor) {
+    long start = System.currentTimeMillis();
+    if (_useSelectiveUpdate) {
+      doRefreshWithSelectiveUpdate(accessor);
+    } else {
+      doSimpleCacheRefresh(accessor);
+    }
+    LogUtil.logInfo(LOG, getEventId(), String.format(
+        "Refreshed %s property %s from cluster %s took %s ms for %s pipeline. 
Selective: %s",
+        _objMap.size(), _propertyDescription, _clusterName,
+        System.currentTimeMillis() - start, getPipelineName(), 
_useSelectiveUpdate));
+  }
+
+  private void doSimpleCacheRefresh(final HelixDataAccessor accessor) {
+    _objCache = accessor.getChildValuesMap(_keyFuncs.getRootKey(accessor), 
true);
+    _objMap = new HashMap<>(_objCache);
+  }
+
+  private void doRefreshWithSelectiveUpdate(final HelixDataAccessor accessor) {
+    SelectivePropertyRefreshInputs input =
+        genSelectiveUpdateInput(accessor, _objCache, _keyFuncs);
+    Map<PropertyKey, T> updatedData =
+        refreshProperties(accessor, input.getReloadKeys(), 
input.getCachedKeys(),
+            input.getCachedPropertyMap());
+    _objCache = propertyKeyMapToStringMap(updatedData, _keyFuncs);
+
+    // need to separate keys so we can potentially update cache map 
asynchronously while
+    // keeping snapshot unchanged
+    _objMap = new HashMap<>(_objCache);
+  }
+
+  private Map<String, T> propertyKeyMapToStringMap(Map<PropertyKey, T> 
propertyKeyMap,
+      PropertyCache.PropertyCacheKeyFuncs<T> objNameFunc) {
+    Map<String, T> stringMap = new HashMap<>();
+    for (T obj : propertyKeyMap.values()) {
+      stringMap.put(objNameFunc.getObjName(obj), obj);
+    }
+    return stringMap;
+  }
+
+  public Map<String, T> getPropertyMap() {
+    return Collections.unmodifiableMap(_objMap);
+  }
+
+  public T getPropertyByName(String name) {
+    if (name == null) {
+      return null;
+    }
+    return _objMap.get(name);
+  }
+
+  public void setPropertyMap(Map<String, T> objMap) {
+    // make a copy in case objMap is modified by the caller later on
+    // not updating the cache as cache is for data from data store
+    _objMap = new HashMap<>(objMap);
+  }
+
+  public void setProperty(T obj) {
+    _objMap.put(_keyFuncs.getObjName(obj), obj);
+  }
+
+  public void deletePropertyByName(String name) {
+    _objMap.remove(name);
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java
index fdff9b7..eddcdfb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/TargetExternalViewCache.java
@@ -22,7 +22,9 @@ import org.apache.helix.PropertyType;
 
 /**
  * Cache to hold all TargetExternalViews of a cluster.
+ * Deprecated - use {@link PropertyCache<org.apache.helix.model.ExternalView>} 
instead
  */
+@Deprecated
 public class TargetExternalViewCache extends ExternalViewCache {
   public TargetExternalViewCache(String clusterName) {
     super(clusterName, PropertyType.TARGETEXTERNALVIEW);
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index c240e23..f8fedcb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -19,26 +19,26 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import com.google.common.collect.Maps;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
-import org.apache.helix.common.caches.IdealStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
+import org.apache.helix.common.caches.PropertyCache;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.model.ClusterConfig;
@@ -67,38 +67,50 @@ import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.helix.HelixConstants.ChangeType;
+import static org.apache.helix.HelixConstants.*;
 
 /**
  * Reads the data from the cluster using data accessor. This output 
ClusterData which
  * provides useful methods to search/lookup properties
  */
-public class ClusterDataCache extends AbstractDataCache {
+public class ClusterDataCache {
   private static final Logger LOG = 
LoggerFactory.getLogger(ClusterDataCache.class.getName());
+  private static final List<ChangeType> _noFullRefreshProperty =
+      Arrays.asList(ChangeType.EXTERNAL_VIEW, ChangeType.TARGET_EXTERNAL_VIEW);
 
+  private final String _clusterName;
+  private String _eventId = AbstractDataCache.UNKNOWN_EVENT_ID;
   private ClusterConfig _clusterConfig;
-  private Map<String, LiveInstance> _liveInstanceMap;
-  private Map<String, LiveInstance> _liveInstanceCacheMap;
-  private Map<String, StateModelDefinition> _stateModelDefMap;
-  private Map<String, InstanceConfig> _instanceConfigMap;
-  private Map<String, InstanceConfig> _instanceConfigCacheMap;
+  private boolean _updateInstanceOfflineTime = true;
+  private boolean _isTaskCache;
+  private boolean _isMaintenanceModeEnabled;
+  private ExecutorService _asyncTasksThreadPool;
+
+  // A map recording what data has changed
+  private Map<ChangeType, Boolean> _propertyDataChangedMap;
+
+  // Property caches
+  private final PropertyCache<ExternalView> _externalViewCache;
+  private final PropertyCache<ExternalView> _targetExternalViewCache;
+  private final PropertyCache<ResourceConfig> _resourceConfigCache;
+  private final PropertyCache<InstanceConfig> _instanceConfigCache;
+  private final PropertyCache<LiveInstance> _liveInstanceCache;
+  private final PropertyCache<IdealState> _idealStateCache;
+  private final PropertyCache<ClusterConstraints> _clusterConstraintsCache;
+  private final PropertyCache<StateModelDefinition> _stateModelDefinitionCache;
+
+  // Special caches
+  private CurrentStateCache _currentStateCache;
+  private TaskDataCache _taskDataCache;
+  private InstanceMessagesCache _instanceMessagesCache;
+
+  // Other miscellaneous caches
   private Map<String, Long> _instanceOfflineTimeMap;
-  private Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>();
-  private Map<String, ResourceConfig> _resourceConfigCacheMap;
-  private Map<String, ClusterConstraints> _constraintMap;
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap 
= new HashMap<>();
   private Map<String, Map<String, String>> _lastTopStateLocationMap = new 
HashMap<>();
-  private Map<String, ExternalView> _targetExternalViewMap;
-  private Map<String, ExternalView> _externalViewMap;
   private Map<String, Map<String, Set<String>>> 
_disabledInstanceForPartitionMap = new HashMap<>();
   private Set<String> _disabledInstanceSet = new HashSet<>();
-  private String _eventId = "NO_ID";
-
-  private IdealStateCache _idealStateCache;
-  private CurrentStateCache _currentStateCache;
-  private TaskDataCache _taskDataCache;
-  private InstanceMessagesCache _instanceMessagesCache;
 
   // maintain a cache of bestPossible assignment across pipeline runs
   // TODO: this is only for customRebalancer, remove it and merge it with 
_idealMappingCache.
@@ -107,18 +119,8 @@ public class ClusterDataCache extends AbstractDataCache {
   // maintain a cache of idealmapping (preference list) for full-auto resource 
across pipeline runs
   private Map<String, ZNRecord> _idealMappingCache = new HashMap<>();
 
-  private Map<ChangeType, Boolean> _propertyDataChangedMap;
-
   private Map<String, Integer> _participantActiveTaskCount = new HashMap<>();
 
-  private ExecutorService _asyncTasksThreadPool;
-
-  boolean _updateInstanceOfflineTime = true;
-  boolean _isTaskCache;
-  boolean _isMaintenanceModeEnabled;
-
-  private String _clusterName;
-
   // For detecting liveinstance and target resource partition state change in 
task assignment
   // Used in AbstractTaskDispatcher
   private boolean _existsLiveInstanceOrCurrentStateChange = false;
@@ -134,122 +136,282 @@ public class ClusterDataCache extends AbstractDataCache 
{
   public ClusterDataCache(String clusterName) {
     _propertyDataChangedMap = new ConcurrentHashMap<>();
     for (ChangeType type : ChangeType.values()) {
+      // refresh every type when it is initialized
       _propertyDataChangedMap.put(type, true);
     }
     _clusterName = clusterName;
-    _idealStateCache = new IdealStateCache(_clusterName);
+
+    _externalViewCache = new PropertyCache<>(_clusterName, "ExternalView", new 
PropertyCache.PropertyCacheKeyFuncs<ExternalView>() {
+      @Override
+      public PropertyKey getRootKey(HelixDataAccessor accessor) {
+        return accessor.keyBuilder().externalViews();
+      }
+
+      @Override
+      public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String 
objName) {
+        return accessor.keyBuilder().externalView(objName);
+      }
+
+      @Override
+      public String getObjName(ExternalView obj) {
+        return obj.getResourceName();
+      }
+    }, true);
+    _targetExternalViewCache = new PropertyCache<>(_clusterName, 
"TargetExternalView", new PropertyCache.PropertyCacheKeyFuncs<ExternalView>() {
+      @Override
+      public PropertyKey getRootKey(HelixDataAccessor accessor) {
+        return accessor.keyBuilder().targetExternalViews();
+      }
+
+      @Override
+      public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String 
objName) {
+        return accessor.keyBuilder().targetExternalView(objName);
+      }
+
+      @Override
+      public String getObjName(ExternalView obj) {
+        return obj.getResourceName();
+      }
+    }, true);
+    _resourceConfigCache = new PropertyCache<>(_clusterName, "ResourceConfig", 
new PropertyCache.PropertyCacheKeyFuncs<ResourceConfig>() {
+      @Override
+      public PropertyKey getRootKey(HelixDataAccessor accessor) {
+        return accessor.keyBuilder().resourceConfigs();
+      }
+
+      @Override
+      public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String 
objName) {
+        return accessor.keyBuilder().resourceConfig(objName);
+      }
+
+      @Override
+      public String getObjName(ResourceConfig obj) {
+        return obj.getResourceName();
+      }
+    }, true);
+    _liveInstanceCache = new PropertyCache<>(_clusterName, "LiveInstance", new 
PropertyCache.PropertyCacheKeyFuncs<LiveInstance>() {
+      @Override
+      public PropertyKey getRootKey(HelixDataAccessor accessor) {
+        return accessor.keyBuilder().liveInstances();
+      }
+
+      @Override
+      public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String 
objName) {
+        return accessor.keyBuilder().liveInstance(objName);
+      }
+
+      @Override
+      public String getObjName(LiveInstance obj) {
+        return obj.getInstanceName();
+      }
+    }, true);
+    _instanceConfigCache = new PropertyCache<>(_clusterName, "InstanceConfig", 
new PropertyCache.PropertyCacheKeyFuncs<InstanceConfig>() {
+      @Override
+      public PropertyKey getRootKey(HelixDataAccessor accessor) {
+        return accessor.keyBuilder().instanceConfigs();
+      }
+
+      @Override
+      public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String 
objName) {
+        return accessor.keyBuilder().instanceConfig(objName);
+      }
+
+      @Override
+      public String getObjName(InstanceConfig obj) {
+        return obj.getInstanceName();
+      }
+    }, true);
+    _idealStateCache = new PropertyCache<>(_clusterName, "IdealState", new 
PropertyCache.PropertyCacheKeyFuncs<IdealState>() {
+      @Override
+      public PropertyKey getRootKey(HelixDataAccessor accessor) {
+        return accessor.keyBuilder().idealStates();
+      }
+
+      @Override
+      public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String 
objName) {
+        return accessor.keyBuilder().idealStates(objName);
+      }
+
+      @Override
+      public String getObjName(IdealState obj) {
+        return obj.getResourceName();
+      }
+    }, true);
+    _clusterConstraintsCache = new PropertyCache<>(_clusterName, 
"ClusterConstraint", new 
PropertyCache.PropertyCacheKeyFuncs<ClusterConstraints>() {
+      @Override
+      public PropertyKey getRootKey(HelixDataAccessor accessor) {
+        return accessor.keyBuilder().constraints();
+      }
+
+      @Override
+      public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String 
objName) {
+        return accessor.keyBuilder().constraint(objName);
+      }
+
+      @Override
+      public String getObjName(ClusterConstraints obj) {
+        // We set constraint type to the HelixProperty id
+        return obj.getId();
+      }
+    }, false);
+    _stateModelDefinitionCache = new PropertyCache<>(_clusterName, 
"StateModelDefinition", new 
PropertyCache.PropertyCacheKeyFuncs<StateModelDefinition>() {
+      @Override
+      public PropertyKey getRootKey(HelixDataAccessor accessor) {
+        return accessor.keyBuilder().stateModelDefs();
+      }
+
+      @Override
+      public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String 
objName) {
+        return accessor.keyBuilder().stateModelDef(objName);
+      }
+
+      @Override
+      public String getObjName(StateModelDefinition obj) {
+        return obj.getId();
+      }
+    }, false);
+
     _currentStateCache = new CurrentStateCache(_clusterName);
     _taskDataCache = new TaskDataCache(_clusterName);
     _instanceMessagesCache = new InstanceMessagesCache(_clusterName);
   }
 
-  /**
-   * This refreshes the cluster data by re-fetching the data from zookeeper in
-   * an efficient way
-   * @param accessor
-   * @return
-   */
-  public synchronized boolean refresh(HelixDataAccessor accessor) {
-    long startTime = System.currentTimeMillis();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    // Reset the LiveInstance/CurrentState change flag
-    _existsLiveInstanceOrCurrentStateChange = false;
+  private void updateCachePipelineNames() {
+    // TODO (harry): remove such update cache name after we have specific 
cache for
+    // different pipelines
+    String pipelineType = _isTaskCache ? "TASK" : "DEFAULT";
+    _externalViewCache.setPipelineName(pipelineType);
+    _targetExternalViewCache.setPipelineName(pipelineType);
+    _resourceConfigCache.setPipelineName(pipelineType);
+    _liveInstanceCache.setPipelineName(pipelineType);
+    _instanceConfigCache.setPipelineName(pipelineType);
+    _idealStateCache.setPipelineName(pipelineType);
+    _clusterConstraintsCache.setPipelineName(pipelineType);
+    _stateModelDefinitionCache.setPipelineName(pipelineType);
+  }
 
+  private void refreshIdealState(final HelixDataAccessor accessor) {
     if (_propertyDataChangedMap.get(ChangeType.IDEAL_STATE)) {
       _propertyDataChangedMap.put(ChangeType.IDEAL_STATE, false);
       clearCachedResourceAssignments();
       _idealStateCache.refresh(accessor);
-      LogUtil.logInfo(LOG, _eventId,
-          "Refresh IdealStates for cluster " + _clusterName + ", took "
-              + (System.currentTimeMillis() - startTime) + " ms for "
-              + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
+    } else {
+      LogUtil.logInfo(LOG, getEventId(), String
+          .format("No ideal state change for %s cluster, %s pipeline", 
_clusterName,
+              _idealStateCache.getPipelineName()));
     }
+  }
 
+  private void refreshLiveInstances(final HelixDataAccessor accessor) {
     if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) {
-      startTime = System.currentTimeMillis();
       _propertyDataChangedMap.put(ChangeType.LIVE_INSTANCE, false);
       clearCachedResourceAssignments();
-      _liveInstanceCacheMap = 
accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
+      _liveInstanceCache.refresh(accessor);
       _updateInstanceOfflineTime = true;
-      LogUtil.logInfo(LOG, _eventId,
-          "Refresh LiveInstances for cluster " + _clusterName + ", took "
-              + (System.currentTimeMillis() - startTime) + " ms for "
-              + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
+    } else {
+      LogUtil.logInfo(LOG, getEventId(), String
+          .format("No live instance change for %s cluster, %s pipeline", 
_clusterName,
+              _liveInstanceCache.getPipelineName()));
     }
+  }
 
+  private void refreshInstanceConfigs(final HelixDataAccessor accessor) {
     if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
       _existsInstanceChange = true;
       _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, false);
       clearCachedResourceAssignments();
-      _instanceConfigCacheMap = 
accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
-      LogUtil.logInfo(LOG, _eventId,
-          "Reload InstanceConfig for cluster " + _clusterName + " : " + 
_instanceConfigCacheMap
-              .keySet() + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + 
"pipeline");
+      _instanceConfigCache.refresh(accessor);
+      LogUtil.logInfo(LOG, getEventId(), String
+          .format("Reloaded InstanceConfig for cluster %s, %s pipeline. Keys: 
%s", _clusterName,
+              _instanceConfigCache.getPipelineName(),
+              _instanceConfigCache.getPropertyMap().keySet()));
+    } else {
+      LogUtil.logInfo(LOG, getEventId(), String
+          .format("No instance config change for %s cluster, %s pipeline", 
_clusterName,
+              _liveInstanceCache.getPipelineName()));
     }
+  }
 
+  private void refreshResourceConfig(final HelixDataAccessor accessor) {
     if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, false);
       clearCachedResourceAssignments();
-
-      _resourceConfigCacheMap = refreshResourceConfigs(accessor);
-      LogUtil.logInfo(LOG, _eventId,
-          "Reload ResourceConfigs for cluster " + _clusterName + " : " + 
_resourceConfigCacheMap
-              .keySet() + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + 
"pipeline");
-    }
-
-    // This is for targeted jobs' task assignment. It needs to watch for 
current state changes for
-    // when targeted resources' state transitions complete
-    if (_propertyDataChangedMap.get(ChangeType.CURRENT_STATE)) {
-      _existsLiveInstanceOrCurrentStateChange = true;
-      _propertyDataChangedMap.put(ChangeType.CURRENT_STATE, false);
+      _resourceConfigCache.refresh(accessor);
+      LogUtil.logInfo(LOG, getEventId(), String
+          .format("Reloaded ResourceConfig for cluster %s, %s pipeline. Cnt: 
%s", _clusterName,
+              _resourceConfigCache.getPipelineName(),
+              _resourceConfigCache.getPropertyMap().keySet().size()));
+    } else {
+      LogUtil.logInfo(LOG, getEventId(), String
+          .format("No resource config change for %s cluster, %s pipeline", 
_clusterName,
+              _liveInstanceCache.getPipelineName()));
     }
+  }
 
-    // This is for AssignableInstances. Whenever there is a quota config 
change in ClusterConfig, we
-    // must trigger an update to AssignableInstanceManager
-    if (_propertyDataChangedMap.get(ChangeType.CLUSTER_CONFIG)) {
-      _existsClusterConfigChange = true;
-      _propertyDataChangedMap.put(ChangeType.CLUSTER_CONFIG, false);
+  private void refreshExternalViews(final HelixDataAccessor accessor) {
+    // As we are not listening on external view change, external view will be
+    // refreshed once during the cache's first refresh() call, or when full 
refresh is required
+    if (_propertyDataChangedMap.get(ChangeType.EXTERNAL_VIEW)) {
+      _externalViewCache.refresh(accessor);
+      _propertyDataChangedMap.put(ChangeType.EXTERNAL_VIEW, false);
     }
+  }
 
-    _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
-    _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap);
-    _resourceConfigMap = new HashMap<>(_resourceConfigCacheMap);
+  private void refreshTargetExternalViews(final HelixDataAccessor accessor) {
+    if (_propertyDataChangedMap.get(ChangeType.TARGET_EXTERNAL_VIEW)) {
+      if (_clusterConfig != null && 
_clusterConfig.isTargetExternalViewEnabled()) {
+        _targetExternalViewCache.refresh(accessor);
 
-    if (_updateInstanceOfflineTime) {
-      updateOfflineInstanceHistory(accessor);
+        // Only set the change type back once we get refreshed with data 
accessor for the
+        // first time
+        _propertyDataChangedMap.put(ChangeType.TARGET_EXTERNAL_VIEW, false);
+      }
     }
+  }
 
-    Map<String, StateModelDefinition> stateDefMap =
-        accessor.getChildValuesMap(keyBuilder.stateModelDefs(), true);
-    _stateModelDefMap = new ConcurrentHashMap<>(stateDefMap);
-    _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints(), 
true);
-    _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
-
-    if (_isTaskCache) {
-      // Refresh TaskCache
-      _taskDataCache.refresh(accessor, _resourceConfigMap);
+  private void updateMaintenanceInfo(final HelixDataAccessor accessor) {
+    MaintenanceSignal maintenanceSignal = 
accessor.getProperty(accessor.keyBuilder().maintenance());
+    _isMaintenanceModeEnabled = maintenanceSignal != null;
+  }
 
-      // Refresh AssignableInstanceManager
-      AssignableInstanceManager assignableInstanceManager =
-          _taskDataCache.getAssignableInstanceManager();
-      // Build from scratch every time
-      assignableInstanceManager
-          .buildAssignableInstances(_clusterConfig, _taskDataCache, 
_liveInstanceMap,
-              _instanceConfigMap);
-      // TODO: (Hunter) Consider this for optimization after fixing the 
problem of quotas not being
+  /**
+   * This refreshes the cluster data by re-fetching the data from zookeeper in
+   * an efficient way
+   * @param accessor
+   * @return
+   */
+  public synchronized boolean refresh(HelixDataAccessor accessor) {
+    long startTime = System.currentTimeMillis();
 
-      assignableInstanceManager.logQuotaProfileJSON(false);
-    }
+    // Reset the LiveInstance/CurrentState change flag
+    _existsLiveInstanceOrCurrentStateChange = false;
 
-    _instanceMessagesCache.refresh(accessor, _liveInstanceMap);
-    _currentStateCache.refresh(accessor, _liveInstanceMap);
+    // Refresh raw data
+    _clusterConfig = 
accessor.getProperty(accessor.keyBuilder().clusterConfig());
+    refreshIdealState(accessor);
+    refreshLiveInstances(accessor);
+    refreshInstanceConfigs(accessor);
+    refreshResourceConfig(accessor);
+    _stateModelDefinitionCache.refresh(accessor);
+    _clusterConstraintsCache.refresh(accessor);
+    refreshExternalViews(accessor);
+    refreshTargetExternalViews(accessor);
+    updateMaintenanceInfo(accessor);
+
+    // Refresh derived data
+    _instanceMessagesCache.refresh(accessor, 
_liveInstanceCache.getPropertyMap());
+    _currentStateCache.refresh(accessor, _liveInstanceCache.getPropertyMap());
 
     // current state must be refreshed before refreshing relay messages
     // because we need to use current state to validate all relay messages.
-    _instanceMessagesCache.updateRelayMessages(_liveInstanceMap,
+    
_instanceMessagesCache.updateRelayMessages(_liveInstanceCache.getPropertyMap(),
         _currentStateCache.getCurrentStatesMap());
 
+    if (_updateInstanceOfflineTime) {
+      // TODO: make it async
+      updateOfflineInstanceHistory(accessor);
+    }
+
     if (_clusterConfig != null) {
       _idealStateRuleMap = _clusterConfig.getIdealStateRules();
     } else {
@@ -258,18 +420,38 @@ public class ClusterDataCache extends AbstractDataCache {
           "Cluster config is null for " + (_isTaskCache ? "TASK" : "DEFAULT") 
+ "pipeline");
     }
 
-    MaintenanceSignal maintenanceSignal = 
accessor.getProperty(keyBuilder.maintenance());
-    _isMaintenanceModeEnabled = maintenanceSignal != null;
-
     updateDisabledInstances();
 
-    if (_externalViewMap == null) {
-      _externalViewMap = 
accessor.getChildValuesMap(accessor.keyBuilder().externalViews());
+
+    // TaskFramework related operations
+
+    // This is for targeted jobs' task assignment. It needs to watch for 
current state changes for
+    // when targeted resources' state transitions complete
+    if (_propertyDataChangedMap.get(ChangeType.CURRENT_STATE)) {
+      _existsLiveInstanceOrCurrentStateChange = true;
+      _propertyDataChangedMap.put(ChangeType.CURRENT_STATE, false);
+    }
+
+    // This is for AssignableInstances. Whenever there is a quota config 
change in ClusterConfig, we
+    // must trigger an update to AssignableInstanceManager
+    if (_propertyDataChangedMap.get(ChangeType.CLUSTER_CONFIG)) {
+      _existsClusterConfigChange = true;
+      _propertyDataChangedMap.put(ChangeType.CLUSTER_CONFIG, false);
     }
 
-    if (_clusterConfig.isTargetExternalViewEnabled() && _targetExternalViewMap 
== null) {
-      _targetExternalViewMap =
-          
accessor.getChildValuesMap(accessor.keyBuilder().targetExternalViews());
+    if (_isTaskCache) {
+      // Refresh TaskCache
+      _taskDataCache.refresh(accessor, _resourceConfigCache.getPropertyMap());
+
+      // Refresh AssignableInstanceManager
+      AssignableInstanceManager assignableInstanceManager =
+          _taskDataCache.getAssignableInstanceManager();
+      // Build from scratch every time
+      assignableInstanceManager.buildAssignableInstances(_clusterConfig, 
_taskDataCache,
+          _liveInstanceCache.getPropertyMap(), 
_instanceConfigCache.getPropertyMap());
+      // TODO: (Hunter) Consider this for optimization after fixing the 
problem of quotas not being
+
+      assignableInstanceManager.logQuotaProfileJSON(false);
     }
 
     long endTime = System.currentTimeMillis();
@@ -278,19 +460,27 @@ public class ClusterDataCache extends AbstractDataCache {
             + (endTime - startTime) + " ms for " + (_isTaskCache ? "TASK" : 
"DEFAULT")
             + "pipeline");
 
+    dumpDebugInfo();
+    return true;
+  }
+
+  private void dumpDebugInfo() {
     if (LOG.isDebugEnabled()) {
       LogUtil.logDebug(LOG, _eventId,
-          "# of StateModelDefinition read from zk: " + 
_stateModelDefMap.size());
-      LogUtil.logDebug(LOG, _eventId, "# of ConstraintMap read from zk: " + 
_constraintMap.size());
-      LogUtil.logDebug(LOG, _eventId, "LiveInstances: " + 
_liveInstanceMap.keySet());
-      for (LiveInstance instance : _liveInstanceMap.values()) {
+          "# of StateModelDefinition read from zk: " + 
_stateModelDefinitionCache.getPropertyMap().size());
+      LogUtil.logDebug(LOG, _eventId, "# of ConstraintMap read from zk: " + 
_clusterConstraintsCache.getPropertyMap().size());
+      LogUtil.logDebug(LOG, _eventId,
+          "LiveInstances: " + _liveInstanceCache.getPropertyMap().keySet());
+      for (LiveInstance instance : 
_liveInstanceCache.getPropertyMap().values()) {
         LogUtil.logDebug(LOG, _eventId,
             "live instance: " + instance.getInstanceName() + " " + 
instance.getSessionId());
       }
+      LogUtil
+          .logDebug(LOG, _eventId, "IdealStates: " + 
_idealStateCache.getPropertyMap().keySet());
+      LogUtil.logDebug(LOG, _eventId,
+          "ResourceConfigs: " + 
_resourceConfigCache.getPropertyMap().keySet());
       LogUtil.logDebug(LOG, _eventId,
-          "IdealStates: " + _idealStateCache.getIdealStateMap().keySet());
-      LogUtil.logDebug(LOG, _eventId, "ResourceConfigs: " + 
_resourceConfigMap.keySet());
-      LogUtil.logDebug(LOG, _eventId, "InstanceConfigs: " + 
_instanceConfigMap.keySet());
+          "InstanceConfigs: " + 
_instanceConfigCache.getPropertyMap().keySet());
       LogUtil.logDebug(LOG, _eventId, "ClusterConfigs: " + _clusterConfig);
       LogUtil.logDebug(LOG, _eventId, "JobContexts: " + 
_taskDataCache.getContexts().keySet());
     }
@@ -298,15 +488,13 @@ public class ClusterDataCache extends AbstractDataCache {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Cache content: " + toString());
     }
-
-    return true;
   }
 
   private void updateDisabledInstances() {
     // Move the calculating disabled instances to refresh
     _disabledInstanceForPartitionMap.clear();
     _disabledInstanceSet.clear();
-    for (InstanceConfig config : _instanceConfigMap.values()) {
+    for (InstanceConfig config : 
_instanceConfigCache.getPropertyMap().values()) {
       Map<String, List<String>> disabledPartitionMap = 
config.getDisabledPartitionsMap();
       if (!config.getInstanceEnabled()) {
         _disabledInstanceSet.add(config.getInstanceName());
@@ -324,14 +512,15 @@ public class ClusterDataCache extends AbstractDataCache {
         }
       }
     }
-    if (_clusterConfig.getDisabledInstances() != null) {
+    if (_clusterConfig != null && _clusterConfig.getDisabledInstances() != 
null) {
       
_disabledInstanceSet.addAll(_clusterConfig.getDisabledInstances().keySet());
     }
   }
 
   private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
-    List<String> offlineNodes = new ArrayList<>(_instanceConfigMap.keySet());
-    offlineNodes.removeAll(_liveInstanceMap.keySet());
+    List<String> offlineNodes =
+        new ArrayList<>(_instanceConfigCache.getPropertyMap().keySet());
+    offlineNodes.removeAll(_liveInstanceCache.getPropertyMap().keySet());
     _instanceOfflineTimeMap = new HashMap<>();
 
     for (String instance : offlineNodes) {
@@ -379,11 +568,11 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public Map<String, IdealState> getIdealStates() {
-    return _idealStateCache.getIdealStateMap();
+    return _idealStateCache.getPropertyMap();
   }
 
   public synchronized void setIdealStates(List<IdealState> idealStates) {
-    _idealStateCache.setIdealStates(idealStates);
+    
_idealStateCache.setPropertyMap(HelixProperty.convertListToMap(idealStates));
   }
 
   public Map<String, Map<String, String>> getIdealStateRules() {
@@ -395,14 +584,14 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public Map<String, LiveInstance> getLiveInstances() {
-    return _liveInstanceMap;
+    return _liveInstanceCache.getPropertyMap();
   }
 
   /**
    * Return the set of all instances names.
    */
   public Set<String> getAllInstances() {
-    return new HashSet<>(_instanceConfigMap.keySet());
+    return _instanceConfigCache.getPropertyMap().keySet();
   }
 
   /**
@@ -421,7 +610,7 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public Set<String> getEnabledInstances() {
-    Set<String> enabledNodes = getAllInstances();
+    Set<String> enabledNodes = new HashSet<>(getAllInstances());
     enabledNodes.removeAll(getDisabledInstances());
 
     return enabledNodes;
@@ -448,8 +637,8 @@ public class ClusterDataCache extends AbstractDataCache {
    */
   public Set<String> getInstancesWithTag(String instanceTag) {
     Set<String> taggedInstances = new HashSet<>();
-    for (String instance : _instanceConfigMap.keySet()) {
-      InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
+    for (String instance : _instanceConfigCache.getPropertyMap().keySet()) {
+      InstanceConfig instanceConfig = 
_instanceConfigCache.getPropertyByName(instance);
       if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
         taggedInstances.add(instance);
       }
@@ -459,11 +648,7 @@ public class ClusterDataCache extends AbstractDataCache {
   }
 
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
-    Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
-    for (LiveInstance liveInstance : liveInstances) {
-      liveInstanceMap.put(liveInstance.getId(), liveInstance);
-    }
-    _liveInstanceCacheMap = liveInstanceMap;
+    
_liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances));
     _updateInstanceOfflineTime = true;
 
     // TODO: Move this when listener for LiveInstance is being refactored
@@ -510,10 +695,7 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public StateModelDefinition getStateModelDef(String stateModelDefRef) {
-    if (stateModelDefRef == null) {
-      return null;
-    }
-    return _stateModelDefMap.get(stateModelDefRef);
+    return _stateModelDefinitionCache.getPropertyByName(stateModelDefRef);
   }
 
   /**
@@ -521,7 +703,7 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return state model definition map
    */
   public Map<String, StateModelDefinition> getStateModelDefMap() {
-    return _stateModelDefMap;
+    return _stateModelDefinitionCache.getPropertyMap();
   }
 
   /**
@@ -530,7 +712,7 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public IdealState getIdealState(String resourceName) {
-    return _idealStateCache.getIdealStateMap().get(resourceName);
+    return _idealStateCache.getPropertyByName(resourceName);
   }
 
   /**
@@ -538,7 +720,7 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public Map<String, InstanceConfig> getInstanceConfigMap() {
-    return _instanceConfigMap;
+    return _instanceConfigCache.getPropertyMap();
   }
 
   /**
@@ -546,7 +728,7 @@ public class ClusterDataCache extends AbstractDataCache {
    * @param instanceConfigMap
    */
   public void setInstanceConfigMap(Map<String, InstanceConfig> 
instanceConfigMap) {
-    _instanceConfigMap = instanceConfigMap;
+    _instanceConfigCache.setPropertyMap(instanceConfigMap);
   }
 
   /**
@@ -554,7 +736,7 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public Map<String, ResourceConfig> getResourceConfigMap() {
-    return _resourceConfigMap;
+    return _resourceConfigCache.getPropertyMap();
   }
 
   /**
@@ -571,12 +753,8 @@ public class ClusterDataCache extends AbstractDataCache {
     notifyDataChange(changeType);
   }
 
-  /**
-   * Returns the instance config map
-   * @return
-   */
   public ResourceConfig getResourceConfig(String resource) {
-    return _resourceConfigMap.get(resource);
+    return _resourceConfigCache.getPropertyByName(resource);
   }
 
   /**
@@ -613,14 +791,6 @@ public class ClusterDataCache extends AbstractDataCache {
     return _taskDataCache.getWorkflowConfig(resource);
   }
 
-  public synchronized void setInstanceConfigs(List<InstanceConfig> 
instanceConfigs) {
-    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
-    for (InstanceConfig instanceConfig : instanceConfigs) {
-      instanceConfigMap.put(instanceConfig.getId(), instanceConfig);
-    }
-    _instanceConfigCacheMap = instanceConfigMap;
-  }
-
   /**
    * Some partitions might be disabled on specific nodes.
    * This method allows one to fetch the set of nodes where a given partition 
is disabled
@@ -653,14 +823,14 @@ public class ClusterDataCache extends AbstractDataCache {
    */
   public int getReplicas(String resourceName) {
     int replicas = -1;
-    Map<String, IdealState> idealStateMap = 
_idealStateCache.getIdealStateMap();
+    Map<String, IdealState> idealStateMap = _idealStateCache.getPropertyMap();
 
     if (idealStateMap.containsKey(resourceName)) {
       String replicasStr = idealStateMap.get(resourceName).getReplicas();
 
       if (replicasStr != null) {
         if 
(replicasStr.equals(IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString()))
 {
-          replicas = _liveInstanceMap.size();
+          replicas = _liveInstanceCache.getPropertyMap().size();
         } else {
           try {
             replicas = Integer.parseInt(replicasStr);
@@ -683,10 +853,7 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public ClusterConstraints getConstraint(ConstraintType type) {
-    if (_constraintMap != null) {
-      return _constraintMap.get(type.toString());
-    }
-    return null;
+    return _clusterConstraintsCache.getPropertyByName(type.name());
   }
 
   public Map<String, Map<String, MissingTopStateRecord>> 
getMissingTopStateMap() {
@@ -793,11 +960,11 @@ public class ClusterDataCache extends AbstractDataCache {
   }
 
   public ExternalView getTargetExternalView(String resourceName) {
-    return _targetExternalViewMap == null ? null : 
_targetExternalViewMap.get(resourceName);
+    return _targetExternalViewCache.getPropertyByName(resourceName);
   }
 
   public void updateTargetExternalView(String resourceName, ExternalView 
targetExternalView) {
-    _targetExternalViewMap.put(resourceName, targetExternalView);
+    _targetExternalViewCache.setProperty(targetExternalView);
   }
 
   /**
@@ -805,10 +972,7 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public Map<String, ExternalView> getExternalViews() {
-    if (_externalViewMap == null) {
-      return Collections.emptyMap();
-    }
-    return Collections.unmodifiableMap(_externalViewMap);
+    return _externalViewCache.getPropertyMap();
   }
 
   /**
@@ -816,11 +980,8 @@ public class ClusterDataCache extends AbstractDataCache {
    * @param externalViews
    */
   public void updateExternalViews(List<ExternalView> externalViews) {
-    if (_externalViewMap == null) {
-      _externalViewMap = new HashMap<>();
-    }
-    for (ExternalView externalView : externalViews) {
-      _externalViewMap.put(externalView.getResourceName(), externalView);
+    for (ExternalView ev : externalViews) {
+      _externalViewCache.setProperty(ev);
     }
   }
 
@@ -830,8 +991,8 @@ public class ClusterDataCache extends AbstractDataCache {
    */
 
   public void removeExternalViews(List<String> resourceNames) {
-    for (String externalView : resourceNames) {
-      _externalViewMap.remove(externalView);
+    for (String resourceName : resourceNames) {
+      _externalViewCache.deletePropertyByName(resourceName);
     }
   }
 
@@ -840,7 +1001,10 @@ public class ClusterDataCache extends AbstractDataCache {
    */
   public synchronized void requireFullRefresh() {
     for (ChangeType type : ChangeType.values()) {
-      _propertyDataChangedMap.put(type, true);
+      // We only refresh EV and TEV the very first time the cluster data cache 
is initialized
+      if (!_noFullRefreshProperty.contains(type)) {
+        _propertyDataChangedMap.put(type, true);
+      }
     }
   }
 
@@ -931,6 +1095,7 @@ public class ClusterDataCache extends AbstractDataCache {
    */
   public void setTaskCache(boolean taskCache) {
     _isTaskCache = taskCache;
+    updateCachePipelineNames();
   }
 
   /**
@@ -959,6 +1124,11 @@ public class ClusterDataCache extends AbstractDataCache {
     _idealStateCache.setEventId(eventId);
     _currentStateCache.setEventId(eventId);
     _taskDataCache.setEventId(eventId);
+    _liveInstanceCache.setEventId(eventId);
+    _instanceConfigCache.setEventId(eventId);
+    _resourceConfigCache.setEventId(eventId);
+    _stateModelDefinitionCache.setEventId(eventId);
+    _clusterConstraintsCache.setEventId(eventId);
   }
 
   /**
@@ -970,53 +1140,17 @@ public class ClusterDataCache extends AbstractDataCache {
     return _existsLiveInstanceOrCurrentStateChange;
   }
 
-  private Map<String, ResourceConfig> refreshResourceConfigs(HelixDataAccessor 
accessor) {
-    Map<String, ResourceConfig> refreshedResourceConfigs = Maps.newHashMap();
-
-    long startTime = System.currentTimeMillis();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Set<PropertyKey> currentResourceConfigKeys = new HashSet<>();
-    for (String resourceConfig : 
accessor.getChildNames(keyBuilder.resourceConfigs())) {
-      currentResourceConfigKeys.add(keyBuilder.resourceConfig(resourceConfig));
-    }
-
-    Set<PropertyKey> cachedKeys = new HashSet<>();
-    Map<PropertyKey, ResourceConfig> cachedResourceConfigMap = 
Maps.newHashMap();
-
-    for (String resourceConfig : _resourceConfigMap.keySet()) {
-      cachedKeys.add(keyBuilder.resourceConfig(resourceConfig));
-      cachedResourceConfigMap.put(keyBuilder.resourceConfig(resourceConfig),
-          _resourceConfigMap.get(resourceConfig));
-    }
-    cachedKeys.retainAll(currentResourceConfigKeys);
-
-    Set<PropertyKey> reloadKeys = new HashSet<>(currentResourceConfigKeys);
-    reloadKeys.removeAll(cachedKeys);
-
-    Map<PropertyKey, ResourceConfig> updatedMap = refreshProperties(accessor,
-        new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys), 
cachedResourceConfigMap);
-    for (ResourceConfig resourceConfig : updatedMap.values()) {
-      refreshedResourceConfigs.put(resourceConfig.getResourceName(), 
resourceConfig);
-    }
-
-    long endTime = System.currentTimeMillis();
-    LogUtil.logInfo(LOG, getEventId(),
-        "Refresh " + refreshedResourceConfigs.size() + " resource configs for 
cluster "
-            + _clusterName + ", took " + (endTime - startTime) + " ms");
-    return refreshedResourceConfigs;
-  }
-
   /**
    * toString method to print the entire cluster state
    */
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
-    sb.append("idealStateMap:" + 
_idealStateCache.getIdealStateMap()).append("\n");
-    sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n");
-    sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
-    sb.append("resourceConfigMap:" + _resourceConfigMap).append("\n");
+    sb.append("liveInstaceMap:" + 
_liveInstanceCache.getPropertyMap()).append("\n");
+    sb.append("idealStateMap:" + 
_idealStateCache.getPropertyMap()).append("\n");
+    sb.append("stateModelDefMap:" + 
_stateModelDefinitionCache.getPropertyMap()).append("\n");
+    sb.append("instanceConfigMap:" + 
_instanceConfigCache.getPropertyMap()).append("\n");
+    sb.append("resourceConfigMap:" + 
_resourceConfigCache.getPropertyMap()).append("\n");
     sb.append("taskDataCache:" + _taskDataCache).append("\n");
     sb.append("messageCache:" + _instanceMessagesCache).append("\n");
     sb.append("currentStateCache:" + _currentStateCache).append("\n");
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
index 11044b7..e348798 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
@@ -46,6 +46,7 @@ public class TestClusterDataCacheSelectiveUpdate extends 
ZkStandAloneCMTestBase
     cache.refresh(accessor);
 
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1);
     Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 
NODE_NR);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 
NODE_NR);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 
1);
@@ -55,6 +56,7 @@ public class TestClusterDataCacheSelectiveUpdate extends 
ZkStandAloneCMTestBase
     // refresh again should read nothing
     cache.refresh(accessor);
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0);
     // cluster config always get reloaded
@@ -65,6 +67,7 @@ public class TestClusterDataCacheSelectiveUpdate extends 
ZkStandAloneCMTestBase
     cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE);
     cache.refresh(accessor);
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1);
@@ -73,7 +76,8 @@ public class TestClusterDataCacheSelectiveUpdate extends 
ZkStandAloneCMTestBase
     cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
     cache.refresh(accessor);
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0);
-    Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 
NODE_NR);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1);
   }
@@ -88,6 +92,7 @@ public class TestClusterDataCacheSelectiveUpdate extends 
ZkStandAloneCMTestBase
     cache.refresh(accessor);
 
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1);
     Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 
NODE_NR);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 
NODE_NR);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 
1);
@@ -97,6 +102,7 @@ public class TestClusterDataCacheSelectiveUpdate extends 
ZkStandAloneCMTestBase
     // refresh again should read nothing
     cache.refresh(accessor);
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1);
@@ -115,6 +121,7 @@ public class TestClusterDataCacheSelectiveUpdate extends 
ZkStandAloneCMTestBase
     cache.refresh(accessor);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 
NODE_NR);
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
 
     // Add more resources
     accessor.clearReadCounters();
@@ -128,6 +135,7 @@ public class TestClusterDataCacheSelectiveUpdate extends 
ZkStandAloneCMTestBase
     cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE);
     cache.refresh(accessor);
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 2);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
 
     // Test WorkflowConfig/JobConfigs
     TaskDriver driver = new TaskDriver(_manager);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java
 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java
index ceda6e5..7dbe6d5 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java
@@ -41,11 +41,13 @@ public class TestNodeOfflineTimeStamp extends 
ZkStandAloneCMTestBase {
     _participants[0].syncStop();
     ParticipantHistory history = 
getInstanceHistory(_participants[0].getInstanceName());
     long recordTime = history.getLastOfflineTime();
-
     Assert.assertTrue(Math.abs(shutdownTime - recordTime) <= 500L);
 
     _participants[0].reset();
-    Thread.sleep(50);
+
+    // Make it long enough to reduce the potential racing condition that 
cluster data cache report
+    // instance offline is actually after instance comes back
+    Thread.sleep(500);
     _participants[0].syncStart();
 
     Thread.sleep(50);
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index 5550386..ce1e657 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -21,6 +21,7 @@ package org.apache.helix.monitoring.mbeans;
 
 import com.google.common.collect.Range;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -34,6 +35,7 @@ import 
org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.TopStateHandoffReportStage;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Resource;
 import org.codehaus.jackson.annotate.JsonCreator;
@@ -176,7 +178,9 @@ public class TestTopStateHandoffMetrics extends 
BaseStageTest {
         new MissingStatesDataCacheInject() {
           @Override
           public void doInject(ClusterDataCache cache) {
-            cache.getLiveInstances().remove("localhost_1");
+            Map<String, LiveInstance> liMap = new 
HashMap<>(cache.getLiveInstances());
+            liMap.remove("localhost_1");
+            cache.setLiveInstances(new ArrayList<>(liMap.values()));
           }
         }, 1, 0,
         expectedDuration,
@@ -201,7 +205,9 @@ public class TestTopStateHandoffMetrics extends 
BaseStageTest {
           @Override
           public void doInject(ClusterDataCache cache) {
             
accessor.removeProperty(accessor.keyBuilder().liveInstance(downInstance));
-            cache.getLiveInstances().remove("localhost_0");
+            Map<String, LiveInstance> liMap = new 
HashMap<>(cache.getLiveInstances());
+            liMap.remove("localhost_0");
+            cache.setLiveInstances(new ArrayList<>(liMap.values()));
             cache.getInstanceOfflineTimeMap().put("localhost_0", 
lastOfflineTime);
             cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
           }

Reply via email to