Optimize ClusterDataCache's data refresh strategy by: 1) caching CurrentStates locally and updating only those that have changed in zk; 2) having the Controller listen for ResourceConfig changes; 3) caching resource configs locally and reloading them all whenever any resource config changes.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d6f4943e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d6f4943e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d6f4943e Branch: refs/heads/master Commit: d6f4943ed87fc46de7852dad1474b1b66a73e0ab Parents: 28d1a2f Author: Lei Xia <l...@linkedin.com> Authored: Mon Jul 31 10:58:33 2017 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Mon Nov 6 17:07:43 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/BaseDataAccessor.java | 8 +- .../org/apache/helix/HelixDataAccessor.java | 17 +- .../java/org/apache/helix/HelixProperty.java | 103 ++++++- .../main/java/org/apache/helix/PropertyKey.java | 30 +- .../controller/GenericHelixController.java | 132 ++++----- .../helix/controller/HelixControllerMain.java | 3 +- .../helix/controller/pipeline/Pipeline.java | 5 +- .../controller/rebalancer/AutoRebalancer.java | 8 +- .../stages/BestPossibleStateCalcStage.java | 2 +- .../controller/stages/ClusterDataCache.java | 295 ++++++++++++------- .../controller/stages/ClusterEventType.java | 2 + .../stages/MessageGenerationPhase.java | 6 + .../helix/manager/zk/CallbackHandler.java | 16 +- .../manager/zk/ControllerManagerHelper.java | 6 +- .../helix/manager/zk/ZKHelixDataAccessor.java | 63 +++- .../org/apache/helix/model/ClusterConfig.java | 29 ++ .../apache/helix/model/ParticipantHistory.java | 24 +- .../helix/task/TaskStateModelFactory.java | 1 - .../java/org/apache/helix/MockAccessor.java | 48 ++- .../controller/stages/TestClusterDataCache.java | 3 + .../integration/TestZkCallbackHandlerLeak.java | 18 +- .../helix/integration/TestZkConnectionLost.java | 6 +- .../TestClusterDataCacheSelectiveUpdate.java | 179 +++++++++++ .../manager/TestConsecutiveZkSessionExpiry.java | 2 +- .../manager/TestZkCallbackHandlerLeak.java | 16 +- .../paticipant/TestInstanceHistory.java | 6 +- 
.../rebalancer/TestDelayedAutoRebalance.java | 3 +- .../task/TestTaskRebalancerStopResume.java | 6 +- .../apache/helix/mock/MockBaseDataAccessor.java | 119 +++++--- .../TestClusterStatusMonitorLifecycle.java | 4 +- 30 files changed, 880 insertions(+), 280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java index 32510c0..a8f2907 100644 --- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java @@ -88,7 +88,7 @@ public interface BaseDataAccessor<T> { * Use it when creating children under a parent node. This will use async api for better * performance. If the child already exists it will return false. * @param paths the paths to the children ZNodes - * @param record List of data to write to each of the path + * @param records List of data to write to each of the path * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return For each child: true if creation succeeded, false otherwise (e.g. if the child exists) */ @@ -98,7 +98,7 @@ public interface BaseDataAccessor<T> { * can set multiple children under a parent node. This will use async api for better * performance. If this child does not exist it will create it. 
* @param paths the paths to the children ZNodes - * @param record List of data with which to overwrite the corresponding ZNodes + * @param records List of data with which to overwrite the corresponding ZNodes * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return For each child: true if the data was set, false otherwise */ @@ -107,7 +107,7 @@ public interface BaseDataAccessor<T> { /** * Can update multiple nodes using async api for better performance. If a child does not * exist it will create it. - * @param the paths to the children ZNodes + * @param paths paths to the children ZNodes * @param updaters List of update routines for records to update * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return For each child, true if the data is updated successfully, false otherwise @@ -142,7 +142,7 @@ public interface BaseDataAccessor<T> { /** * Get the children under a parent path using async api - * @param path path to the immediate parent ZNode + * @param parentPath path to the immediate parent ZNode * @param stats Zookeeper Stat objects corresponding to each child * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return A list of children of the parent ZNode http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java index e7777c6..5c2baec 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java @@ -28,7 +28,6 @@ import org.apache.helix.model.Message; import org.apache.helix.model.PauseSignal; import org.apache.helix.model.StateModelDefinition; - /** * Interface used to interact with Helix 
Data Types like IdealState, Config, * LiveInstance, Message, ExternalView etc PropertyKey represent the HelixData @@ -82,7 +81,7 @@ public interface HelixDataAccessor { * @param keys * @return */ - public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys); + <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys); /** * Removes the property @@ -93,6 +92,20 @@ public interface HelixDataAccessor { boolean removeProperty(PropertyKey key); /** + * Return the metadata (HelixProperty.Stat) of the given property + * @param key + * @return + */ + HelixProperty.Stat getPropertyStat(PropertyKey key); + + /** + * Return a list of property stats, each of which must refer to a single Helix property. + * @param keys + * @return + */ + List<HelixProperty.Stat> getPropertyStats(List<PropertyKey> keys); + + /** * Return the child names for a property. PropertyKey needs to refer to a * collection like instances, resources. PropertyKey.isLeaf must be false * @param key http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/HelixProperty.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java index 73055de..4a3ec5e 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java +++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java @@ -44,11 +44,93 @@ public class HelixProperty { protected final ZNRecord _record; /** + * Metadata of a HelixProperty + */ + public static class Stat { + // the version field of zookeeper Stat + private int _version; + private long _creationTime; + private long _modifiedTime; + + public Stat(int version, long creationTime, long modifiedTime) { + _version = version; + _creationTime = creationTime; + _modifiedTime = modifiedTime; + } + + public Stat(Stat stat) { + _version = stat.getVersion(); + 
_creationTime = stat.getCreationTime(); + _modifiedTime = stat.getModifiedTime(); + } + + public Stat() { + _version = -1; + _creationTime = -1; + _modifiedTime = -1; + } + + public int getVersion() { + return _version; + } + + public void setVersion(int version) { + _version = version; + } + + public long getCreationTime() { + return _creationTime; + } + + public void setCreationTime(long creationTime) { + _creationTime = creationTime; + } + + public long getModifiedTime() { + return _modifiedTime; + } + + public void setModifiedTime(long modifiedTime) { + _modifiedTime = modifiedTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Stat)) { + return false; + } + + Stat stat = (Stat) o; + + if (_version != stat._version) { + return false; + } + if (_creationTime != stat._creationTime) { + return false; + } + return _modifiedTime == stat._modifiedTime; + } + + @Override + public int hashCode() { + int result = _version; + result = 31 * result + (int) (_creationTime ^ (_creationTime >>> 32)); + result = 31 * result + (int) (_modifiedTime ^ (_modifiedTime >>> 32)); + return result; + } + } + + private Stat _stat; + + /** * Initialize the property with an identifier * @param id */ public HelixProperty(String id) { - _record = new ZNRecord(id); + this(new ZNRecord(id), id); } /** @@ -56,7 +138,7 @@ public class HelixProperty { * @param record */ public HelixProperty(ZNRecord record) { - _record = new ZNRecord(record); + this(record, record.getId()); } /** @@ -66,6 +148,7 @@ public class HelixProperty { */ public HelixProperty(ZNRecord record, String id) { _record = new ZNRecord(record, id); + _stat = new Stat(_record.getVersion(), _record.getCreationTime(), _record.getModifiedTime()); } /** @@ -229,6 +312,22 @@ public class HelixProperty { } /** + * Get the metadata (stat) of this record + * @return HelixProperty.Stat + */ + public Stat getStat() { + return _stat; + } + + /** + * Set the metadata (stat) 
for this record + * @param stat + */ + public void setStat(Stat stat) { + _stat = new Stat(stat); + } + + /** * Get property validity * @return true if valid, false if invalid */ http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/PropertyKey.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java index 5b4218c..7f1d5d9 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java @@ -86,7 +86,11 @@ public class PropertyKey { @Override public int hashCode() { - return super.hashCode(); + int result = _type.hashCode(); + result = 31 * result + Arrays.hashCode(_params); + result = 31 * result + _typeClazz.hashCode(); + result = 31 * result + (_configScope != null ? _configScope.hashCode() : 0); + return result; } @Override @@ -94,6 +98,30 @@ public class PropertyKey { return getPath(); } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !(o instanceof PropertyKey)) { + return false; + } + + PropertyKey key = (PropertyKey) o; + + if (_type != key._type) { + return false; + } + // Arrays.equals compares the params element-wise, consistent with Arrays.hashCode above + if (!Arrays.equals(_params, key._params)) { + return false; + } + if (!_typeClazz.equals(key._typeClazz)) { + return false; + } + return _configScope == key._configScope; + } + /** * Get the path associated with this property * @return absolute path to the property http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 33db34d..a6248f6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -37,7 +37,6 @@ import org.apache.helix.NotificationContext; import org.apache.helix.NotificationContext.Type; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; -import org.apache.helix.api.listeners.ConfigChangeListener; import org.apache.helix.api.listeners.ControllerChangeListener; import org.apache.helix.api.listeners.CurrentStateChangeListener; import org.apache.helix.api.listeners.IdealStateChangeListener; @@ -45,6 +44,7 @@ import org.apache.helix.api.listeners.InstanceConfigChangeListener; import org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.PreFetch; +import org.apache.helix.api.listeners.ResourceConfigChangeListener; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.PipelineRegistry; import org.apache.helix.controller.stages.AttributeName; @@ -72,6 +72,7 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.PauseSignal; +import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.mbeans.ClusterEventMonitor; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.task.TaskDriver; @@ -92,9 +93,9 @@ import static org.apache.helix.HelixConstants.ChangeType; * 4. select the messages that can be sent, needs messages and state model constraints <br> * 5. 
send messages */ -public class GenericHelixController implements ConfigChangeListener, IdealStateChangeListener, +public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener, - ControllerChangeListener, InstanceConfigChangeListener { + ControllerChangeListener, InstanceConfigChangeListener, ResourceConfigChangeListener { private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName()); private static final long EVENT_THREAD_JOIN_TIMEOUT = 1000; private static final int ASYNC_TASKS_THREADPOOL_SIZE = 40; @@ -257,10 +258,10 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC registry.register(ClusterEventType.IdealStateChange, dataRefresh, rebalancePipeline); registry.register(ClusterEventType.CurrentStateChange, dataRefresh, rebalancePipeline, externalViewPipeline); - registry.register(ClusterEventType.ConfigChange, dataRefresh, rebalancePipeline); + registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, rebalancePipeline); + registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, rebalancePipeline); registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, rebalancePipeline, externalViewPipeline); - registry.register(ClusterEventType.MessageChange, dataRefresh, rebalancePipeline); registry.register(ClusterEventType.ExternalViewChange, dataRefresh); registry.register(ClusterEventType.Resume, dataRefresh, rebalancePipeline, externalViewPipeline); @@ -363,8 +364,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC return; } - logger.info(String.format("START: Invoking %s controller pipeline for event: %s", - getPipelineType(cache.isTaskCache()), event.getEventType())); + logger.info(String.format("START: Invoking %s controller pipeline for cluster %s event: %s", + manager.getClusterName(), getPipelineType(cache.isTaskCache()), 
event.getEventType())); long startTime = System.currentTimeMillis(); for (Pipeline pipeline : pipelines) { try { @@ -424,20 +425,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) { logger.info("START: GenericClusterController.onStateChange()"); - if (changeContext == null || changeContext.getType() != Type.CALLBACK) { - _cache.requireFullRefresh(); - _taskCache.requireFullRefresh(); - } else { - _cache.updateDataChange(ChangeType.CURRENT_STATE); - _taskCache.updateDataChange(ChangeType.CURRENT_STATE); - } - ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.CurrentStateChange); - event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); - event.addAttribute(AttributeName.instanceName.name(), instanceName); - event.addAttribute(AttributeName.changeContext.name(), changeContext); - - _eventQueue.put(event); - _taskEventQueue.put(event.clone()); + notifyCaches(changeContext, ChangeType.CURRENT_STATE); + pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext, Collections + .<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName)); logger.info("END: GenericClusterController.onStateChange()"); } @@ -446,22 +436,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC public void onMessage(String instanceName, List<Message> messages, NotificationContext changeContext) { logger.info("START: GenericClusterController.onMessage() for cluster " + _clusterName); - if (changeContext == null || changeContext.getType() != Type.CALLBACK) { - _cache.requireFullRefresh(); - _taskCache.requireFullRefresh(); - } else { - _cache.updateDataChange(ChangeType.MESSAGE); - _taskCache.updateDataChange(ChangeType.MESSAGE); - } - - ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.MessageChange); - 
event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); - event.addAttribute(AttributeName.instanceName.name(), instanceName); - event.addAttribute(AttributeName.changeContext.name(), changeContext); - - _eventQueue.put(event); - _taskEventQueue.put(event.clone()); - + notifyCaches(changeContext, ChangeType.MESSAGE); + pushToEventQueues(ClusterEventType.MessageChange, changeContext, + Collections.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName)); if (_clusterStatusMonitor != null && messages != null) { _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size()); } @@ -497,12 +474,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC checkLiveInstancesObservation(liveInstances, changeContext); } - ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.LiveInstanceChange); - event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); - event.addAttribute(AttributeName.changeContext.name(), changeContext); - event.addAttribute(AttributeName.eventData.name(), liveInstances); - _eventQueue.put(event); - _taskEventQueue.put(event.clone()); + pushToEventQueues(ClusterEventType.LiveInstanceChange, changeContext, + Collections.<String, Object>singletonMap(AttributeName.eventData.name(), liveInstances)); + logger.info( "END: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName); } @@ -541,20 +515,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) { logger.info( "START: Generic GenericClusterController.onIdealStateChange() for cluster " + _clusterName); - if (changeContext == null || changeContext.getType() != Type.CALLBACK) { - _cache.requireFullRefresh(); - _taskCache.requireFullRefresh(); - } else { - _cache.updateDataChange(ChangeType.IDEAL_STATE); - 
_taskCache.updateDataChange(ChangeType.IDEAL_STATE); - } - - ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.IdealStateChange); - event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); - event.addAttribute(AttributeName.changeContext.name(), changeContext); - - _eventQueue.put(event); - _taskEventQueue.put(event.clone()); + notifyCaches(changeContext, ChangeType.IDEAL_STATE); + pushToEventQueues(ClusterEventType.IdealStateChange, changeContext, + Collections.<String, Object>emptyMap()); if (changeContext.getType() != Type.FINALIZE) { checkRebalancingTimer(changeContext.getManager(), idealStates, _cache.getClusterConfig()); @@ -565,33 +528,50 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC @Override @PreFetch(enabled = false) - public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) { - logger.info("START: GenericClusterController.onConfigChange() for cluster " + _clusterName); - if (changeContext == null || changeContext.getType() != Type.CALLBACK) { + public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, + NotificationContext changeContext) { + logger.info( + "START: GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName); + notifyCaches(changeContext, ChangeType.INSTANCE_CONFIG); + pushToEventQueues(ClusterEventType.InstanceConfigChange, changeContext, + Collections.<String, Object>emptyMap()); + logger.info( + "END: GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName); + } + + @Override + @PreFetch(enabled = false) + public void onResourceConfigChange( + List<ResourceConfig> resourceConfigs, NotificationContext context) { + logger.info( + "START: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName); + notifyCaches(context, ChangeType.RESOURCE_CONFIG); + pushToEventQueues(ClusterEventType.ResourceConfigChange, context, + Collections.<String, 
Object>emptyMap()); + logger + .info("END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName); + } + + private void notifyCaches(NotificationContext context, ChangeType changeType) { + if (context == null || context.getType() != Type.CALLBACK) { _cache.requireFullRefresh(); _taskCache.requireFullRefresh(); } else { - _cache.updateDataChange(ChangeType.INSTANCE_CONFIG); - _taskCache.updateDataChange(ChangeType.INSTANCE_CONFIG); + _cache.notifyDataChange(changeType, context.getPathChanged()); + _taskCache.notifyDataChange(changeType, context.getPathChanged()); } + } - ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.ConfigChange); - event.addAttribute(AttributeName.changeContext.name(), changeContext); + private void pushToEventQueues(ClusterEventType eventType, NotificationContext changeContext, + Map<String, Object> eventAttributes) { + ClusterEvent event = new ClusterEvent(_clusterName, eventType); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); + event.addAttribute(AttributeName.changeContext.name(), changeContext); + for (Map.Entry<String, Object> attr : eventAttributes.entrySet()) { + event.addAttribute(attr.getKey(), attr.getValue()); + } _eventQueue.put(event); _taskEventQueue.put(event.clone()); - - logger.info("END: GenericClusterController.onConfigChange() for cluster " + _clusterName); - } - - @Override - @PreFetch(enabled = false) - public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, - NotificationContext changeContext) { - logger.info( - "START: GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName); - onConfigChange(instanceConfigs, changeContext); - logger.info("END: GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java index 5f222f9..436d417 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java +++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java @@ -132,7 +132,8 @@ public class HelixControllerMain { public static void addListenersToController(HelixManager manager, GenericHelixController controller) { try { - manager.addConfigChangeListener(controller); + manager.addInstanceConfigChangeListener(controller); + manager.addResourceConfigChangeListener(controller); manager.addLiveInstanceChangeListener(controller); manager.addIdealStateChangeListener(controller); manager.addControllerListener(controller); http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java index ebe42b9..2ef3278 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java @@ -53,8 +53,9 @@ public class Pipeline { } for (Stage stage : _stages) { long startTime = System.currentTimeMillis(); - logger.info( - String.format("START %s pipeline for stage %s", _pipelineType, stage.getStageName())); + logger.info(String + .format("START %s for %s pipeline for cluster %s", stage.getStageName(), _pipelineType, + event.getClusterName())); stage.preProcess(); stage.process(event); 
http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index a478973..7417b36 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; @@ -51,12 +52,17 @@ import org.apache.log4j.Logger; public class AutoRebalancer extends AbstractRebalancer { private static final Logger LOG = Logger.getLogger(AutoRebalancer.class); - @Override public IdealState computeNewIdealState(String resourceName, + @Override + public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet()); String stateModelName = currentIdealState.getStateModelDefRef(); StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName); + if (stateModelDef == null) { + LOG.error("State Model Definition null for resource: " + resourceName); + throw new HelixException("State Model Definition null for resource: " + resourceName); + } Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances(); String replicas = currentIdealState.getReplicas(); 
http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 3c30a4d..9f46d67 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -223,7 +223,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { } } } - + private Rebalancer getRebalancer(IdealState idealState, String resourceName) { Rebalancer customizedRebalancer = null; http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 0d62cad..cce6fa4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.PropertyType; @@ -59,6 +60,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.zookeeper.data.Stat; import static 
org.apache.helix.HelixConstants.ChangeType; @@ -67,8 +69,7 @@ import static org.apache.helix.HelixConstants.ChangeType; * provides useful methods to search/lookup properties */ public class ClusterDataCache { - - private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!"; + private static final Logger LOG = Logger.getLogger(ClusterDataCache.class.getName()); private static final String NAME = "NAME"; private Map<ChangeType, Boolean> _propertyChangedMap; @@ -93,9 +94,11 @@ public class ClusterDataCache { private Map<String, ZNRecord> _contextMap = new HashMap<>(); // maintain a cache of participant messages across pipeline runs - Map<String, Map<String, Message>> _messageCache = Maps.newHashMap(); + private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap(); + private Map<PropertyKey, CurrentState> _currentStateCache = Maps.newHashMap(); + - Map<String, Integer> _participantActiveTaskCount = new HashMap<>(); + private Map<String, Integer> _participantActiveTaskCount = new HashMap<>(); private ExecutorService _asyncTasksThreadPool; @@ -104,9 +107,6 @@ public class ClusterDataCache { private String _clusterName; - private static final Logger LOG = Logger.getLogger(ClusterDataCache.class.getName()); - - public ClusterDataCache () { _propertyChangedMap = new ConcurrentHashMap<>(); for(ChangeType type : ChangeType.values()) { @@ -128,52 +128,97 @@ public class ClusterDataCache { public synchronized boolean refresh(HelixDataAccessor accessor) { LOG.info("START: ClusterDataCache.refresh()"); long startTime = System.currentTimeMillis(); - Builder keyBuilder = accessor.keyBuilder(); if (_propertyChangedMap.get(ChangeType.IDEAL_STATE)) { - _propertyChangedMap.put(ChangeType.IDEAL_STATE, Boolean.valueOf(false)); + long start = System.currentTimeMillis(); _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates()); + _propertyChangedMap.put(ChangeType.IDEAL_STATE, Boolean.valueOf(false)); + if (LOG.isDebugEnabled()) { + 
LOG.debug("Reload IdealStates: " + _idealStateCacheMap.keySet() + ". Takes " + ( + System.currentTimeMillis() - start) + " ms"); + } } if (_propertyChangedMap.get(ChangeType.LIVE_INSTANCE)) { - _propertyChangedMap.put(ChangeType.LIVE_INSTANCE, Boolean.valueOf(false)); _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances()); + _propertyChangedMap.put(ChangeType.LIVE_INSTANCE, Boolean.valueOf(false)); _updateInstanceOfflineTime = true; + LOG.debug("Reload LiveInstances: " + _liveInstanceCacheMap.keySet()); } if (_propertyChangedMap.get(ChangeType.INSTANCE_CONFIG)) { - _propertyChangedMap.put(ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false)); _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs()); + _propertyChangedMap.put(ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false)); + LOG.debug("Reload InstanceConfig: " + _instanceConfigCacheMap.keySet()); + } + + if (_propertyChangedMap.get(ChangeType.RESOURCE_CONFIG)) { + _resourceConfigCacheMap = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs()); + _propertyChangedMap.put(ChangeType.RESOURCE_CONFIG, Boolean.valueOf(false)); + LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size()); } _idealStateMap = Maps.newHashMap(_idealStateCacheMap); _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap); _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap); + _resourceConfigMap = Maps.newHashMap(_resourceConfigCacheMap); if (_updateInstanceOfflineTime) { updateOfflineInstanceHistory(accessor); } - // TODO: Need an optimize for reading context only if the refresh is needed. 
- refreshContexts(accessor); - - // TODO: We should listen on resource config change instead of fetching every time - // And add back resourceConfigCacheMap - refreshResourceConfigs(accessor); + if (_isTaskCache) { + refreshJobContexts(accessor); + updateWorkflowJobConfigs(); + } Map<String, StateModelDefinition> stateDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs()); _stateModelDefMap = new ConcurrentHashMap<>(stateDefMap); _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints()); + refreshMessages(accessor); + refreshCurrentStates(accessor); + + _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); + if (_clusterConfig != null) { + _idealStateRuleMap = _clusterConfig.getIdealStateRules(); + } else { + _idealStateRuleMap = Maps.newHashMap(); + LOG.error("Cluster config is null!"); + } + + long endTime = System.currentTimeMillis(); + LOG.info( + "END: ClusterDataCache.refresh() for cluster " + getClusterName() + ", took " + (endTime + - startTime) + " ms"); + + if (LOG.isDebugEnabled()) { + LOG.debug("# of StateModelDefinition read from zk: " + _stateModelDefMap.size()); + LOG.debug("# of ConstraintMap read from zk: " + _constraintMap.size()); + LOG.debug("LiveInstances: " + _liveInstanceMap.keySet()); + for (LiveInstance instance : _liveInstanceMap.values()) { + LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); + } + LOG.debug("ResourceConfigs: " + _resourceConfigMap.keySet()); + LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet()); + LOG.debug("ClusterConfigs: " + _clusterConfig); + LOG.debug("JobContexts: " + _contextMap.keySet()); + } + if (LOG.isTraceEnabled()) { - for (LiveInstance instance : _liveInstanceMap.values()) { - LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); - } + LOG.trace("Cache content: " + toString()); } - Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>(); + return true; + } + 
+ private void refreshMessages(HelixDataAccessor accessor) { + long start = System.currentTimeMillis(); + Builder keyBuilder = accessor.keyBuilder(); + + Map<String, Map<String, Message>> msgMap = new HashMap<>(); List<PropertyKey> newMessageKeys = Lists.newLinkedList(); long purgeSum = 0; for (String instanceName : _liveInstanceMap.keySet()) { @@ -220,11 +265,52 @@ public class ClusterDataCache { } } _messageMap = Collections.unmodifiableMap(msgMap); - LOG.debug("Purge took: " + purgeSum); + + if (LOG.isDebugEnabled()) { + LOG.debug("Message purge took: " + purgeSum); + LOG.debug("# of Messages read from ZooKeeper " + newMessageKeys.size() + ". took " + ( + System.currentTimeMillis() - start) + " ms."); + } + } + + private void refreshCurrentStates(HelixDataAccessor accessor) { + refreshCurrentStatesCache(accessor); + + Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = new HashMap<>(); + for (PropertyKey key : _currentStateCache.keySet()) { + CurrentState currentState = _currentStateCache.get(key); + String[] params = key.getParams(); + if (currentState != null && params.length >= 4) { + String instanceName = params[1]; + String sessionId = params[2]; + String stateName = params[3]; + Map<String, Map<String, CurrentState>> instanceCurStateMap = + allCurStateMap.get(instanceName); + if (instanceCurStateMap == null) { + instanceCurStateMap = Maps.newHashMap(); + allCurStateMap.put(instanceName, instanceCurStateMap); + } + Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId); + if (sessionCurStateMap == null) { + sessionCurStateMap = Maps.newHashMap(); + instanceCurStateMap.put(sessionId, sessionCurStateMap); + } + sessionCurStateMap.put(stateName, currentState); + } + } + + for (String instance : allCurStateMap.keySet()) { + allCurStateMap.put(instance, Collections.unmodifiableMap(allCurStateMap.get(instance))); + } + _currentStateMap = Collections.unmodifiableMap(allCurStateMap); + } + + // reload current states that 
has been changed from zk to local cache. + private void refreshCurrentStatesCache(HelixDataAccessor accessor) { + long start = System.currentTimeMillis(); + Builder keyBuilder = accessor.keyBuilder(); List<PropertyKey> currentStateKeys = Lists.newLinkedList(); - Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = - new HashMap<>(); for (String instanceName : _liveInstanceMap.keySet()) { LiveInstance liveInstance = _liveInstanceMap.get(instanceName); String sessionId = liveInstance.getSessionId(); @@ -233,91 +319,54 @@ public class ClusterDataCache { for (String currentStateName : currentStateNames) { currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName)); } + } - // ensure an empty current state map for all live instances and sessions - Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(instanceName); - if (instanceCurStateMap == null) { - instanceCurStateMap = Maps.newHashMap(); - allCurStateMap.put(instanceName, instanceCurStateMap); - } - Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId); - if (sessionCurStateMap == null) { - sessionCurStateMap = Maps.newHashMap(); - instanceCurStateMap.put(sessionId, sessionCurStateMap); + // All new entries from zk not cached locally yet should be read from ZK. 
+ List<PropertyKey> reloadKeys = Lists.newLinkedList(currentStateKeys); + reloadKeys.removeAll(_currentStateCache.keySet()); + + List<PropertyKey> cachedKeys = Lists.newLinkedList(_currentStateCache.keySet()); + cachedKeys.retainAll(currentStateKeys); + + List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys); + Map<PropertyKey, CurrentState> currentStatesMap = Maps.newHashMap(); + for (int i=0; i < cachedKeys.size(); i++) { + PropertyKey key = cachedKeys.get(i); + HelixProperty.Stat stat = stats.get(i); + if (stat != null) { + CurrentState property = _currentStateCache.get(key); + if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) { + currentStatesMap.put(key, property); + } else { + // need update from zk + reloadKeys.add(key); + } + } else { + LOG.debug("stat is null for key: " + key); + reloadKeys.add(key); } } - List<CurrentState> currentStates = accessor.getProperty(currentStateKeys); - Iterator<PropertyKey> csKeyIter = currentStateKeys.iterator(); + + List<CurrentState> currentStates = accessor.getProperty(reloadKeys); + Iterator<PropertyKey> csKeyIter = reloadKeys.iterator(); for (CurrentState currentState : currentStates) { PropertyKey key = csKeyIter.next(); - String[] params = key.getParams(); - if (currentState != null && params.length >= 4) { - Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(params[1]); - Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(params[2]); - sessionCurStateMap.put(params[3], currentState); - } - } - - for (String instance : allCurStateMap.keySet()) { - allCurStateMap.put(instance, Collections.unmodifiableMap(allCurStateMap.get(instance))); - } - _currentStateMap = Collections.unmodifiableMap(allCurStateMap); - - _idealStateRuleMap = Maps.newHashMap(); - _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); - if (_clusterConfig != null) { - for (String simpleKey : 
_clusterConfig.getRecord().getSimpleFields().keySet()) { - if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) { - String simpleValue = _clusterConfig.getRecord().getSimpleField(simpleKey); - String[] rules = simpleValue.split("(?<!\\\\),"); - Map<String, String> singleRule = Maps.newHashMap(); - for (String rule : rules) { - String[] keyValue = rule.split("(?<!\\\\)="); - if (keyValue.length >= 2) { - singleRule.put(keyValue[0], keyValue[1]); - } - } - _idealStateRuleMap.put(simpleKey, singleRule); - } + if (currentState != null) { + currentStatesMap.put(key, currentState); + } else { + LOG.debug("CurrentState null for key: " + key); } - } else { - LOG.error("Cluster config is null!"); } - long endTime = System.currentTimeMillis(); - LOG.info( - "END: ClusterDataCache.refresh() for cluster " + getClusterName() + ", took " + (endTime - - startTime) + " ms"); + _currentStateCache = Collections.unmodifiableMap(currentStatesMap); if (LOG.isDebugEnabled()) { - int numPaths = _liveInstanceMap.size() + _idealStateMap.size() + _stateModelDefMap.size() - + _instanceConfigMap.size() + _resourceConfigMap.size() + _constraintMap.size() - + newMessageKeys.size() + currentStateKeys.size(); - LOG.debug("Paths read: " + numPaths); + LOG.debug("# of CurrentState paths read from ZooKeeper " + reloadKeys.size()); + LOG.debug( + "# of CurrentState paths skipped reading from ZK: " + (currentStateKeys.size() - reloadKeys.size())); } - - return true; - } - - public ClusterConfig getClusterConfig() { - return _clusterConfig; - } - - public void setClusterConfig(ClusterConfig clusterConfig) { - _clusterConfig = clusterConfig; - } - - public String getClusterName() { - return _clusterConfig != null ? _clusterConfig.getClusterName() : _clusterName; - } - - /** - * Return the last offline time map for all offline instances. 
- * - * @return - */ - public Map<String, Long> getInstanceOfflineTimeMap() { - return _instanceOfflineTimeMap; + LOG.info( + "Takes " + (System.currentTimeMillis() - start) + " ms to reload new current states!"); } private void updateOfflineInstanceHistory(HelixDataAccessor accessor) { @@ -344,6 +393,27 @@ public class ClusterDataCache { _updateInstanceOfflineTime = false; } + public ClusterConfig getClusterConfig() { + return _clusterConfig; + } + + public void setClusterConfig(ClusterConfig clusterConfig) { + _clusterConfig = clusterConfig; + } + + public String getClusterName() { + return _clusterConfig != null ? _clusterConfig.getClusterName() : _clusterName; + } + + /** + * Return the last offline time map for all offline instances. + * + * @return + */ + public Map<String, Long> getInstanceOfflineTimeMap() { + return _instanceOfflineTimeMap; + } + /** * Retrieves the idealstates for all resources * @return @@ -377,7 +447,7 @@ public class ClusterDataCache { * Return the set of all instances names. */ public Set<String> getAllInstances() { - return new HashSet<String>(_instanceConfigMap.keySet()); + return new HashSet<>(_instanceConfigMap.keySet()); } /** @@ -479,7 +549,7 @@ public class ClusterDataCache { public void cacheMessages(List<Message> messages) { for (Message message : messages) { String instanceName = message.getTgtName(); - Map<String, Message> instMsgMap = null; + Map<String, Message> instMsgMap; if (_messageCache.containsKey(instanceName)) { instMsgMap = _messageCache.get(instanceName); } else { @@ -545,14 +615,20 @@ public class ClusterDataCache { } /** - * Returns the resource config * Notify the cache that some part of the cluster data has been changed. */ - public void updateDataChange(ChangeType changeType) { + public void notifyDataChange(ChangeType changeType) { _propertyChangedMap.put(changeType, Boolean.valueOf(true)); } /** + * Notify the cache that some part of the cluster data has been changed. 
+ */ + public void notifyDataChange(ChangeType changeType, String pathChanged) { + notifyDataChange(changeType); + } + + /** * Returns the instance config map * * @return @@ -826,12 +902,17 @@ public class ClusterDataCache { sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n"); sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n"); sb.append("resourceConfigMap:" + _resourceConfigMap).append("\n"); + sb.append("jobContextMap:" + _contextMap).append("\n"); sb.append("messageMap:" + _messageMap).append("\n"); + sb.append("currentStateMap:" + _currentStateMap).append("\n"); + sb.append("clusterConfig:" + _clusterConfig).append("\n"); return sb.toString(); } - private void refreshContexts(HelixDataAccessor accessor) { + private void refreshJobContexts(HelixDataAccessor accessor) { + // TODO: Need an optimize for reading context only if the refresh is needed. + long start = System.currentTimeMillis(); _contextMap.clear(); if (_clusterName == null) { return; @@ -858,12 +939,16 @@ public class ClusterDataCache { String.format("Context for %s is null or miss the context NAME!", childNames.get((i)))); } } + + if (LOG.isDebugEnabled()) { + LOG.debug("# of workflow/job context read from zk: " + _contextMap.size() + ". 
Take " + ( + System.currentTimeMillis() - start) + " ms"); + } } - private void refreshResourceConfigs(HelixDataAccessor accessor) { + private void updateWorkflowJobConfigs() { _workflowConfigMap.clear(); _jobConfigMap.clear(); - _resourceConfigMap = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs()); for (Map.Entry<String, ResourceConfig> entry : _resourceConfigMap.entrySet()) { if (entry.getValue().getRecord().getSimpleFields() .containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) { http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java index e9e5a58..027c7c7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java @@ -23,6 +23,8 @@ public enum ClusterEventType { IdealStateChange, CurrentStateChange, ConfigChange, + ResourceConfigChange, + InstanceConfigChange, LiveInstanceChange, MessageChange, ExternalViewChange, http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index 05b8a66..6555f34 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -25,6 +25,7 @@ import java.util.List; import 
java.util.Map; import java.util.UUID; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.api.config.StateTransitionTimeoutConfig; import org.apache.helix.controller.pipeline.AbstractBaseStage; @@ -76,6 +77,11 @@ public class MessageGenerationPhase extends AbstractBaseStage { Resource resource = resourceMap.get(resourceName); StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef()); + if (stateModelDef == null) { + logger.error( + "State Model Definition null, skip generating messages for resource: " + resourceName); + continue; + } for (Partition partition : resource.getPartitions()) { http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index 512fa78..c63680f 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -68,6 +68,7 @@ import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor; import org.apache.log4j.Logger; import org.apache.zookeeper.Watcher.Event.EventType; +import sun.rmi.runtime.Log; import static org.apache.helix.HelixConstants.ChangeType.*; @@ -519,11 +520,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { @Override public void handleDataChange(String dataPath, Object data) { + logger.debug("Data change callback: paths changed: " + dataPath); + try { updateNotificationTime(System.nanoTime()); if (dataPath != null && dataPath.startsWith(_path)) { NotificationContext changeContext = new NotificationContext(_manager); 
changeContext.setType(NotificationContext.Type.CALLBACK); + changeContext.setPathChanged(dataPath); enqueueTask(changeContext); } } catch (Exception e) { @@ -533,6 +537,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { } @Override public void handleDataDeleted(String dataPath) { + logger.debug("Data change callback: path deleted: " + dataPath); + if (_changeType == IDEAL_STATE || _changeType == LIVE_INSTANCE) { + logger.info("Data deleted callback, deleted path: " + dataPath); + } + try { updateNotificationTime(System.nanoTime()); if (dataPath != null && dataPath.startsWith(_path)) { @@ -546,9 +555,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { + ", listener: " + _listener); _zkClient.unsubscribeChildChanges(dataPath, this); // No need to invoke() since this event will handled by child-change on parent-node - // NotificationContext changeContext = new NotificationContext(_manager); - // changeContext.setType(NotificationContext.Type.CALLBACK); - // invoke(changeContext); } } catch (Exception e) { String msg = "exception in handling data-delete-change. 
path: " + dataPath + ", listener: " @@ -559,6 +565,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { @Override public void handleChildChange(String parentPath, List<String> currentChilds) { + logger.debug("Data change callback: child changed, path: " + parentPath + ", current childs: " + + currentChilds); + try { updateNotificationTime(System.nanoTime()); if (parentPath != null && parentPath.startsWith(_path)) { @@ -569,6 +578,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { } else { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.CALLBACK); + changeContext.setPathChanged(parentPath); enqueueTask(changeContext); } } http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java index 6c43193..726e3bc 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java @@ -77,11 +77,10 @@ public class ControllerManagerHelper { /** * setup generic-controller */ - _manager.addConfigChangeListener(controller); + _manager.addInstanceConfigChangeListener(controller); + _manager.addResourceConfigChangeListener(controller); _manager.addLiveInstanceChangeListener(controller); _manager.addIdealStateChangeListener(controller); - // no need for controller to listen on external-view - // _manager.addExternalViewChangeListener(controller); _manager.addControllerListener(controller); } catch (ZkInterruptedException e) { LOG.warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). 
" @@ -97,6 +96,7 @@ public class ControllerManagerHelper { * reset generic-controller */ _manager.removeListener(keyBuilder.instanceConfigs(), controller); + _manager.removeListener(keyBuilder.resourceConfigs(), controller); _manager.removeListener(keyBuilder.liveInstances(), controller); _manager.removeListener(keyBuilder.idealStates(), controller); _manager.removeListener(keyBuilder.controller(), controller); http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java index d9818ca..0388399 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java @@ -57,7 +57,6 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { private final String _clusterName; private final Builder _propertyKeyBuilder; private final GroupCommit _groupCommit = new GroupCommit(); - String _zkPropertyTransferSvcUrl = null; public ZKHelixDataAccessor(String clusterName, BaseDataAccessor<ZNRecord> baseDataAccessor) { this(clusterName, null, baseDataAccessor); @@ -71,7 +70,8 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { _propertyKeyBuilder = new PropertyKey.Builder(_clusterName); } - @Override public boolean createStateModelDef(StateModelDefinition stateModelDef) { + @Override + public boolean createStateModelDef(StateModelDefinition stateModelDef) { String path = PropertyPathBuilder.stateModelDef(_clusterName, stateModelDef.getId()); HelixProperty property = getProperty(new PropertyKey.Builder(_clusterName).stateModelDef(stateModelDef.getId())); @@ -94,7 +94,8 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { 
@Override public boolean createControllerMessage(Message message) { - return _baseDataAccessor.create(PropertyPathBuilder.controllerMessage(_clusterName, message.getMsgId()), + return _baseDataAccessor.create(PropertyPathBuilder.controllerMessage(_clusterName, + message.getMsgId()), message.getRecord(), AccessOption.PERSISTENT); } @@ -192,21 +193,28 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { List<T> childValues = new ArrayList<T>(); // read all records - List<String> paths = new ArrayList<String>(); + List<String> paths = new ArrayList<>(); + List<Stat> stats = new ArrayList<>(); for (PropertyKey key : keys) { paths.add(key.getPath()); + stats.add(new Stat()); } - List<ZNRecord> children = _baseDataAccessor.get(paths, null, 0); + List<ZNRecord> children = _baseDataAccessor.get(paths, stats, 0); // check if bucketized for (int i = 0; i < keys.size(); i++) { PropertyKey key = keys.get(i); ZNRecord record = children.get(i); + Stat stat = stats.get(i); PropertyType type = key.getType(); String path = key.getPath(); int options = constructOptions(type); - // ZNRecord record = null; + if (record != null) { + record.setCreationTime(stat.getCtime()); + record.setModifiedTime(stat.getMtime()); + record.setVersion(stat.getVersion()); + } switch (type) { case CURRENTSTATES: @@ -302,6 +310,49 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { } @Override + public HelixProperty.Stat getPropertyStat(PropertyKey key) { + PropertyType type = key.getType(); + String path = key.getPath(); + int options = constructOptions(type); + try { + Stat stat = _baseDataAccessor.getStat(path, options); + if (stat != null) { + return new HelixProperty.Stat(stat.getVersion(), stat.getCtime(), stat.getMtime()); + } + } catch (ZkNoNodeException e) { + + } + + return null; + } + + @Override + public List<HelixProperty.Stat> getPropertyStats(List<PropertyKey> keys) { + if (keys == null || keys.size() == 0) { + return Collections.emptyList(); + } + + 
List<HelixProperty.Stat> propertyStats = new ArrayList<>(keys.size()); + List<String> paths = new ArrayList<>(keys.size()); + for (PropertyKey key : keys) { + paths.add(key.getPath()); + } + Stat[] zkStats = _baseDataAccessor.getStats(paths, 0); + + for (int i = 0; i < keys.size(); i++) { + Stat zkStat = zkStats[i]; + HelixProperty.Stat propertyStat = null; + if (zkStat != null) { + propertyStat = + new HelixProperty.Stat(zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime()); + } + propertyStats.add(propertyStat); + } + + return propertyStats; + } + + @Override public boolean removeProperty(PropertyKey key) { PropertyType type = key.getType(); String path = key.getPath(); http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 619f372..542ac65 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -19,10 +19,13 @@ package org.apache.helix.model; * under the License. 
*/ +import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.helix.api.config.StateTransitionThrottleConfig; @@ -58,6 +61,7 @@ public class ClusterConfig extends HelixProperty { MAX_OFFLINE_INSTANCES_ALLOWED } private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40; + private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!"; /** * Instantiate for a specific cluster @@ -428,6 +432,31 @@ public class ClusterConfig extends HelixProperty { maxConcurrentTaskPerInstance); } + /** + * Get IdealState rules defined in the cluster config. + * + * @return + */ + public Map<String, Map<String, String>> getIdealStateRules() { + Map<String, Map<String, String>> idealStateRuleMap = new HashMap<>(); + + for (String simpleKey : getRecord().getSimpleFields().keySet()) { + if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) { + String simpleValue = getRecord().getSimpleField(simpleKey); + String[] rules = simpleValue.split("(?<!\\\\),"); + Map<String, String> singleRule = Maps.newHashMap(); + for (String rule : rules) { + String[] keyValue = rule.split("(?<!\\\\)="); + if (keyValue.length >= 2) { + singleRule.put(keyValue[0], keyValue[1]); + } + } + idealStateRuleMap.put(simpleKey, singleRule); + } + } + return idealStateRuleMap; + } + @Override public int hashCode() { return getId().hashCode(); http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java b/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java index 2c27dfe..1552d87 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java 
+++ b/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java @@ -38,12 +38,13 @@ import org.apache.log4j.Logger; public class ParticipantHistory extends HelixProperty { private static Logger LOG = Logger.getLogger(ParticipantHistory.class); - private final static int HISTORY_SIZE = 10; + private final static int HISTORY_SIZE = 20; private enum ConfigProperty { TIME, DATE, SESSION, HISTORY, + OFFLINE, LAST_OFFLINE_TIME } @@ -64,6 +65,7 @@ public class ParticipantHistory extends HelixProperty { public void reportOffline() { long time = System.currentTimeMillis(); _record.setSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name(), String.valueOf(time)); + updateOfflineHistory(time); } /** @@ -102,7 +104,7 @@ public class ParticipantHistory extends HelixProperty { private void updateSessionHistory(String sessionId) { List<String> list = _record.getListField(ConfigProperty.HISTORY.name()); if (list == null) { - list = new ArrayList<String>(); + list = new ArrayList<>(); _record.setListField(ConfigProperty.HISTORY.name(), list); } @@ -125,6 +127,24 @@ public class ParticipantHistory extends HelixProperty { list.add(sessionEntry.toString()); } + private void updateOfflineHistory(long time) { + List<String> list = _record.getListField(ConfigProperty.OFFLINE.name()); + if (list == null) { + list = new ArrayList<>(); + _record.setListField(ConfigProperty.OFFLINE.name(), list); + } + + if (list.size() == HISTORY_SIZE) { + list.remove(0); + } + + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss:SSS"); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + String dateTime = df.format(new Date(time)); + + list.add(dateTime); + } + @Override public boolean isValid() { return true; http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java index af6bd81..4e22686 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java @@ -27,7 +27,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import javax.management.JMException; import org.apache.helix.HelixManager; -import org.apache.helix.messaging.handling.HelixTaskExecutor; import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor; import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/test/java/org/apache/helix/MockAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/MockAccessor.java b/helix-core/src/test/java/org/apache/helix/MockAccessor.java index 445b399..d41c2d3 100644 --- a/helix-core/src/test/java/org/apache/helix/MockAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/MockAccessor.java @@ -20,11 +20,11 @@ package org.apache.helix; */ import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; import org.I0Itec.zkclient.DataUpdater; +import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.apache.helix.mock.MockBaseDataAccessor; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; @@ -47,8 +47,6 @@ public class MockAccessor implements HelixDataAccessor { _propertyKeyBuilder = new PropertyKey.Builder(_clusterName); } - Map<String, ZNRecord> map = new HashMap<String, ZNRecord>(); - @Override public boolean createStateModelDef(StateModelDefinition stateModelDef) { return false; @@ -133,6 +131,48 
@@ public class MockAccessor implements HelixDataAccessor { } @Override + public HelixProperty.Stat getPropertyStat(PropertyKey key) { + PropertyType type = key.getType(); + String path = key.getPath(); + try { + Stat stat = _baseDataAccessor.getStat(path, 0); + if (stat != null) { + return new HelixProperty.Stat(stat.getVersion(), stat.getCtime(), stat.getMtime()); + } + } catch (ZkNoNodeException e) { + + } + + return null; + } + + @Override + public List<HelixProperty.Stat> getPropertyStats(List<PropertyKey> keys) { + if (keys == null || keys.size() == 0) { + return Collections.emptyList(); + } + + List<HelixProperty.Stat> propertyStats = new ArrayList<>(keys.size()); + List<String> paths = new ArrayList<>(keys.size()); + for (PropertyKey key : keys) { + paths.add(key.getPath()); + } + Stat[] zkStats = _baseDataAccessor.getStats(paths, 0); + + for (int i = 0; i < keys.size(); i++) { + Stat zkStat = zkStats[i]; + HelixProperty.Stat propertyStat = null; + if (zkStat != null) { + propertyStat = + new HelixProperty.Stat(zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime()); + } + propertyStats.add(propertyStat); + } + + return propertyStats; + } + + @Override public List<String> getChildNames(PropertyKey propertyKey) { String path = propertyKey.getPath(); return _baseDataAccessor.getChildNames(path, 0); http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterDataCache.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterDataCache.java index 7b7f367..06ac94d 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterDataCache.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterDataCache.java @@ -39,6 +39,8 @@ public class TestClusterDataCache 
extends TaskTestBase { _driver.start(builder.build()); Thread.sleep(4000); ClusterDataCache cache = new ClusterDataCache("CLUSTER_" + TestHelper.getTestClassName()); + cache.setTaskCache(true); + cache.requireFullRefresh(); cache.refresh(_manager.getHelixDataAccessor()); Assert.assertEquals(cache.getJobConfigMap().size(), 1); Assert.assertEquals(cache.getWorkflowConfigMap().size(), 1); @@ -53,6 +55,7 @@ public class TestClusterDataCache extends TaskTestBase { _driver.start(builder.build()); Thread.sleep(4000); + cache.requireFullRefresh(); cache.refresh(_manager.getHelixDataAccessor()); Assert.assertEquals(cache.getJobConfigMap().size(), 3); Assert.assertEquals(cache.getWorkflowConfigMap().size(), 2); http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java index d874fcf..2de3a10 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java @@ -90,7 +90,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (6 + 5 * n); + return watchPaths.size() == (7 + 5 * n); } }, 500); Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers."); @@ -115,8 +115,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // printHandlers(participantManagerToExpire); int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManagerToExpire.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, 9, 
- "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant"); + Assert.assertEquals(controllerHandlerNb, 10, + "HelixController should have 10 (5+2n) callback handlers for 2 (n) participant"); Assert.assertEquals(particHandlerNb, 1, "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers"); @@ -145,7 +145,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (6 + 5 * n); + return watchPaths.size() == (7 + 5 * n); } }, 500); Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); @@ -226,13 +226,13 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { final MockParticipantManager participantManager = participants[0]; // wait until we get all the listeners registered - result = TestHelper.verify(new TestHelper.Verifier() { + TestHelper.verify(new TestHelper.Verifier() { @Override public boolean verify() throws Exception { int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManager.getHandlers().size(); - if (controllerHandlerNb == 9 && particHandlerNb == 2) + if (controllerHandlerNb == 10 && particHandlerNb == 2) return true; else return false; @@ -241,8 +241,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManager.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, 9, - "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was " + Assert.assertEquals(controllerHandlerNb, 10, + "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was " + controllerHandlerNb + ", " + printHandlers(controller)); Assert.assertEquals(particHandlerNb, 1, "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback 
handler, but was " @@ -273,7 +273,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (6 + 5 * n); + return watchPaths.size() == (7 + 5 * n); } }, 500); Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java index 2eac4e0..94e5e96 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java @@ -82,7 +82,7 @@ public class TestZkConnectionLost extends TaskTestBase { _driver.pollForWorkflowState(scheduledQueue, 10000, TaskState.COMPLETED); } - @Test(dependsOnMethods = { "testLostZkConnection" }) + @Test(dependsOnMethods = { "testLostZkConnection" }, enabled = false) public void testLostZkConnectionNegative() throws Exception { System.setProperty("helixmanager.waitForConnectedTimeout", "10"); @@ -119,10 +119,10 @@ public class TestZkConnectionLost extends TaskTestBase { @Override public void run() { try { Thread.sleep(300); - System.err.println(System.currentTimeMillis() + ": Shutdown ZK server."); + System.out.println(System.currentTimeMillis() + ": Shutdown ZK server."); TestHelper.stopZkServer(_zkServerRef.get()); Thread.sleep(300); - System.err.println("Restart ZK server"); + System.out.println("Restart ZK server"); _zkServerRef.set(TestHelper.startZkServer(ZK_ADDR, null, false)); } catch (Exception e) { LOG.error(e.getMessage(), e); 
http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java new file mode 100644 index 0000000..fee1ee3 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java @@ -0,0 +1,179 @@ +package org.apache.helix.integration.controller; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyType; +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.integration.common.ZkStandAloneCMTestBase; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase { + + @Test() + public void testUpdateOnNotification() throws Exception { + MockZkHelixDataAccessor accessor = + new MockZkHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + + ClusterDataCache cache = + new ClusterDataCache("CLUSTER_" + TestHelper.getTestClassName()); + cache.refresh(accessor); + + Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1); + Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), NODE_NR); + Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), NODE_NR); + Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 1); + + accessor.clearReadCounters(); + + // refresh again should read nothing + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0); + // cluster config always get reloaded + Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1); + + 
accessor.clearReadCounters(); + // refresh again should read only idealstates. + cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE); + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1); + Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1); + + accessor.clearReadCounters(); + cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE); + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), NODE_NR); + Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1); + } + + @Test(dependsOnMethods = {"testUpdateOnNotification"}) + public void testSelectiveUpdates() throws Exception { + MockZkHelixDataAccessor accessor = + new MockZkHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + + ClusterDataCache cache = + new ClusterDataCache("CLUSTER_" + TestHelper.getTestClassName()); + cache.refresh(accessor); + + Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1); + Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), NODE_NR); + Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), NODE_NR); + Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 1); + + accessor.clearReadCounters(); + + // refresh again should read nothing + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0); + Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 
1); + + // add a new resource + _setupTool.addResourceToCluster(CLUSTER_NAME, "TestDB_1", _PARTITIONS, STATE_MODEL); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB_1", _replica); + + Thread.sleep(100); + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(_clusterVerifier.verify()); + + accessor.clearReadCounters(); + + // refresh again should read only new current states + cache.refresh(accessor); + Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), NODE_NR); + } + + + public class MockZkHelixDataAccessor extends ZKHelixDataAccessor { + Map<PropertyType, Integer> _readPathCounters = new HashMap<>(); + + public MockZkHelixDataAccessor(String clusterName, BaseDataAccessor<ZNRecord> baseDataAccessor) { + super(clusterName, null, baseDataAccessor); + } + + + @Override + public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys) { + for (PropertyKey key : keys) { + addCount(key); + } + return super.getProperty(keys); + } + + @Override + public <T extends HelixProperty> T getProperty(PropertyKey key) { + addCount(key); + return super.getProperty(key); + } + + public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key) { + Map<String, T> map = super.getChildValuesMap(key); + addCount(key, map.keySet().size()); + return map; + } + + private void addCount(PropertyKey key) { + addCount(key, 1); + } + + private void addCount(PropertyKey key, int count) { + PropertyType type = key.getType(); + if (!_readPathCounters.containsKey(type)) { + _readPathCounters.put(type, 0); + } + _readPathCounters.put(type, _readPathCounters.get(type) + count); + } + + public int getReadCount(PropertyType type) { + if (_readPathCounters.containsKey(type)) { + return _readPathCounters.get(type); + } + + return 0; + } + + public void clearReadCounters() { + _readPathCounters.clear(); + } + } + + +} 
http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java index 158b0d9..c887b38 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java @@ -244,7 +244,7 @@ public class TestConsecutiveZkSessionExpiry extends ZkUnitTestBase { .assertEquals( handlers.size(), 1, - "Distributed controller should have 1 handler (message) after lose leadership, but was " + "Distributed controller should have 2 handler (message) after lose leadership, but was " + handlers.size()); // clean up http://git-wip-us.apache.org/repos/asf/helix/blob/d6f4943e/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java index 9b3c32f..24d28c2 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java @@ -92,10 +92,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (6 + 5 * n); + return watchPaths.size() == (7 + 5 * n); } }, 500); - Assert.assertTrue(result, 
"Controller should have 6 + 5*n zk-watchers."); + Assert.assertTrue(result, "Controller should have 7 + 5*n zk-watchers."); // check participant zk-watchers final MockParticipantManager participantManagerToExpire = participants[0]; @@ -118,7 +118,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // printHandlers(participantManagerToExpire); int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManagerToExpire.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, (5 + 2 * n), + Assert.assertEquals(controllerHandlerNb, (6 + 2 * n), "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant"); Assert.assertEquals(particHandlerNb, 1, "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers"); @@ -148,10 +148,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (6 + 5 * n); + return watchPaths.size() == (7 + 5 * n); } }, 500); - Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); + Assert.assertTrue(result, "Controller should have 7 + 5*n zk-watchers after session expiry."); // check participant zk-watchers result = TestHelper.verify(new TestHelper.Verifier() { @@ -236,7 +236,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManager.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, (5 + 2 * n), + Assert.assertEquals(controllerHandlerNb, (6 + 2 * n), "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was " + controllerHandlerNb + ", " + TestHelper.printHandlers(controller)); Assert.assertEquals(particHandlerNb, 1, @@ -269,10 +269,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller 
should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (6 + 5 * n); + return watchPaths.size() == (7 + 5 * n); } }, 500); - Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); + Assert.assertTrue(result, "Controller should have 7 + 5*n zk-watchers after session expiry."); // check participant zk-watchers result = TestHelper.verify(new TestHelper.Verifier() {