This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 71becb3 Fix missed callbacks in CurrentStates based
RoutingTableProvider. (#458)
71becb3 is described below
commit 71becb3de497413ddce2080541f04c1b93028a85
Author: pkuwm <[email protected]>
AuthorDate: Fri Sep 13 17:08:17 2019 -0700
Fix missed callbacks in CurrentStates based RoutingTableProvider. (#458)
1. Update BasicClusterDataCache to refresh selectively: when a change
happens, the cache is refreshed only for that change type.
2. Improve RoutingTableProvider.queueEvent() and
RoutingTableProvider.handleEvent(): return the instanceConfigs snapshot to the
callback immediately, instead of waiting for currentStates completion.
---
.../helix/common/caches/BasicClusterDataCache.java | 116 +++++++++++++++------
.../helix/spectator/RoutingTableProvider.java | 17 +--
2 files changed, 98 insertions(+), 35 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index d3a3d20..b01bb0c 100644
---
a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++
b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -19,13 +19,12 @@ package org.apache.helix.common.caches;
* under the License.
*/
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
+import org.apache.helix.common.controllers.ControlContextProvider;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -35,11 +34,16 @@ import org.slf4j.LoggerFactory;
/**
* Cache the basic cluster data, including LiveInstances, InstanceConfigs and
ExternalViews.
*/
-public class BasicClusterDataCache {
+public class BasicClusterDataCache implements ControlContextProvider {
private static Logger LOG =
LoggerFactory.getLogger(BasicClusterDataCache.class.getName());
- protected Map<String, LiveInstance> _liveInstanceMap;
- protected Map<String, InstanceConfig> _instanceConfigMap;
+ private static final String INSTANCE_CONFIG = "InstanceConfig";
+ private static final String LIVE_INSTANCE = "LiveInstance";
+
+ private String _clusterEventId;
+
+ protected PropertyCache<LiveInstance> _liveInstancePropertyCache;
+ protected PropertyCache<InstanceConfig> _instanceConfigPropertyCache;
protected ExternalViewCache _externalViewCache;
protected String _clusterName;
@@ -48,15 +52,50 @@ public class BasicClusterDataCache {
public BasicClusterDataCache(String clusterName) {
_propertyDataChangedMap = new ConcurrentHashMap<>();
- _liveInstanceMap = new HashMap<>();
- _instanceConfigMap = new HashMap<>();
_externalViewCache = new ExternalViewCache(clusterName);
_clusterName = clusterName;
- requireFullRefresh();
+ _clusterEventId = AbstractDataCache.UNKNOWN_EVENT_ID;
+
+ _liveInstancePropertyCache = new PropertyCache<>(this, LIVE_INSTANCE,
+ new PropertyCache.PropertyCacheKeyFuncs<LiveInstance>() {
+ @Override
+ public PropertyKey getRootKey(HelixDataAccessor accessor) {
+ return accessor.keyBuilder().liveInstances();
+ }
+
+ @Override
+ public PropertyKey getObjPropertyKey(HelixDataAccessor accessor,
String objName) {
+ return accessor.keyBuilder().liveInstance(objName);
+ }
+
+ @Override
+ public String getObjName(LiveInstance obj) {
+ return obj.getInstanceName();
+ }
+ }, true);
+
+ _instanceConfigPropertyCache = new PropertyCache<>(this, INSTANCE_CONFIG,
+ new PropertyCache.PropertyCacheKeyFuncs<InstanceConfig>() {
+ @Override
+ public PropertyKey getRootKey(HelixDataAccessor accessor) {
+ return accessor.keyBuilder().instanceConfigs();
+ }
+
+ @Override
+ public PropertyKey getObjPropertyKey(HelixDataAccessor accessor,
String objName) {
+ return accessor.keyBuilder().instanceConfig(objName);
+ }
+
+ @Override
+ public String getObjName(InstanceConfig obj) {
+ return obj.getInstanceName();
+ }
+ }, true);
}
/**
- * This refreshes the cluster data by re-fetching the data from zookeeper in
an efficient way
+ * This refreshes the cluster data by re-fetching the data from zookeeper in
an efficient way.
+ * If we want to support multi-threading in the future, this method needs to
be synchronized.
*
* @param accessor
*
@@ -65,7 +104,6 @@ public class BasicClusterDataCache {
public void refresh(HelixDataAccessor accessor) {
LOG.info("START: BasicClusterDataCache.refresh() for cluster " +
_clusterName);
long startTime = System.currentTimeMillis();
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
_propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW,
Boolean.valueOf(false));
@@ -75,18 +113,18 @@ public class BasicClusterDataCache {
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE,
Boolean.valueOf(false));
- _liveInstanceMap =
accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
- LOG.info("Reload LiveInstances: " + _liveInstanceMap.keySet() + ". Takes
" + (
- System.currentTimeMillis() - start) + " ms");
+ _liveInstancePropertyCache.refresh(accessor);
+ LOG.info("Reload LiveInstances: " +
_liveInstancePropertyCache.getPropertyMap().keySet()
+ + ". Takes " + (System.currentTimeMillis() - start) + " ms");
}
if
(_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
long start = System.currentTimeMillis();
- _propertyDataChangedMap
- .put(HelixConstants.ChangeType.INSTANCE_CONFIG,
Boolean.valueOf(false));
- _instanceConfigMap =
accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
- LOG.info("Reload InstanceConfig: " + _instanceConfigMap.keySet() + ".
Takes " + (
- System.currentTimeMillis() - start) + " ms");
+ _propertyDataChangedMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG,
+ Boolean.valueOf(false));
+ _instanceConfigPropertyCache.refresh(accessor);
+ LOG.info("Reload InstanceConfig: " +
_instanceConfigPropertyCache.getPropertyMap().keySet()
+ + ". Takes " + (System.currentTimeMillis() - start) + " ms");
}
long endTime = System.currentTimeMillis();
@@ -95,9 +133,9 @@ public class BasicClusterDataCache {
- startTime) + " ms");
if (LOG.isDebugEnabled()) {
- LOG.debug("LiveInstances: " + _liveInstanceMap);
- LOG.debug("ExternalViews: " +
_externalViewCache.getExternalViewMap().keySet());
- LOG.debug("InstanceConfigs: " + _instanceConfigMap);
+ LOG.debug("LiveInstances: {}",
_liveInstancePropertyCache.getPropertyMap());
+ LOG.debug("ExternalViews: {}",
_externalViewCache.getExternalViewMap().keySet());
+ LOG.debug("InstanceConfigs: {}",
_instanceConfigPropertyCache.getPropertyMap());
}
}
@@ -116,7 +154,7 @@ public class BasicClusterDataCache {
* @return
*/
public Map<String, LiveInstance> getLiveInstances() {
- return Collections.unmodifiableMap(_liveInstanceMap);
+ return _liveInstancePropertyCache.getPropertyMap();
}
/**
@@ -125,7 +163,7 @@ public class BasicClusterDataCache {
* @return
*/
public Map<String, InstanceConfig> getInstanceConfigMap() {
- return Collections.unmodifiableMap(_instanceConfigMap);
+ return _instanceConfigPropertyCache.getPropertyMap();
}
/**
@@ -154,10 +192,8 @@ public class BasicClusterDataCache {
public synchronized void clearCache(HelixConstants.ChangeType changeType) {
switch (changeType) {
case LIVE_INSTANCE:
- _liveInstanceMap.clear();
- break;
case INSTANCE_CONFIG:
- _instanceConfigMap.clear();
+ LOG.warn("clearCache is deprecated for changeType: {}.", changeType);
break;
case EXTERNAL_VIEW:
_externalViewCache.clear();
@@ -182,11 +218,33 @@ public class BasicClusterDataCache {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
- sb.append("externalViewMap:" +
_externalViewCache.getExternalViewMap()).append("\n");
- sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
+ sb.append("liveInstancePropertyCache:
").append(_liveInstancePropertyCache.getPropertyMap())
+ .append("\n");
+ sb.append("externalViewCache:
").append(_externalViewCache.getExternalViewMap()).append("\n");
+ sb.append("instanceConfigPropertyCache:
").append(_instanceConfigPropertyCache.getPropertyMap())
+ .append("\n");
return sb.toString();
}
+
+ @Override
+ public String getClusterName() {
+ return _clusterName;
+ }
+
+ @Override
+ public String getClusterEventId() {
+ return _clusterEventId;
+ }
+
+ @Override
+ public void setClusterEventId(String clusterEventId) {
+ _clusterEventId = clusterEventId;
+ }
+
+ @Override
+ public String getPipelineName() {
+ return AbstractDataCache.UNKNOWN_PIPELINE;
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index f94ee7e..2d12f09 100644
---
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -595,6 +595,17 @@ public class RoutingTableProvider
@Override
protected void handleEvent(ClusterEvent event) {
NotificationContext changeContext =
event.getAttribute(AttributeName.changeContext.name());
+ HelixConstants.ChangeType changeType = changeContext.getChangeType();
+
+ // Set cluster event id for later processing methods and it also helps
debug.
+ _dataCache.setClusterEventId(event.getEventId());
+
+ if (changeContext == null || changeContext.getType() !=
NotificationContext.Type.CALLBACK) {
+ _dataCache.requireFullRefresh();
+ } else {
+ _dataCache.notifyDataChange(changeType,
changeContext.getPathChanged());
+ }
+
// session has expired clean up the routing table
if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
reset();
@@ -683,12 +694,6 @@ public class RoutingTableProvider
public void queueEvent(NotificationContext context, ClusterEventType
eventType,
HelixConstants.ChangeType changeType) {
ClusterEvent event = new ClusterEvent(_clusterName, eventType);
- if (context == null || context.getType() !=
NotificationContext.Type.CALLBACK
- || context.getType() == NotificationContext.Type.PERIODIC_REFRESH) {
- _dataCache.requireFullRefresh();
- } else {
- _dataCache.notifyDataChange(changeType, context.getPathChanged());
- }
// Null check for manager in the following line is done in handleEvent()
event.addAttribute(AttributeName.helixmanager.name(),
context.getManager());