This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 71becb3  Fix missed callbacks in CurrentStates based 
RoutingTableProvider. (#458)
71becb3 is described below

commit 71becb3de497413ddce2080541f04c1b93028a85
Author: pkuwm <[email protected]>
AuthorDate: Fri Sep 13 17:08:17 2019 -0700

    Fix missed callbacks in CurrentStates based RoutingTableProvider. (#458)
    
    1. Update BasicClusterDataCache to refresh selectively: when a change 
happens, the cache is refreshed only for that change type.
    2. Improve RoutingTableProvider.queueEvent() and 
RoutingTableProvider.handleEvent(). Return the instanceConfigs snapshot to the 
callback immediately, instead of waiting for currentStates completion.
---
 .../helix/common/caches/BasicClusterDataCache.java | 116 +++++++++++++++------
 .../helix/spectator/RoutingTableProvider.java      |  17 +--
 2 files changed, 98 insertions(+), 35 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index d3a3d20..b01bb0c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -19,13 +19,12 @@ package org.apache.helix.common.caches;
  * under the License.
  */
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.common.controllers.ControlContextProvider;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -35,11 +34,16 @@ import org.slf4j.LoggerFactory;
 /**
  * Cache the basic cluster data, including LiveInstances, InstanceConfigs and 
ExternalViews.
  */
-public class BasicClusterDataCache {
+public class BasicClusterDataCache implements ControlContextProvider {
   private static Logger LOG = 
LoggerFactory.getLogger(BasicClusterDataCache.class.getName());
 
-  protected Map<String, LiveInstance> _liveInstanceMap;
-  protected Map<String, InstanceConfig> _instanceConfigMap;
+  private static final String INSTANCE_CONFIG = "InstanceConfig";
+  private static final String LIVE_INSTANCE = "LiveInstance";
+
+  private String _clusterEventId;
+
+  protected PropertyCache<LiveInstance> _liveInstancePropertyCache;
+  protected PropertyCache<InstanceConfig> _instanceConfigPropertyCache;
   protected ExternalViewCache _externalViewCache;
 
   protected String _clusterName;
@@ -48,15 +52,50 @@ public class BasicClusterDataCache {
 
   public BasicClusterDataCache(String clusterName) {
     _propertyDataChangedMap = new ConcurrentHashMap<>();
-    _liveInstanceMap = new HashMap<>();
-    _instanceConfigMap = new HashMap<>();
     _externalViewCache = new ExternalViewCache(clusterName);
     _clusterName = clusterName;
-    requireFullRefresh();
+    _clusterEventId = AbstractDataCache.UNKNOWN_EVENT_ID;
+
+    _liveInstancePropertyCache = new PropertyCache<>(this, LIVE_INSTANCE,
+        new PropertyCache.PropertyCacheKeyFuncs<LiveInstance>() {
+          @Override
+          public PropertyKey getRootKey(HelixDataAccessor accessor) {
+            return accessor.keyBuilder().liveInstances();
+          }
+
+          @Override
+          public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, 
String objName) {
+            return accessor.keyBuilder().liveInstance(objName);
+          }
+
+          @Override
+          public String getObjName(LiveInstance obj) {
+            return obj.getInstanceName();
+          }
+        }, true);
+
+    _instanceConfigPropertyCache = new PropertyCache<>(this, INSTANCE_CONFIG,
+        new PropertyCache.PropertyCacheKeyFuncs<InstanceConfig>() {
+          @Override
+          public PropertyKey getRootKey(HelixDataAccessor accessor) {
+            return accessor.keyBuilder().instanceConfigs();
+          }
+
+          @Override
+          public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, 
String objName) {
+            return accessor.keyBuilder().instanceConfig(objName);
+          }
+
+          @Override
+          public String getObjName(InstanceConfig obj) {
+            return obj.getInstanceName();
+          }
+        }, true);
   }
 
   /**
-   * This refreshes the cluster data by re-fetching the data from zookeeper in 
an efficient way
+   * This refreshes the cluster data by re-fetching the data from zookeeper in 
an efficient way.
+   * If we want to support multi-threading in the future, this method needs to 
be synchronized.
    *
    * @param accessor
    *
@@ -65,7 +104,6 @@ public class BasicClusterDataCache {
   public void refresh(HelixDataAccessor accessor) {
     LOG.info("START: BasicClusterDataCache.refresh() for cluster " + 
_clusterName);
     long startTime = System.currentTimeMillis();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
       _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, 
Boolean.valueOf(false));
@@ -75,18 +113,18 @@ public class BasicClusterDataCache {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, 
Boolean.valueOf(false));
-      _liveInstanceMap = 
accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
-      LOG.info("Reload LiveInstances: " + _liveInstanceMap.keySet() + ". Takes 
" + (
-          System.currentTimeMillis() - start) + " ms");
+      _liveInstancePropertyCache.refresh(accessor);
+      LOG.info("Reload LiveInstances: " + 
_liveInstancePropertyCache.getPropertyMap().keySet()
+          + ". Takes " + (System.currentTimeMillis() - start) + " ms");
     }
 
     if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
       long start = System.currentTimeMillis();
-      _propertyDataChangedMap
-          .put(HelixConstants.ChangeType.INSTANCE_CONFIG, 
Boolean.valueOf(false));
-      _instanceConfigMap = 
accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
-      LOG.info("Reload InstanceConfig: " + _instanceConfigMap.keySet() + ". 
Takes " + (
-          System.currentTimeMillis() - start) + " ms");
+      _propertyDataChangedMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG,
+          Boolean.valueOf(false));
+      _instanceConfigPropertyCache.refresh(accessor);
+      LOG.info("Reload InstanceConfig: " + 
_instanceConfigPropertyCache.getPropertyMap().keySet()
+          + ". Takes " + (System.currentTimeMillis() - start) + " ms");
     }
 
     long endTime = System.currentTimeMillis();
@@ -95,9 +133,9 @@ public class BasicClusterDataCache {
             - startTime) + " ms");
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("LiveInstances: " + _liveInstanceMap);
-      LOG.debug("ExternalViews: " + 
_externalViewCache.getExternalViewMap().keySet());
-      LOG.debug("InstanceConfigs: " + _instanceConfigMap);
+      LOG.debug("LiveInstances: {}", 
_liveInstancePropertyCache.getPropertyMap());
+      LOG.debug("ExternalViews: {}", 
_externalViewCache.getExternalViewMap().keySet());
+      LOG.debug("InstanceConfigs: {}", 
_instanceConfigPropertyCache.getPropertyMap());
     }
   }
 
@@ -116,7 +154,7 @@ public class BasicClusterDataCache {
    * @return
    */
   public Map<String, LiveInstance> getLiveInstances() {
-    return Collections.unmodifiableMap(_liveInstanceMap);
+    return _liveInstancePropertyCache.getPropertyMap();
   }
 
   /**
@@ -125,7 +163,7 @@ public class BasicClusterDataCache {
    * @return
    */
   public Map<String, InstanceConfig> getInstanceConfigMap() {
-    return Collections.unmodifiableMap(_instanceConfigMap);
+    return _instanceConfigPropertyCache.getPropertyMap();
   }
 
   /**
@@ -154,10 +192,8 @@ public class BasicClusterDataCache {
   public synchronized void clearCache(HelixConstants.ChangeType changeType) {
     switch (changeType) {
     case LIVE_INSTANCE:
-      _liveInstanceMap.clear();
-      break;
     case INSTANCE_CONFIG:
-      _instanceConfigMap.clear();
+      LOG.warn("clearCache is deprecated for changeType: {}.", changeType);
       break;
     case EXTERNAL_VIEW:
       _externalViewCache.clear();
@@ -182,11 +218,33 @@ public class BasicClusterDataCache {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
-    sb.append("externalViewMap:" + 
_externalViewCache.getExternalViewMap()).append("\n");
-    sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
+    sb.append("liveInstancePropertyCache: 
").append(_liveInstancePropertyCache.getPropertyMap())
+        .append("\n");
+    sb.append("externalViewCache: 
").append(_externalViewCache.getExternalViewMap()).append("\n");
+    sb.append("instanceConfigPropertyCache: 
").append(_instanceConfigPropertyCache.getPropertyMap())
+        .append("\n");
 
     return sb.toString();
   }
+
+  @Override
+  public String getClusterName() {
+    return _clusterName;
+  }
+
+  @Override
+  public String getClusterEventId() {
+    return _clusterEventId;
+  }
+
+  @Override
+  public void setClusterEventId(String clusterEventId) {
+    _clusterEventId = clusterEventId;
+  }
+
+  @Override
+  public String getPipelineName() {
+    return AbstractDataCache.UNKNOWN_PIPELINE;
+  }
 }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index f94ee7e..2d12f09 100644
--- 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -595,6 +595,17 @@ public class RoutingTableProvider
     @Override
     protected void handleEvent(ClusterEvent event) {
       NotificationContext changeContext = 
event.getAttribute(AttributeName.changeContext.name());
+      HelixConstants.ChangeType changeType = changeContext.getChangeType();
+
+      // Set cluster event id for later processing methods and it also helps 
debug.
+      _dataCache.setClusterEventId(event.getEventId());
+
+      if (changeContext == null || changeContext.getType() != 
NotificationContext.Type.CALLBACK) {
+        _dataCache.requireFullRefresh();
+      } else {
+        _dataCache.notifyDataChange(changeType, 
changeContext.getPathChanged());
+      }
+
       // session has expired clean up the routing table
       if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
         reset();
@@ -683,12 +694,6 @@ public class RoutingTableProvider
     public void queueEvent(NotificationContext context, ClusterEventType 
eventType,
         HelixConstants.ChangeType changeType) {
       ClusterEvent event = new ClusterEvent(_clusterName, eventType);
-      if (context == null || context.getType() != 
NotificationContext.Type.CALLBACK
-          || context.getType() == NotificationContext.Type.PERIODIC_REFRESH) {
-        _dataCache.requireFullRefresh();
-      } else {
-        _dataCache.notifyDataChange(changeType, context.getPathChanged());
-      }
 
       // Null check for manager in the following line is done in handleEvent()
       event.addAttribute(AttributeName.helixmanager.name(), 
context.getManager());

Reply via email to