This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit a18deb06c28e25e79a9a773069a79ce21a2399b3 Author: Neal Sun <[email protected]> AuthorDate: Tue Dec 8 14:31:12 2020 -0800 Controller-side Task Current State Migration (#1550) First part of task current state migration. All changes made in this PR are on the controller side and are directly pipeline-impacting. --- .../main/java/org/apache/helix/HelixConstants.java | 1 + .../main/java/org/apache/helix/HelixManager.java | 11 ++++ .../main/java/org/apache/helix/PropertyKey.java | 45 +++++++++++++++++ .../java/org/apache/helix/PropertyPathBuilder.java | 26 ++++++++++ .../main/java/org/apache/helix/PropertyType.java | 1 + .../listeners/TaskCurrentStateChangeListener.java} | 40 +++++++-------- .../helix/common/caches/TaskCurrentStateCache.java | 59 ++++++++++++++++++++++ .../helix/controller/GenericHelixController.java | 17 +++++++ .../dataproviders/BaseControllerDataProvider.java | 35 ++++++++++++- .../WorkflowControllerDataProvider.java | 16 +++++- .../helix/controller/stages/ClusterEventType.java | 1 + .../stages/CurrentStateComputationStage.java | 6 +-- .../stages/ResourceComputationStage.java | 5 +- .../apache/helix/manager/zk/CallbackHandler.java | 21 +++++++- .../helix/manager/zk/ParticipantManager.java | 3 ++ .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 2 + .../helix/manager/zk/ZKHelixDataAccessor.java | 4 ++ .../apache/helix/manager/zk/ZKHelixManager.java | 8 +++ .../org/apache/helix/TestPropertyPathBuilder.java | 7 +++ .../test/java/org/apache/helix/TestZKCallback.java | 23 +++++++++ .../java/org/apache/helix/common/ZkTestBase.java | 7 +++ .../waged/model/AbstractTestClusterModel.java | 1 + .../stages/TestCurrentStateComputationStage.java | 46 ++++++++++++++++- .../stages/TestResourceComputationStage.java | 28 ++++++++++ .../integration/TestZkCallbackHandlerLeak.java | 35 ++++++++++--- .../manager/TestParticipantManager.java | 2 +- .../java/org/apache/helix/mock/MockHelixAdmin.java | 2 + 27 files changed, 409 insertions(+), 43 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java index 0fcdfe6..445b32c 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java +++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java @@ -34,6 +34,7 @@ public interface HelixConstants { CLUSTER_CONFIG (PropertyType.CONFIGS), LIVE_INSTANCE (PropertyType.LIVEINSTANCES), CURRENT_STATE (PropertyType.CURRENTSTATES), + TASK_CURRENT_STATE (PropertyType.TASKCURRENTSTATES), CUSTOMIZED_STATE_ROOT (PropertyType.CUSTOMIZEDSTATES), CUSTOMIZED_STATE (PropertyType.CUSTOMIZEDSTATES), MESSAGE (PropertyType.MESSAGES), diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java index 06a4ec5..4ce3ff9 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java @@ -230,6 +230,17 @@ public interface HelixManager { String sessionId) throws Exception; /** + * Uses CurrentStateChangeListener since TaskCurrentState shares the same CurrentState model + * @see CurrentStateChangeListener#onStateChange(String, List, NotificationContext) + * @param listener + * @param instanceName + */ + default void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener, + String instanceName, String sessionId) throws Exception { + throw new UnsupportedOperationException("Not implemented"); + } + + /** * @see CustomizedStateRootChangeListener#onCustomizedStateRootChange(String, NotificationContext) * @param listener diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java index 6437f1a..0427068 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java @@ -74,6 +74,7 @@ import static org.apache.helix.PropertyType.STATEMODELDEFS; import static org.apache.helix.PropertyType.STATUSUPDATES; import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER; import static org.apache.helix.PropertyType.TARGETEXTERNALVIEW; +import static org.apache.helix.PropertyType.TASKCURRENTSTATES; /** @@ -486,6 +487,50 @@ public class PropertyKey { } /** + * Get a property key associated with {@link CurrentState} of an instance and session. This key + * is for TaskCurrentState specifically. + * @param instanceName + * @param sessionId + * @return {@link PropertyKey} + */ + public PropertyKey taskCurrentStates(String instanceName, String sessionId) { + return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName, + sessionId); + } + + /** + * Get a property key associated with {@link CurrentState} of an instance, session, and + * job. This key is for TaskCurrentState specifically. + * @param instanceName + * @param sessionId + * @param jobName + * @return {@link PropertyKey} + */ + public PropertyKey taskCurrentState(String instanceName, String sessionId, String jobName) { + return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName, + sessionId, jobName); + } + + /** + * Get a property key associated with {@link CurrentState} of an instance, session, job, + * and bucket name. This key is for TaskCurrentState specifically. + * @param instanceName + * @param sessionId + * @param jobName + * @param bucketName + * @return {@link PropertyKey} + */ + public PropertyKey taskCurrentState(String instanceName, String sessionId, String jobName, + String bucketName) { + if (bucketName == null) { + return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName, + sessionId, jobName); + } + return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName, + sessionId, jobName, bucketName); + } + + /** * Get a property key associated with the root of {@link CustomizedState} of an instance * @param instanceName * @return {@link PropertyKey} diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java index d4cec13..49e765d 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java @@ -54,6 +54,7 @@ import static org.apache.helix.PropertyType.MESSAGES; import static org.apache.helix.PropertyType.PAUSE; import static org.apache.helix.PropertyType.STATEMODELDEFS; import static org.apache.helix.PropertyType.STATUSUPDATES; +import static org.apache.helix.PropertyType.TASKCURRENTSTATES; import static org.apache.helix.PropertyType.WORKFLOWCONTEXT; @@ -125,6 +126,14 @@ public class PropertyPathBuilder { "/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES/{sessionId}/{resourceName}"); addEntry(PropertyType.CURRENTSTATES, 5, "/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES/{sessionId}/{resourceName}/{bucketName}"); + addEntry(TASKCURRENTSTATES, 2, + "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES"); + addEntry(TASKCURRENTSTATES, 3, + "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES/{sessionId}"); + addEntry(TASKCURRENTSTATES, 4, + "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES/{sessionId}/{resourceName}"); + addEntry(TASKCURRENTSTATES, 5, + "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES/{sessionId}/{resourceName}/{bucketName}"); addEntry(PropertyType.CUSTOMIZEDSTATES, 2, "/{clusterName}/INSTANCES/{instanceName}/CUSTOMIZEDSTATES"); addEntry(PropertyType.CUSTOMIZEDSTATES, 3, @@ -346,6 +355,23 @@ public class PropertyPathBuilder { sessionId, resourceName); } + public static String instanceTaskCurrentState(String clusterName, String instanceName) { + return String.format("/%s/INSTANCES/%s/TASKCURRENTSTATES", clusterName, instanceName); + } + + public static String instanceTaskCurrentState(String clusterName, String instanceName, + String sessionId) { + return String + .format("/%s/INSTANCES/%s/TASKCURRENTSTATES/%s", clusterName, instanceName, sessionId); + } + + public static String instanceTaskCurrentState(String clusterName, String instanceName, + String sessionId, String resourceName) { + return String + .format("/%s/INSTANCES/%s/TASKCURRENTSTATES/%s/%s", clusterName, instanceName, sessionId, + resourceName); + } + public static String instanceCustomizedState(String clusterName, String instanceName) { return String.format("/%s/INSTANCES/%s/CUSTOMIZEDSTATES", clusterName, instanceName); } diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java index 9ce27f3..bedf79e 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyType.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java @@ -52,6 +52,7 @@ public enum PropertyType { // INSTANCE PROPERTIES MESSAGES(Type.INSTANCE, true, true, true), CURRENTSTATES(Type.INSTANCE, true, true, false, false, true), + TASKCURRENTSTATES(Type.INSTANCE, true, true, false, false, true), STATUSUPDATES(Type.INSTANCE, true, true, false, false, false, true), ERRORS(Type.INSTANCE, true, true), INSTANCE_HISTORY(Type.INSTANCE, true, true, true), diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/api/listeners/TaskCurrentStateChangeListener.java similarity index 55% copy from helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java copy to helix-core/src/main/java/org/apache/helix/api/listeners/TaskCurrentStateChangeListener.java index 4c8a60e..5f00eae 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java +++ b/helix-core/src/main/java/org/apache/helix/api/listeners/TaskCurrentStateChangeListener.java @@ -1,4 +1,4 @@ -package org.apache.helix.controller.stages; +package org.apache.helix.api.listeners; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -19,24 +19,22 @@ package org.apache.helix.controller.stages; * under the License. */ -public enum ClusterEventType { - IdealStateChange, - CurrentStateChange, - CustomizedStateChange, - ConfigChange, - ClusterConfigChange, - ResourceConfigChange, - InstanceConfigChange, - CustomizeStateConfigChange, - LiveInstanceChange, - MessageChange, - ExternalViewChange, - CustomizedViewChange, - TargetExternalViewChange, - Resume, - PeriodicalRebalance, - OnDemandRebalance, - RetryRebalance, - StateVerifier, - Unknown +import java.util.List; + +import org.apache.helix.NotificationContext; +import org.apache.helix.model.CurrentState; + +/** + * Interface to implement to respond to changes in task current states + */ +public interface TaskCurrentStateChangeListener { + + /** + * Invoked when task current states change + * @param instanceName name of the instance whose states changed + * @param statesInfo a list of the task current states + * @param changeContext the change event and state + */ + void onTaskCurrentStateChange(String instanceName, List<CurrentState> statesInfo, + NotificationContext changeContext); } diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskCurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskCurrentStateCache.java new file mode 100644 index 0000000..936f6d0 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskCurrentStateCache.java @@ -0,0 +1,59 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.common.controllers.ControlContextProvider; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.LiveInstance; + + +/** + * Cache to hold all task CurrentStates of a cluster. + */ +public class TaskCurrentStateCache extends ParticipantStateCache<CurrentState> { + public TaskCurrentStateCache(ControlContextProvider controlContextProvider) { + super(controlContextProvider); + } + + @Override + protected Set<PropertyKey> PopulateParticipantKeys(HelixDataAccessor accessor, + Map<String, LiveInstance> liveInstanceMap) { + Set<PropertyKey> participantStateKeys = new HashSet<>(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + for (String instanceName : liveInstanceMap.keySet()) { + LiveInstance liveInstance = liveInstanceMap.get(instanceName); + String sessionId = liveInstance.getEphemeralOwner(); + List<String> currentStateNames = + accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId)); + for (String currentStateName : currentStateNames) { + participantStateKeys + .add(keyBuilder.taskCurrentState(instanceName, sessionId, currentStateName)); + } + } + return participantStateKeys; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 3673a07..11ca5a3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -58,6 +58,7 @@ import org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.PreFetch; import org.apache.helix.api.listeners.ResourceConfigChangeListener; +import org.apache.helix.api.listeners.TaskCurrentStateChangeListener; import org.apache.helix.common.ClusterEventBlockingQueue; import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; @@ -128,6 +129,7 @@ import static org.apache.helix.HelixConstants.ChangeType; */ public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener, + TaskCurrentStateChangeListener, CustomizedStateRootChangeListener, CustomizedStateChangeListener, CustomizedStateConfigChangeListener, ControllerChangeListener, @@ -569,6 +571,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns rebalancePipeline); registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess, rebalancePipeline); + registry.register(ClusterEventType.TaskCurrentStateChange, dataRefresh, dataPreprocess, + rebalancePipeline); registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, @@ -905,6 +909,17 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns @Override @PreFetch(enabled = false) + public void onTaskCurrentStateChange(String instanceName, List<CurrentState> statesInfo, + NotificationContext changeContext) { + logger.info("START: GenericClusterController.onTaskCurrentStateChange()"); + notifyCaches(changeContext, ChangeType.TASK_CURRENT_STATE); + pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext, Collections + .<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName)); + logger.info("END: GenericClusterController.onTaskCurrentStateChange()"); + } + + @Override + @PreFetch(enabled = false) public void onCustomizedStateRootChange(String instanceName, List<String> customizedStateTypes, NotificationContext changeContext) { logger.info("START: GenericClusterController.onCustomizedStateRootChange()"); @@ -1243,6 +1258,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns // remove current-state listener for expired session String instanceName = lastSessions.get(session).getInstanceName(); manager.removeListener(keyBuilder.currentStates(instanceName, session), this); + manager.removeListener(keyBuilder.taskCurrentStates(instanceName, session), this); } } } @@ -1264,6 +1280,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns try { // add current-state listeners for new sessions manager.addCurrentStateChangeListener(this, instanceName, session); + manager.addTaskCurrentStateChangeListener(this, instanceName, session); logger.info(manager.getInstanceName() + " added current-state listener for instance: " + instanceName + ", session: " + session + ", listener: " + this); } catch (Exception e) { diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java index c2c9078..070c70a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java @@ -44,6 +44,7 @@ import org.apache.helix.common.caches.AbstractDataCache; import org.apache.helix.common.caches.CurrentStateCache; import org.apache.helix.common.caches.InstanceMessagesCache; import org.apache.helix.common.caches.PropertyCache; +import org.apache.helix.common.caches.TaskCurrentStateCache; import org.apache.helix.common.controllers.ControlContextProvider; import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver; @@ -58,6 +59,7 @@ import org.apache.helix.model.Message; import org.apache.helix.model.ParticipantHistory; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.task.TaskConstants; import org.apache.helix.util.HelixUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +104,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { // Special caches private CurrentStateCache _currentStateCache; + protected TaskCurrentStateCache _taskCurrentStateCache; private InstanceMessagesCache _instanceMessagesCache; // Other miscellaneous caches @@ -226,6 +229,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { } }, false); _currentStateCache = new CurrentStateCache(this); + _taskCurrentStateCache = new TaskCurrentStateCache(this); _instanceMessagesCache = new InstanceMessagesCache(_clusterName); } @@ -572,13 +576,40 @@ public class BaseControllerDataProvider implements ControlContextProvider { /** * Provides the current state of the node for a given session id, the sessionid can be got from - * LiveInstance + * LiveInstance. This function is only called from the regular pipelines. * @param instanceName * @param clientSessionId * @return */ public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) { - return _currentStateCache.getParticipantState(instanceName, clientSessionId); + return getCurrentState(instanceName, clientSessionId, false); + } + + /** + * Provides the current state of the node for a given session id, the sessionid can be got from + * LiveInstance + * @param instanceName + * @param clientSessionId + * @return + */ + public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId, + boolean isTaskPipeline) { + Map<String, CurrentState> regularCurrentStates = + _currentStateCache.getParticipantState(instanceName, clientSessionId); + if (isTaskPipeline) { + // TODO: Targeted jobs still rely on regular resource current states, so need to include all + // resource current states without filtering. For now, allow regular current states to + // overwrite task current states in case of name conflicts, which are unlikely. Eventually, + // it should be completely split. + Map<String, CurrentState> mergedCurrentStates = new HashMap<>(); + mergedCurrentStates + .putAll(_taskCurrentStateCache.getParticipantState(instanceName, clientSessionId)); + mergedCurrentStates.putAll(regularCurrentStates); + return Collections.unmodifiableMap(mergedCurrentStates); + } + return regularCurrentStates.entrySet().stream().filter( + entry -> !TaskConstants.STATE_MODEL_NAME.equals(entry.getValue().getStateModelDefRef())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } /** diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java index 45e1319..96894e8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java @@ -26,6 +26,8 @@ import java.util.Set; import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.common.caches.TaskCurrentStateCache; +import org.apache.helix.model.CurrentState; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.common.caches.AbstractDataCache; import org.apache.helix.common.caches.TaskDataCache; @@ -78,14 +80,15 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider { // This check (and set) is necessary for now since the current state flag in // _propertyDataChangedMap is not used by the BaseControllerDataProvider for now. _propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false) + || _propertyDataChangedMap.get(HelixConstants.ChangeType.TASK_CURRENT_STATE).getAndSet(false) || _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE).getAndSet(false) - || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE) || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE); } public synchronized void refresh(HelixDataAccessor accessor) { long startTime = System.currentTimeMillis(); Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor); + _taskCurrentStateCache.refresh(accessor, getLiveInstanceCache().getPropertyMap()); refreshClusterStateChangeFlags(propertyRefreshed); @@ -252,6 +255,17 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider { return _existsLiveInstanceOrCurrentStateOrMessageChange; } + /** + * For a certain session, return the task current states on the node. + * @param instanceName + * @param clientSessionId + * @return A mapping of resource names to CurrentStates + */ + public Map<String, CurrentState> getTaskCurrentState(String instanceName, + String clientSessionId) { + return _taskCurrentStateCache.getParticipantState(instanceName, clientSessionId); + } + @Override public String toString() { StringBuilder sb = genCacheContentStringBuilder(); diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java index 4c8a60e..cd0ce60 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java @@ -22,6 +22,7 @@ package org.apache.helix.controller.stages; public enum ClusterEventType { IdealStateChange, CurrentStateChange, + TaskCurrentStateChange, CustomizedStateChange, ConfigChange, ClusterConfigChange, diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 4af7e27..49e5d8f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -89,9 +89,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage { String instanceSessionId = instance.getEphemeralOwner(); // update current states. - Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName, - instanceSessionId); - updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap); + updateCurrentStates(instance, + cache.getCurrentState(instanceName, instanceSessionId, _isTaskFrameworkPipeline).values(), + currentStateOutput, resourceMap); Set<Message> existingStaleMessages = cache.getStaleMessagesByInstance(instanceName); // update pending messages diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index 7c2cac6..1f77fa6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -190,10 +190,7 @@ public class ResourceComputationStage extends AbstractBaseStage { String clientSessionId = instance.getEphemeralOwner(); Map<String, CurrentState> currentStateMap = - cache.getCurrentState(instanceName, clientSessionId); - if (currentStateMap == null || currentStateMap.size() == 0) { - continue; - } + cache.getCurrentState(instanceName, clientSessionId, isTaskCache); for (CurrentState currentState : currentStateMap.values()) { String resourceName = currentState.getResourceName(); diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index 03c5f73..24b42af 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -61,6 +61,7 @@ import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.PreFetch; import org.apache.helix.api.listeners.ResourceConfigChangeListener; import org.apache.helix.api.listeners.ScopedConfigChangeListener; +import org.apache.helix.api.listeners.TaskCurrentStateChangeListener; import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.CurrentState; @@ -102,6 +103,7 @@ import static org.apache.helix.HelixConstants.ChangeType.MESSAGE; import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER; import static org.apache.helix.HelixConstants.ChangeType.RESOURCE_CONFIG; import static org.apache.helix.HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW; +import static org.apache.helix.HelixConstants.ChangeType.TASK_CURRENT_STATE; @PreFetchChangedData(enabled = false) public class CallbackHandler implements IZkChildListener, IZkDataListener { @@ -261,6 +263,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { case CURRENT_STATE: listenerClass = CurrentStateChangeListener.class; break; + case TASK_CURRENT_STATE: + listenerClass = TaskCurrentStateChangeListener.class; + break; case CUSTOMIZED_STATE_ROOT: listenerClass = CustomizedStateRootChangeListener.class; break; @@ -415,6 +420,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { List<CurrentState> currentStates = preFetch(_propertyKey); currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext); + } else if (_changeType == TASK_CURRENT_STATE) { + TaskCurrentStateChangeListener taskCurrentStateChangeListener = + (TaskCurrentStateChangeListener) _listener; + String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path); + List<CurrentState> currentStates = preFetch(_propertyKey); + taskCurrentStateChangeListener + .onTaskCurrentStateChange(instanceName, currentStates, changeContext); + } else if (_changeType == CUSTOMIZED_STATE_ROOT) { CustomizedStateRootChangeListener customizedStateRootChangeListener = (CustomizedStateRootChangeListener) _listener; @@ -527,8 +540,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { _zkClient.unsubscribeChildChanges(path, this); } - // List of children could be empty, but won't be null. - return _zkClient.getChildren(path); + try { + return _zkClient.getChildren(path); + } catch (ZkNoNodeException e) { + return null; + } } private void subscribeDataChange(String path, NotificationContext.Type callbackType) { @@ -571,6 +587,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { try { switch (_changeType) { case CURRENT_STATE: + case TASK_CURRENT_STATE: case CUSTOMIZED_STATE: case IDEAL_STATE: case EXTERNAL_VIEW: diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java index 937fb4b..3a58570 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java @@ -418,6 +418,9 @@ public class ParticipantManager { String path = _keyBuilder.currentStates(_instanceName, session).getPath(); LOG.info("Removing current states from previous sessions. path: " + path); _zkclient.deleteRecursively(path); + path = _keyBuilder.taskCurrentStates(_instanceName, session).getPath(); + LOG.info("Removing task current states from previous sessions. path: " + path); + _zkclient.deleteRecursively(path); } } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 7b946b1..df5abe4 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -193,6 +193,8 @@ public class ZKHelixAdmin implements HelixAdmin { _zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, nodeId), true); _zkClient.createPersistent(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), true); + _zkClient + .createPersistent(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId), true); _zkClient.createPersistent(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), true); _zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true); _zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true); diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java index 35cc663..203c6c7 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java @@ -186,6 +186,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { boolean success = false; switch (type) { case CURRENTSTATES: + case TASKCURRENTSTATES: case CUSTOMIZEDSTATES: success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord(), true); break; @@ -243,6 +244,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { switch (type) { case CURRENTSTATES: + case TASKCURRENTSTATES: case IDEALSTATES: case EXTERNALVIEW: // check if bucketized @@ -301,6 +303,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { switch (type) { case CURRENTSTATES: + case TASKCURRENTSTATES: case IDEALSTATES: case EXTERNALVIEW: // check if bucketized @@ -421,6 +424,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { for (ZNRecord record : children) { switch (type) { case CURRENTSTATES: + case TASKCURRENTSTATES: case IDEALSTATES: case EXTERNALVIEW: if (record != null) { diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 05e1e26..04f4385 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -591,6 +591,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { } @Override + public void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener, + String instanceName, String sessionId) throws Exception { + addListener(listener, new Builder(_clusterName).taskCurrentStates(instanceName, sessionId), + ChangeType.TASK_CURRENT_STATE, new EventType[] { EventType.NodeChildrenChanged + }); + } + + @Override public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener, String instanceName) throws Exception { addListener(listener, new Builder(_clusterName).customizedStatesRoot(instanceName), diff --git a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java index 1f8df6c..422fb9c 100644 --- a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java +++ b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java @@ -39,6 +39,13 @@ public class TestPropertyPathBuilder { actual = PropertyPathBuilder.instanceCurrentState("test_cluster", "instanceName1", "sessionId"); AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CURRENTSTATES/sessionId"); + actual = PropertyPathBuilder.instanceTaskCurrentState("test_cluster", "instanceName1"); + AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/TASKCURRENTSTATES"); + actual = + PropertyPathBuilder.instanceTaskCurrentState("test_cluster", "instanceName1", "sessionId"); + AssertJUnit + .assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/TASKCURRENTSTATES/sessionId"); + actual = PropertyPathBuilder.instanceCustomizedState("test_cluster", "instanceName1"); AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CUSTOMIZEDSTATES"); actual = PropertyPathBuilder.instanceCustomizedState("test_cluster", "instanceName1", "customizedState1"); diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java index d97471c..5dc0171 100644 --- a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java +++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java @@ -32,6 +32,7 @@ import org.apache.helix.api.listeners.ExternalViewChangeListener; import org.apache.helix.api.listeners.IdealStateChangeListener; import org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.api.listeners.MessageListener; +import org.apache.helix.api.listeners.TaskCurrentStateChangeListener; import org.apache.helix.model.CurrentState; import org.apache.helix.model.CustomizedStateConfig; import org.apache.helix.model.ExternalView; @@ -58,6 +59,7 @@ public class TestZKCallback extends ZkUnitTestBase { public class TestCallbackListener implements MessageListener, LiveInstanceChangeListener, ConfigChangeListener, CurrentStateChangeListener, + TaskCurrentStateChangeListener, CustomizedStateConfigChangeListener, CustomizedStateRootChangeListener, ExternalViewChangeListener, @@ -66,6 +68,7 @@ public class TestZKCallback extends ZkUnitTestBase { boolean liveInstanceChangeReceived = false; boolean configChangeReceived = false; boolean currentStateChangeReceived = false; + boolean taskCurrentStateChangeReceived = false; boolean customizedStateConfigChangeReceived = false; boolean customizedStateRootChangeReceived = false; boolean messageChangeReceived = false; @@ -84,6 +87,12 @@ public class TestZKCallback extends ZkUnitTestBase { } @Override + public void onTaskCurrentStateChange(String instanceName, List<CurrentState> statesInfo, + NotificationContext changeContext) { + taskCurrentStateChangeReceived = true; + } + + @Override public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) { configChangeReceived = true; } @@ -149,6 +158,8 @@ public class TestZKCallback extends ZkUnitTestBase { testHelixManager.addMessageListener(testListener, "localhost_8900"); testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900", testHelixManager.getSessionId()); + testHelixManager.addTaskCurrentStateChangeListener(testListener, "localhost_8900", + testHelixManager.getSessionId()); testHelixManager.addCustomizedStateRootChangeListener(testListener, "localhost_8900"); testHelixManager.addConfigChangeListener(testListener); testHelixManager.addIdealStateChangeListener(testListener); @@ -184,6 +195,18 @@ public class TestZKCallback extends ZkUnitTestBase { Assert.assertTrue(result); testListener.Reset(); + CurrentState taskCurState = new CurrentState("db-12345"); + taskCurState.setSessionId("sessionId"); + taskCurState.setStateModelDefRef("StateModelDef"); + accessor.setProperty(keyBuilder + .taskCurrentState("localhost_8900", testHelixManager.getSessionId(), + taskCurState.getId()), taskCurState); + result = TestHelper.verify(() -> { + return testListener.taskCurrentStateChangeReceived; + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(result); + testListener.Reset(); + IdealState idealState = new IdealState("db-1234"); idealState.setNumPartitions(400); idealState.setReplicas(Integer.toString(2)); diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index 0b1c375..e832307 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -929,6 +929,13 @@ public class ZkTestBase { LOG.error("Current state not empty for " + participant); return false; } + CurrentState taskCurrentState = + accessor.getProperty(keyBuilder.taskCurrentState(participant, sessionId, _resourceName)); + Map<String, String> taskPartitionStateMap = taskCurrentState.getPartitionStateMap(); + if (taskPartitionStateMap != null && !taskPartitionStateMap.isEmpty()) { + LOG.error("Task current state not empty for " + participant); + return false; + } } } return true; diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java index 8887e87..e3b346d 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java @@ -154,6 +154,7 @@ public abstract class AbstractTestClusterModel { currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1); currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2); when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap); + when(testCache.getCurrentState(_testInstanceId, _sessionId, false)).thenReturn(currentStatemap); // 5. Set up the resource config for the two resources with the partition weight. Map<String, Integer> capacityDataMapResource1 = new HashMap<>(); diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java index 9658e2a..91e275a 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java @@ -19,9 +19,11 @@ package org.apache.helix.controller.stages; * under the License. */ +import java.util.HashMap; import java.util.Map; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.model.CurrentState; @@ -99,11 +101,28 @@ public class TestCurrentStateComputationStage extends BaseStageTest { stateWithDeadSession.setStateModelDefRef("MasterSlave"); stateWithDeadSession.setState("testResourceName_1", "MASTER"); + ZNRecord record3 = new ZNRecord("testTaskResourceName"); + CurrentState taskStateWithLiveSession = new CurrentState(record3); + taskStateWithLiveSession.setSessionId("session_3"); + taskStateWithLiveSession.setStateModelDefRef("Task"); + taskStateWithLiveSession.setState("testTaskResourceName_1", "INIT"); + ZNRecord record4 = new ZNRecord("testTaskResourceName"); + CurrentState taskStateWithDeadSession = new CurrentState(record4); + taskStateWithDeadSession.setSessionId("session_dead"); + taskStateWithDeadSession.setStateModelDefRef("Task"); + taskStateWithDeadSession.setState("testTaskResourceName_1", "INIT"); + accessor.setProperty(keyBuilder.currentState("localhost_3", "session_3", "testResourceName"), stateWithLiveSession); - accessor.setProperty( - keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"), + accessor.setProperty(keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"), stateWithDeadSession); + accessor.setProperty( + keyBuilder.taskCurrentState("localhost_3", "session_3", "testTaskResourceName"), + taskStateWithLiveSession); + accessor.setProperty( + keyBuilder.taskCurrentState("localhost_3", "session_dead", "testTaskResourceName"), + taskStateWithDeadSession); + runStage(event, new ReadClusterDataStage()); runStage(event, stage); CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.name()); @@ -111,6 +130,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest { output3.getCurrentState("testResourceName", new Partition("testResourceName_1"), "localhost_3"); AssertJUnit.assertEquals(currentState, "OFFLINE"); + // Non Task Framework event will cause task current states to be ignored + String taskCurrentState = output3 + .getCurrentState("testTaskResourceName", new Partition("testTaskResourceName_1"), + "localhost_3"); + AssertJUnit.assertNull(taskCurrentState); // Add another state transition message which is stale message = new Message(Message.MessageType.STATE_TRANSITION, "msg2"); @@ -128,6 +152,24 @@ public class TestCurrentStateComputationStage extends BaseStageTest { AssertJUnit.assertEquals(dataCache.getStaleMessages().size(), 1); AssertJUnit.assertTrue(dataCache.getStaleMessages().containsKey("localhost_3")); AssertJUnit.assertTrue(dataCache.getStaleMessages().get("localhost_3").containsKey("msg2")); + + // Use a task event to check that task current states are included + resourceMap = new HashMap<String, Resource>(); + Resource testTaskResource = new Resource("testTaskResourceName"); + testTaskResource.setStateModelDefRef("Task"); + testTaskResource.addPartition("testTaskResourceName_1"); + resourceMap.put("testTaskResourceName", testTaskResource); + ClusterEvent taskEvent = new ClusterEvent(ClusterEventType.Unknown); + taskEvent.addAttribute(AttributeName.RESOURCES.name(), resourceMap); + taskEvent.addAttribute(AttributeName.ControllerDataProvider.name(), + new WorkflowControllerDataProvider()); + runStage(taskEvent, new ReadClusterDataStage()); + runStage(taskEvent, stage); + CurrentStateOutput output5 = taskEvent.getAttribute(AttributeName.CURRENT_STATE.name()); + taskCurrentState = output5 + .getCurrentState("testTaskResourceName", new Partition("testTaskResourceName_1"), + "localhost_3"); + AssertJUnit.assertEquals(taskCurrentState, "INIT"); } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java index 440543e..6b6f28e 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.pipeline.StageContext; @@ -154,6 +155,14 @@ public class TestResourceComputationStage extends BaseStageTest { accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource), currentState); + String oldTaskResource = "testTaskResourceOld"; + CurrentState taskCurrentState = new CurrentState(oldTaskResource); + taskCurrentState.setState("testTaskResourceOld_0", "RUNNING"); + taskCurrentState.setState("testTaskResourceOld_1", "FINISHED"); + taskCurrentState.setStateModelDefRef("Task"); + accessor.setProperty(keyBuilder.taskCurrentState(instanceName, sessionId, oldTaskResource), + taskCurrentState); + event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider()); ResourceComputationStage stage = new ResourceComputationStage(); @@ -185,6 +194,25 @@ public class TestResourceComputationStage extends BaseStageTest { AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1")); AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2")); + + event.addAttribute(AttributeName.ControllerDataProvider.name(), + new WorkflowControllerDataProvider()); + runStage(event, new ReadClusterDataStage()); + runStage(event, stage); + + resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); + // +2 because it will have current state and task current state + AssertJUnit.assertEquals(resources.length + 2, resourceMap.size()); + + Resource taskResource = resourceMap.get(oldTaskResource); + AssertJUnit.assertNotNull(taskResource); + AssertJUnit.assertEquals(taskResource.getResourceName(), oldTaskResource); + AssertJUnit + .assertEquals(taskResource.getStateModelDefRef(), taskCurrentState.getStateModelDefRef()); + AssertJUnit.assertEquals(taskResource.getPartitions().size(), + taskCurrentState.getPartitionStateMap().size()); + AssertJUnit.assertNotNull(taskResource.getPartition("testTaskResourceOld_0")); + AssertJUnit.assertNotNull(taskResource.getPartition("testTaskResourceOld_1")); } @Test diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java index 2f27d4b..7bc92dc 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.helix.AccessOption; import org.apache.helix.CurrentStateChangeListener; import org.apache.helix.HelixDataAccessor; import org.apache.helix.NotificationContext; @@ -43,6 +44,7 @@ import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.slf4j.Logger; @@ -62,6 +64,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { String clusterName = className + "_" + methodName; final int n = 2; final int r = 2; + final int taskResourceCount = 2; System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); @@ -78,6 +81,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); + PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); // start participants MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { @@ -85,6 +89,13 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); + + // Manually set up task current states + for (int j = 0; j < taskResourceCount; j++) { + _baseAccessor.create(keyBuilder + .taskCurrentState(instanceName, participants[i].getSessionId(), "TestTaskResource_" + j) + .toString(), new ZNRecord("TestTaskResource_" + j), AccessOption.PERSISTENT); + } } ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName) @@ -105,7 +116,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // System.out.println("controller watch paths: " + watchPaths); // where n is number of nodes and r is number of resources - return watchPaths.size() == (8 + r + ( 5 + r) * n); + return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * n); } }, 2000); Assert.assertTrue(result, "Controller has incorrect number of zk-watchers."); @@ -130,8 +141,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // printHandlers(participantManagerToExpire); int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManagerToExpire.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, 14, - "HelixController should have 14 (8+3n) callback handlers for 2 (n) participant"); + Assert.assertEquals(controllerHandlerNb, 8 + 4 * n, + "HelixController should have 16 (8+4n) callback handlers for 2 (n) participant"); Assert.assertEquals(particHandlerNb, 1, "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers"); @@ -160,7 +171,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // System.out.println("controller watch paths after session expiry: " + watchPaths); // where n is number of nodes and r is number of resources - return watchPaths.size() == (8 + r + ( 5 + r) * n); + // one participant is disconnected, and its task current states are removed + return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * (n - 1) + 6 + r); } }, 2000); Assert.assertTrue(result, "Controller has incorrect number of zk-watchers after session expiry."); @@ -208,6 +220,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { String clusterName = className + "_" + methodName; final int n = 2; final int r = 1; + final int taskResourceCount = 1; System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); @@ -224,6 +237,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); + PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); // start participants MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { @@ -231,6 +245,12 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); + // Manually set up task current states + for (int j = 0; j < taskResourceCount; j++) { + _baseAccessor.create(keyBuilder + .taskCurrentState(instanceName, participants[i].getSessionId(), "TestTaskResource_" + j) + .toString(), new ZNRecord("TestTaskResource_" + j), AccessOption.PERSISTENT); + } } ZkHelixClusterVerifier verifier = @@ -256,8 +276,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManager.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, 8 + 3 * n, - "HelixController should have 14 (8+3n) callback handlers for 2 participant, but was " + Assert.assertEquals(controllerHandlerNb, 8 + 4 * n, + "HelixController should have 16 (8+4n) callback handlers for 2 participant, but was " + controllerHandlerNb + ", " + printHandlers(controller)); Assert.assertEquals(particHandlerNb, 1, "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was " @@ -285,7 +305,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { System.err.println("controller watch paths after session expiry: " + watchPaths.size()); // where r is number of resources and n is number of nodes - int expected = (8 + r + (5 + r) * n); + // task resource count does not attribute to ideal state watch paths + int expected = (8 + r + (6 + r + taskResourceCount) * n); return watchPaths.size() == expected; } }, 2000); diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java index 25523d0..050bc76 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java @@ -210,7 +210,7 @@ public class TestParticipantManager extends ZkTestBase { // check HelixCallback Monitor Set<ObjectInstance> objs = _server.queryMBeans(buildCallbackMonitorObjectName(type, clusterName, instanceName), null); - Assert.assertEquals(objs.size(), 18); + Assert.assertEquals(objs.size(), 19); // check HelixZkClient Monitors objs = diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index e954c93..42633c1 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -222,6 +222,8 @@ public class MockHelixAdmin implements HelixAdmin { _baseDataAccessor .set(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), new ZNRecord(nodeId), 0); + _baseDataAccessor.set(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId), + new ZNRecord(nodeId), 0); _baseDataAccessor .set(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), new ZNRecord(nodeId), 0);
