Refactor TaskUtil class to move as many as methods out of the class, and make other methods in it as internal API as possible. Expose necessary APIs in TaskDriver instead.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/579d82fd Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/579d82fd Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/579d82fd Branch: refs/heads/helix-0.6.x Commit: 579d82fd2aa8fdce8ec0e0c4d6da73cb8209729d Parents: 6d42db4 Author: Lei Xia <l...@linkedin.com> Authored: Tue Feb 23 17:32:35 2016 -0800 Committer: Lei Xia <l...@linkedin.com> Committed: Tue Jul 5 14:44:32 2016 -0700 ---------------------------------------------------------------------- .../org/apache/helix/task/JobRebalancer.java | 6 +- .../org/apache/helix/task/TaskRebalancer.java | 36 ++++ .../java/org/apache/helix/task/TaskRunner.java | 37 +++- .../java/org/apache/helix/task/TaskUtil.java | 178 +++---------------- .../apache/helix/task/WorkflowRebalancer.java | 89 +++++++++- .../helix/integration/task/TaskTestUtil.java | 35 ++-- .../task/TestDisableJobExternalView.java | 2 +- .../task/TestIndependentTaskRebalancer.java | 25 ++- .../integration/task/TestRecurringJobQueue.java | 36 ++-- .../task/TestRunJobsWithMissingTarget.java | 11 +- .../integration/task/TestTaskRebalancer.java | 32 ++-- .../task/TestTaskRebalancerFailover.java | 12 +- .../task/TestTaskRebalancerParallel.java | 2 +- .../task/TestTaskRebalancerRetryLimit.java | 4 +- .../task/TestTaskRebalancerStopResume.java | 78 ++++---- .../integration/task/TestUpdateWorkflow.java | 19 +- 16 files changed, 306 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 93d4689..7eeafc7 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -90,7 +90,7 @@ public class JobRebalancer extends TaskRebalancer { // The job is already in a final state (completed/failed). if (jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) { LOG.info("Job " + jobName + " is failed or already completed, clean up IS."); - TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName); + cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName); _scheduledRebalancer.removeScheduledRebalance(jobName); return buildEmptyAssignment(jobName, currStateOutput); } @@ -340,7 +340,7 @@ public class JobRebalancer extends TaskRebalancer { addAllPartitions(allPartitions, partitionsToDropFromIs); // remove IdealState of this job - TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); + cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); return buildEmptyAssignment(jobResource, currStateOutput); } else { skippedPartitions.add(pId); @@ -376,7 +376,7 @@ public class JobRebalancer extends TaskRebalancer { if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) { markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx); // remove IdealState of this job - TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); + cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); } // Make additional task assignments if needed. http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index f35ce69..b006efc 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -29,8 +29,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; import org.apache.helix.controller.rebalancer.Rebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.stages.ClusterDataCache; @@ -171,6 +173,40 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { return (startTime == null || startTime.getTime() <= System.currentTimeMillis()); } + /** + * Cleans up IdealState and external view associated with a job/workflow resource. + */ + protected static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) { + LOG.info("Cleaning up idealstate and externalView for job: " + resourceName); + + // Delete the ideal state itself. + PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName); + if (accessor.getProperty(isKey) != null) { + if (!accessor.removeProperty(isKey)) { + LOG.error(String.format( + "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.", + resourceName, isKey)); + } + } else { + LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName)); + } + + // Delete dead external view + // because job is already completed, there is no more current state change + // thus dead external views removal will not be triggered + PropertyKey evKey = accessor.keyBuilder().externalView(resourceName); + if (accessor.getProperty(evKey) != null) { + if (!accessor.removeProperty(evKey)) { + LOG.error(String.format( + "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.", + resourceName, evKey)); + } + } + + LOG.info(String + .format("Successfully clean up idealstate/externalView for resource %s.", resourceName)); + } + @Override public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java index 7b17043..1bf88ec 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java @@ -19,7 +19,10 @@ package org.apache.helix.task; * under the License. */ +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.CurrentState; import org.apache.helix.task.TaskResult.Status; import org.apache.log4j.Logger; @@ -168,8 +171,8 @@ public class TaskRunner implements Runnable { */ private void requestStateTransition(TaskPartitionState state) { boolean success = - TaskUtil.setRequestedState(_manager.getHelixDataAccessor(), _instance, _sessionId, - _taskName, _taskPartition, state); + setRequestedState(_manager.getHelixDataAccessor(), _instance, _sessionId, _taskName, + _taskPartition, state); if (!success) { LOG.error(String .format( @@ -177,4 +180,34 @@ public class TaskRunner implements Runnable { state, _instance, _sessionId, _taskPartition)); } } + + /** + * Request a state change for a specific task. + * + * @param accessor connected Helix data accessor + * @param instance the instance serving the task + * @param sessionId the current session of the instance + * @param resource the job name + * @param partition the task partition name + * @param state the requested state + * @return true if the request was persisted, false otherwise + */ + private static boolean setRequestedState(HelixDataAccessor accessor, String instance, + String sessionId, String resource, String partition, TaskPartitionState state) { + LOG.debug( + String.format("Requesting a state transition to %s for partition %s.", state, partition)); + try { + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + PropertyKey key = keyBuilder.currentState(instance, sessionId, resource); + CurrentState currStateDelta = new CurrentState(resource); + currStateDelta.setRequestedState(partition, state.name()); + + return accessor.updateProperty(key, currStateDelta); + } catch (Exception e) { + LOG.error(String + .format("Error when requesting a state transition to %s for partition %s.", state, + partition), e); + return false; + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index ca274d0..49622f3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -55,15 +55,17 @@ import com.google.common.collect.Maps; public class TaskUtil { private static final Logger LOG = Logger.getLogger(TaskUtil.class); public static final String CONTEXT_NODE = "Context"; + /** * Parses job resource configurations in Helix into a {@link JobConfig} object. + * This method is internal API, please use TaskDriver.getJobConfig(); * * @param accessor Accessor to access Helix configs * @param jobResource The name of the job resource * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null * otherwise. */ - public static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) { + protected static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) { HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource); if (jobResourceConfig == null) { return null; @@ -83,25 +85,27 @@ public class TaskUtil { /** * Parses job resource configurations in Helix into a {@link JobConfig} object. + * This method is internal API, please use TaskDriver.getJobConfig(); * * @param manager HelixManager object used to connect to Helix. * @param jobResource The name of the job resource. * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null * otherwise. */ - public static JobConfig getJobCfg(HelixManager manager, String jobResource) { + protected static JobConfig getJobCfg(HelixManager manager, String jobResource) { return getJobCfg(manager.getHelixDataAccessor(), jobResource); } /** * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object. + * This method is internal API, please use TaskDriver.getWorkflowConfig(); * * @param accessor Accessor to access Helix configs * @param workflow The name of the workflow. * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the * workflow, null otherwise. */ - public static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) { + protected static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) { HelixProperty workflowCfg = getResourceConfig(accessor, workflow); if (workflowCfg == null) { return null; @@ -115,60 +119,32 @@ public class TaskUtil { /** * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object. + * This method is internal API, please use TaskDriver.getWorkflowConfig(); * * @param manager Helix manager object used to connect to Helix. * @param workflow The name of the workflow resource. * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the * workflow, null otherwise. */ - public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) { + protected static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) { return getWorkflowCfg(manager.getHelixDataAccessor(), workflow); } /** - * Request a state change for a specific task. - * - * @param accessor connected Helix data accessor - * @param instance the instance serving the task - * @param sessionId the current session of the instance - * @param resource the job name - * @param partition the task partition name - * @param state the requested state - * @return true if the request was persisted, false otherwise - */ - public static boolean setRequestedState(HelixDataAccessor accessor, String instance, - String sessionId, String resource, String partition, TaskPartitionState state) { - LOG.debug( - String.format("Requesting a state transition to %s for partition %s.", state, partition)); - try { - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - PropertyKey key = keyBuilder.currentState(instance, sessionId, resource); - CurrentState currStateDelta = new CurrentState(resource); - currStateDelta.setRequestedState(partition, state.name()); - - return accessor.updateProperty(key, currStateDelta); - } catch (Exception e) { - LOG.error(String - .format("Error when requesting a state transition to %s for partition %s.", state, - partition), e); - return false; - } - } - - /** * Get a Helix configuration scope at a resource (i.e. job and workflow) level * * @param clusterName the cluster containing the resource * @param resource the resource name * @return instantiated {@link HelixConfigScope} */ - public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) { + protected static HelixConfigScope getResourceConfigScope(String clusterName, String resource) { return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE) .forCluster(clusterName).forResource(resource).build(); } /** - * Get the runtime context of a single job + * Get the runtime context of a single job. + * This method is internal API, please use TaskDriver.getJobContext(); * * @param propertyStore Property store for the cluster * @param jobResource The name of the job @@ -183,31 +159,34 @@ public class TaskUtil { } /** - * Get the runtime context of a single job + * Get the runtime context of a single job. + * This method is internal API, please use TaskDriver.getJobContext(); * * @param manager a connection to Helix * @param jobResource the name of the job * @return the {@link JobContext}, or null if none is available */ - public static JobContext getJobContext(HelixManager manager, String jobResource) { + protected static JobContext getJobContext(HelixManager manager, String jobResource) { return getJobContext(manager.getHelixPropertyStore(), jobResource); } /** * Set the runtime context of a single job + * This method is internal API; * * @param manager a connection to Helix * @param jobResource the name of the job * @param ctx the up-to-date {@link JobContext} for the job */ - public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) { + protected static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) { manager.getHelixPropertyStore() .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), ctx.getRecord(), AccessOption.PERSISTENT); } /** - * Get the runtime context of a single workflow + * Get the runtime context of a single workflow. + * This method is internal API, please use TaskDriver.getWorkflowContext(); * * @param propertyStore Property store of the cluster * @param workflowResource The name of the workflow @@ -222,13 +201,14 @@ public class TaskUtil { } /** - * Get the runtime context of a single workflow + * Get the runtime context of a single workflow. + * This method is internal API, please use TaskDriver.getWorkflowContext(); * * @param manager a connection to Helix * @param workflowResource the name of the workflow * @return the {@link WorkflowContext}, or null if none is available */ - public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) { + protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) { return getWorkflowContext(manager.getHelixPropertyStore(), workflowResource); } @@ -367,126 +347,12 @@ public class TaskUtil { return null; } - /** - * Create a new workflow based on an existing one - * - * @param manager connection to Helix - * @param origWorkflowName the name of the existing workflow - * @param newWorkflowName the name of the new workflow - * @param newStartTime a provided start time that deviates from the desired start time - * @return the cloned workflow, or null if there was a problem cloning the existing one - */ - public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName, - String newWorkflowName, Date newStartTime) { - // Read all resources, including the workflow and jobs of interest - HelixDataAccessor accessor = manager.getHelixDataAccessor(); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - Map<String, HelixProperty> resourceConfigMap = - accessor.getChildValuesMap(keyBuilder.resourceConfigs()); - if (!resourceConfigMap.containsKey(origWorkflowName)) { - LOG.error("No such workflow named " + origWorkflowName); - return null; - } - if (resourceConfigMap.containsKey(newWorkflowName)) { - LOG.error("Workflow with name " + newWorkflowName + " already exists!"); - return null; - } - - // Create a new workflow with a new name - HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName); - Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields(); - JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG)); - Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren(); - Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName); - - // Set the workflow expiry - workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY))); - - // Set the schedule, if applicable - ScheduleConfig scheduleConfig; - if (newStartTime != null) { - scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime); - } else { - scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields); - } - if (scheduleConfig != null) { - workflowBuilder.setScheduleConfig(scheduleConfig); - } - - // Add each job back as long as the original exists - Set<String> namespacedJobs = jobDag.getAllNodes(); - for (String namespacedJob : namespacedJobs) { - if (resourceConfigMap.containsKey(namespacedJob)) { - // Copy over job-level and task-level configs - String job = getDenamespacedJobName(origWorkflowName, namespacedJob); - HelixProperty jobConfig = resourceConfigMap.get(namespacedJob); - Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields(); - - JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields); - - jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name - Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields(); - List<TaskConfig> taskConfigs = Lists.newLinkedList(); - for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) { - TaskConfig taskConfig = TaskConfig.from(rawTaskConfig); - taskConfigs.add(taskConfig); - } - jobCfgBuilder.addTaskConfigs(taskConfigs); - workflowBuilder.addJobConfig(job, jobCfgBuilder); - - // Add dag dependencies - Set<String> children = parentsToChildren.get(namespacedJob); - if (children != null) { - for (String namespacedChild : children) { - String child = getDenamespacedJobName(origWorkflowName, namespacedChild); - workflowBuilder.addParentChildDependency(job, child); - } - } - } - } - return workflowBuilder.build(); - } - private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) { PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.resourceConfig(resource)); } /** - * Cleans up IdealState and external view associated with a job/workflow resource. - */ - public static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) { - LOG.info("Cleaning up idealstate and externalView for job: " + resourceName); - - // Delete the ideal state itself. - PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName); - if (accessor.getProperty(isKey) != null) { - if (!accessor.removeProperty(isKey)) { - LOG.error(String.format( - "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.", - resourceName, isKey)); - } - } else { - LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName)); - } - - // Delete dead external view - // because job is already completed, there is no more current state change - // thus dead external views removal will not be triggered - PropertyKey evKey = accessor.keyBuilder().externalView(resourceName); - if (accessor.getProperty(evKey) != null) { - if (!accessor.removeProperty(evKey)) { - LOG.error(String.format( - "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.", - resourceName, evKey)); - } - } - - LOG.info(String - .format("Successfully clean up idealstate/externalView for resource %s.", resourceName)); - } - - /** * Extracts the partition id from the given partition name. * * @param pName http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index db5426c..05b6dc6 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -19,6 +19,7 @@ package org.apache.helix.task; * under the License. */ +import com.google.common.collect.Lists; import org.I0Itec.zkclient.DataUpdater; import org.apache.helix.*; import org.apache.helix.controller.stages.ClusterDataCache; @@ -264,8 +265,8 @@ public class WorkflowRebalancer extends TaskRebalancer { String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule)); LOG.debug("Ready to start workflow " + newWorkflowName); if (!newWorkflowName.equals(lastScheduled)) { - Workflow clonedWf = TaskUtil - .cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule)); + Workflow clonedWf = + cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule)); TaskDriver driver = new TaskDriver(_manager); try { // Start the cloned workflow @@ -298,6 +299,86 @@ public class WorkflowRebalancer extends TaskRebalancer { } /** + * Create a new workflow based on an existing one + * + * @param manager connection to Helix + * @param origWorkflowName the name of the existing workflow + * @param newWorkflowName the name of the new workflow + * @param newStartTime a provided start time that deviates from the desired start time + * @return the cloned workflow, or null if there was a problem cloning the existing one + */ + public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName, + String newWorkflowName, Date newStartTime) { + // Read all resources, including the workflow and jobs of interest + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + Map<String, HelixProperty> resourceConfigMap = + accessor.getChildValuesMap(keyBuilder.resourceConfigs()); + if (!resourceConfigMap.containsKey(origWorkflowName)) { + LOG.error("No such workflow named " + origWorkflowName); + return null; + } + if (resourceConfigMap.containsKey(newWorkflowName)) { + LOG.error("Workflow with name " + newWorkflowName + " already exists!"); + return null; + } + + // Create a new workflow with a new name + HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName); + Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields(); + JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG)); + Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren(); + Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName); + + // Set the workflow expiry + workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY))); + + // Set the schedule, if applicable + ScheduleConfig scheduleConfig; + if (newStartTime != null) { + scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime); + } else { + scheduleConfig = TaskUtil.parseScheduleFromConfigMap(wfSimpleFields); + } + if (scheduleConfig != null) { + workflowBuilder.setScheduleConfig(scheduleConfig); + } + + // Add each job back as long as the original exists + Set<String> namespacedJobs = jobDag.getAllNodes(); + for (String namespacedJob : namespacedJobs) { + if (resourceConfigMap.containsKey(namespacedJob)) { + // Copy over job-level and task-level configs + String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob); + HelixProperty jobConfig = resourceConfigMap.get(namespacedJob); + Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields(); + + JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields); + + jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name + Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields(); + List<TaskConfig> taskConfigs = Lists.newLinkedList(); + for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) { + TaskConfig taskConfig = TaskConfig.from(rawTaskConfig); + taskConfigs.add(taskConfig); + } + jobCfgBuilder.addTaskConfigs(taskConfigs); + workflowBuilder.addJobConfig(job, jobCfgBuilder); + + // Add dag dependencies + Set<String> children = parentsToChildren.get(namespacedJob); + if (children != null) { + for (String namespacedChild : children) { + String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild); + workflowBuilder.addParentChildDependency(job, child); + } + } + } + } + return workflowBuilder.build(); + } + + /** * Cleans up workflow configs and workflow contexts associated with this workflow, * including all job-level configs and context, plus workflow-level information. */ @@ -319,7 +400,7 @@ public class WorkflowRebalancer extends TaskRebalancer { // clean up workflow-level info if this was the last in workflow if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) { // clean up IS & EV - TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow); + cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow); // delete workflow config PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow); @@ -354,7 +435,7 @@ public class WorkflowRebalancer extends TaskRebalancer { HelixDataAccessor accessor = _manager.getHelixDataAccessor(); // Remove any idealstate and externalView. - TaskUtil.cleanupIdealStateExtView(accessor, job); + cleanupIdealStateExtView(accessor, job); // Remove DAG references in workflow PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow); http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java index 06b9751..9796497 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java @@ -32,6 +32,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.TestHelper; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskUtil; @@ -53,7 +54,7 @@ public class TaskTestUtil { * @param workflowResource Resource to poll for completeness * @throws InterruptedException */ - public static void pollForWorkflowState(HelixManager manager, String workflowResource, + public static void pollForWorkflowState(TaskDriver driver, String workflowResource, TaskState... targetStates) throws InterruptedException { // Wait for completion. long st = System.currentTimeMillis(); @@ -61,7 +62,7 @@ public class TaskTestUtil { Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates)); do { Thread.sleep(100); - ctx = TaskUtil.getWorkflowContext(manager, workflowResource); + ctx = driver.getWorkflowContext(workflowResource); } while ((ctx == null || ctx.getWorkflowState() == null || !allowedStates .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + _default_timeout); @@ -73,23 +74,23 @@ public class TaskTestUtil { /** * poll for job until it is at either state in targetStates. - * @param manager + * @param driver * @param workflowResource * @param jobName * @param targetStates * @throws InterruptedException */ - public static void pollForJobState(HelixManager manager, String workflowResource, String jobName, + public static void pollForJobState(TaskDriver driver, String workflowResource, String jobName, TaskState... targetStates) throws InterruptedException { // Get workflow config - WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource); + WorkflowConfig wfCfg = driver.getWorkflowConfig(workflowResource); Assert.assertNotNull(wfCfg); WorkflowContext ctx; if (wfCfg.isRecurring()) { // if it's recurring, need to reconstruct workflow and job name do { Thread.sleep(100); - ctx = TaskUtil.getWorkflowContext(manager, workflowResource); + ctx = driver.getWorkflowContext(workflowResource); } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null)); Assert.assertNotNull(ctx); Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow()); @@ -103,7 +104,7 @@ public class TaskTestUtil { long st = System.currentTimeMillis(); do { Thread.sleep(100); - ctx = TaskUtil.getWorkflowContext(manager, workflowResource); + ctx = driver.getWorkflowContext(workflowResource); } while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains( ctx.getJobState(jobName))) @@ -114,27 +115,27 @@ public class TaskTestUtil { "expect job states: " + allowedStates + " actual job state: " + jobState); } - public static void pollForEmptyJobState(final HelixManager manager, final String workflowName, + public static void pollForEmptyJobState(final TaskDriver driver, final String workflowName, final String jobName) throws Exception { final String namespacedJobName = String.format("%s_%s", workflowName, jobName); boolean succeed = TestHelper.verify(new TestHelper.Verifier() { @Override public boolean verify() throws Exception { - WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName); + WorkflowContext ctx = driver.getWorkflowContext(workflowName); return ctx == null || ctx.getJobState(namespacedJobName) == null; } }, _default_timeout); Assert.assertTrue(succeed); } - public static WorkflowContext pollForWorkflowContext(HelixManager manager, String workflowResource) + public static WorkflowContext pollForWorkflowContext(TaskDriver driver, String workflowResource) throws InterruptedException { // Wait for completion. long st = System.currentTimeMillis(); WorkflowContext ctx; do { - ctx = TaskUtil.getWorkflowContext(manager, workflowResource); + ctx = driver.getWorkflowContext(workflowResource); Thread.sleep(100); } while (ctx == null && System.currentTimeMillis() < st + _default_timeout); Assert.assertNotNull(ctx); @@ -143,15 +144,15 @@ public class TaskTestUtil { // 1. Different jobs in a same work flow is in RUNNING at the same time // 2. No two jobs in the same work flow is in RUNNING at the same instance - public static boolean pollForWorkflowParallelState(HelixManager manager, String workflowName) + public static boolean pollForWorkflowParallelState(TaskDriver driver, String workflowName) throws InterruptedException { - WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(manager, workflowName); + WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowName); Assert.assertNotNull(workflowConfig); WorkflowContext workflowContext = null; while (workflowContext == null) { - workflowContext = TaskUtil.getWorkflowContext(manager, workflowName); + workflowContext = driver.getWorkflowContext(workflowName); Thread.sleep(100); } @@ -162,7 +163,7 @@ public class TaskTestUtil { finished = true; int runningCount = 0; - workflowContext = TaskUtil.getWorkflowContext(manager, workflowName); + workflowContext = driver.getWorkflowContext(workflowName); for (String jobName : workflowConfig.getJobDag().getAllNodes()) { TaskState jobState = workflowContext.getJobState(jobName); if (jobState == TaskState.IN_PROGRESS) { @@ -177,9 +178,9 @@ public class TaskTestUtil { List<JobContext> jobContextList = new ArrayList<JobContext>(); for (String jobName : workflowConfig.getJobDag().getAllNodes()) { - JobContext jobContext = TaskUtil.getJobContext(manager, jobName); + JobContext jobContext = driver.getJobContext(jobName); if (jobContext != null) { - jobContextList.add(TaskUtil.getJobContext(manager, jobName)); + jobContextList.add(driver.getJobContext(jobName)); } } http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java index b23e268..f673f7b 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java @@ -175,7 +175,7 @@ public class TestDisableJobExternalView extends ZkIntegrationTestBase { // ensure all jobs are completed String namedSpaceJob3 = String.format("%s_%s", queueName, "job3"); - TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob3, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob3, TaskState.COMPLETED); Set<String> seenExternalViews = externviewChecker.getSeenExternalViews(); String namedSpaceJob1 = String.format("%s_%s", queueName, "job1"); http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index ba8367e..1c58776 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -45,7 +45,6 @@ import org.apache.helix.task.TaskResult; import org.apache.helix.task.TaskResult.Status; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskStateModelFactory; -import org.apache.helix.task.TaskUtil; import org.apache.helix.task.Workflow; import org.apache.helix.task.WorkflowContext; import org.apache.helix.tools.ClusterSetup; @@ -157,8 +156,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { _driver.start(workflowBuilder.build()); // Ensure the job completes - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS); - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED); // Ensure that each class was invoked Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); @@ -184,8 +183,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { _driver.start(workflowBuilder.build()); // Ensure the job completes - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS); - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED); // Ensure that each class was invoked Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); @@ -213,8 +212,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { _driver.start(workflowBuilder.build()); // Ensure the job completes - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS); - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED); // Ensure that each class was invoked Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); @@ -241,8 +240,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { _driver.start(workflowBuilder.build()); // Ensure the job completes - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS); - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED); // Ensure that the class was invoked Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); @@ -276,13 +275,13 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { _driver.start(workflowBuilder.build()); // Ensure the job completes - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED); // Ensure that the class was invoked Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); // Check that the workflow only started after the start time (with a 1 second buffer) - WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, jobName); + WorkflowContext workflowCtx = _driver.getWorkflowContext(jobName); long startTime = workflowCtx.getStartTime(); Assert.assertTrue((startTime + 1000) >= inFiveSeconds); } @@ -308,10 +307,10 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { _driver.start(workflowBuilder.build()); // Ensure completion - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED); // Ensure a single retry happened - JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName + "_" + jobName); + JobContext jobCtx = _driver.getJobContext(jobName + "_" + jobName); Assert.assertEquals(jobCtx.getPartitionNumAttempts(0), 2); Assert.assertTrue(jobCtx.getFinishTime() - jobCtx.getStartTime() >= delay); } http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java index d83c5eb..ae3d52d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java @@ -179,13 +179,13 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { _driver.start(queueBuild.build()); - WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName); + WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); // ensure job 1 is started before stop it String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0)); TaskTestUtil - .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS); + .pollForJobState(_driver, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS); _driver.stop(queueName); _driver.delete(queueName); @@ -208,18 +208,18 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { _driver.createQueue(queueBuilder.build()); - wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName); + wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); // ensure jobs are started and completed scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0)); TaskTestUtil - .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.COMPLETED); + .pollForJobState(_driver, scheduledQueue, namedSpaceJob1, TaskState.COMPLETED); scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, currentJobNames.get(1)); TaskTestUtil - .pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.COMPLETED); + .pollForJobState(_driver, scheduledQueue, namedSpaceJob2, TaskState.COMPLETED); } @Test @@ -249,21 +249,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { } _driver.createQueue(queueBuilder.build()); - WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName); + WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); // ensure job 1 is started before deleting it String deletedJob1 = currentJobNames.get(0); String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1); TaskTestUtil - .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS, + .pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS, TaskState.COMPLETED); // stop the queue LOG.info("Pausing job-queue: " + scheduledQueue); _driver.stop(queueName); - TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED); - TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED); + TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED); + TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED); // delete the in-progress job (job 1) and verify it being deleted _driver.deleteJob(queueName, deletedJob1); @@ -274,21 +274,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { _driver.resume(queueName); // ensure job 2 is started - TaskTestUtil.pollForJobState(_manager, scheduledQueue, + TaskTestUtil.pollForJobState(_driver, scheduledQueue, String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS, TaskState.COMPLETED); // stop the queue LOG.info("Pausing job-queue: " + queueName); _driver.stop(queueName); - TaskTestUtil.pollForJobState(_manager, scheduledQueue, + TaskTestUtil.pollForJobState(_driver, scheduledQueue, String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED); - TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED); + TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED); // Ensure job 3 is not started before deleting it String deletedJob2 = currentJobNames.get(2); String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2); - TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2); + TaskTestUtil.pollForEmptyJobState(_driver, scheduledQueue, namedSpaceDeletedJob2); // delete not-started job (job 3) and verify it being deleted _driver.deleteJob(queueName, deletedJob2); @@ -304,9 +304,9 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { long preJobFinish = 0; for (int i = 0; i < currentJobNames.size(); i++) { String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i)); - TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED); - JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName); + JobContext jobContext = _driver.getJobContext(namedSpaceJobName); long jobStart = jobContext.getStartTime(); Assert.assertTrue(jobStart >= preJobFinish); preJobFinish = jobContext.getFinishTime(); @@ -349,12 +349,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { String currentLastJob = jobNames.get(JOB_COUNTS - 2); - WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName); + WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); // ensure all jobs are finished String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob); - TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJob, TaskState.COMPLETED); // enqueue the last job LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1)); @@ -389,7 +389,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName))); Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName))); - TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName); + TaskTestUtil.pollForEmptyJobState(_driver, queueName, jobName); } } http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java index 9fd7735..101604b 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java @@ -153,7 +153,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase { List<String> currentJobNames = new ArrayList<String>(); for (int i = 0; i < num_dbs; i++) { JobConfig.Builder jobConfig = - new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i)) + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource( + _test_dbs.get(i)) .setTargetPartitionStates(Sets.newHashSet("SLAVE")); String jobName = "job" + _test_dbs.get(i); queueBuilder.enqueueJob(jobName, jobConfig); @@ -164,8 +165,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase { _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2)); String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2)); - TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED); - TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED); + TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED); } @Test @@ -190,7 +191,7 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase { _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(0)); String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0)); - TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED); - TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED); + TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED); } } http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java index 2d11f85..3a5b179 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java @@ -168,7 +168,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { .setExpiry(expiry).build(); _driver.start(flow); - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS); // Running workflow should have config and context viewable through accessor HelixDataAccessor accessor = _manager.getHelixDataAccessor(); @@ -182,7 +182,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null); // Wait for job to finish and expire - TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED); Thread.sleep(expiry); TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), flow.getName()); Thread.sleep(expiry); @@ -212,10 +212,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { _driver.start(flow); // Wait for job completion - TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED); // Ensure all partitions are completed individually - JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource)); + JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource)); for (int i = 0; i < NUM_PARTITIONS; i++) { Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED); Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1); @@ -239,13 +239,13 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { _driver.start(flow); // wait for job completeness/timeout - TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED); // see if resulting context completed successfully for our partition set String namespacedName = TaskUtil.getNamespacedJobName(jobResource); - JobContext ctx = TaskUtil.getJobContext(_manager, namespacedName); - WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, jobResource); + JobContext ctx = _driver.getJobContext(namespacedName); + WorkflowContext workflowContext = _driver.getWorkflowContext(jobResource); Assert.assertNotNull(ctx); Assert.assertNotNull(workflowContext); Assert.assertEquals(workflowContext.getJobState(namespacedName), TaskState.COMPLETED); @@ -264,11 +264,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { new TaskDriver(_manager).start(flow); // Wait until the workflow completes - TaskTestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, workflowName, TaskState.COMPLETED); // Assert completion for all tasks within two minutes for (String task : flow.getJobConfigs().keySet()) { - TaskTestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, workflowName, task, TaskState.COMPLETED); } } @@ -284,10 +284,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { _driver.start(flow); // Wait until the job reports failure. - TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED); + TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.FAILED); // Check that all partitions timed out up to maxAttempts - JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource)); + JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource)); int maxAttempts = 0; for (int i = 0; i < NUM_PARTITIONS; i++) { TaskPartitionState state = ctx.getPartitionState(i); @@ -322,10 +322,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { // Ensure successful completion String namespacedJob1 = queueName + "_masterJob"; String namespacedJob2 = queueName + "_slaveJob"; - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED); - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED); - JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1); - JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED); + JobContext masterJobContext = _driver.getJobContext(namespacedJob1); + JobContext slaveJobContext = _driver.getJobContext(namespacedJob2); // Ensure correct ordering long job1Finish = masterJobContext.getFinishTime(); @@ -340,7 +340,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1))); Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2))); Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2))); - WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName); + WorkflowConfig workflowCfg = _driver.getWorkflowConfig(queueName); JobDag dag = workflowCfg.getJobDag(); Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1)); Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2)); http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java index a778dcd..8051b2f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java @@ -151,13 +151,13 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase { // check all tasks completed on MASTER String namespacedJob1 = String.format("%s_%s", queueName, job1Name); - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED); HelixDataAccessor accessor = _manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); ExternalView ev = accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB)); - JobContext ctx = TaskUtil.getJobContext(_manager, namespacedJob1); + JobContext ctx = _driver.getJobContext(namespacedJob1); Set<String> failOverPartitions = Sets.newHashSet(); for (int p = 0; p < _p; p++) { String instanceName = ctx.getAssignedParticipant(p); @@ -178,12 +178,12 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase { LOG.info("Enqueuing job: " + job2Name); _driver.enqueueJob(queueName, job2Name, job); - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.IN_PROGRESS); _participants[0].syncStop(); - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED); // tasks previously assigned to localhost_12918 should be re-scheduled on new master - ctx = TaskUtil.getJobContext(_manager, namespacedJob2); + ctx = _driver.getJobContext(namespacedJob2); ev = accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB)); for (int p = 0; p < _p; p++) { String partitionName = ctx.getTargetForPartition(p); @@ -204,7 +204,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase { Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1))); Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2))); Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2))); - WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName); + WorkflowConfig workflowCfg = _driver.getWorkflowConfig(queueName); JobDag dag = workflowCfg.getJobDag(); Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1)); Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2)); http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java index c9a0445..580f5ac 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java @@ -158,6 +158,6 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase { _driver.enqueueJob(queueName, "job_" + (i + 1), jobConfigBuilders.get(i)); } - Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_manager, queueName)); + Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_driver, queueName)); } } http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java index 8fec899..e576304 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java @@ -138,9 +138,9 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase { _driver.start(flow); // Wait until the job completes. - TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED); - JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource)); + JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource)); for (int i = 0; i < _p; i++) { TaskPartitionState state = ctx.getPartitionState(i); if (state != null) { http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java index 7a8d305..30cb460 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java @@ -168,15 +168,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { LOG.info("Starting flow " + flow.getName()); _driver.start(flow); - TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS); + TaskTestUtil.pollForWorkflowState(_driver, JOB_RESOURCE, TaskState.IN_PROGRESS); LOG.info("Pausing job"); _driver.stop(JOB_RESOURCE); - TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED); + TaskTestUtil.pollForWorkflowState(_driver, JOB_RESOURCE, TaskState.STOPPED); LOG.info("Resuming job"); _driver.resume(JOB_RESOURCE); - TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, JOB_RESOURCE, TaskState.COMPLETED); } @Test @@ -186,15 +186,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { LOG.info("Starting flow " + workflow); _driver.start(flow); - TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS); + TaskTestUtil.pollForWorkflowState(_driver, workflow, TaskState.IN_PROGRESS); LOG.info("Pausing workflow"); _driver.stop(workflow); - TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED); + TaskTestUtil.pollForWorkflowState(_driver, workflow, TaskState.STOPPED); LOG.info("Resuming workflow"); _driver.resume(workflow); - TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, workflow, TaskState.COMPLETED); } @Test @@ -224,27 +224,27 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { _driver.enqueueJob(queueName, job2Name, job2); String namespacedJob1 = String.format("%s_%s", queueName, job1Name); - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.IN_PROGRESS); // stop job1 LOG.info("Pausing job-queue: " + queueName); _driver.stop(queueName); - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED); - TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.STOPPED); + TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.STOPPED); // Ensure job2 is not started TimeUnit.MILLISECONDS.sleep(200); String namespacedJob2 = String.format("%s_%s", queueName, job2Name); - TaskTestUtil.pollForEmptyJobState(_manager, queueName, job2Name); + TaskTestUtil.pollForEmptyJobState(_driver, queueName, job2Name); LOG.info("Resuming job-queue: " + queueName); _driver.resume(queueName); // Ensure successful completion - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED); - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED); - JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1); - JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED); + JobContext masterJobContext = _driver.getJobContext(namespacedJob1); + JobContext slaveJobContext = _driver.getJobContext(namespacedJob2); // Ensure correct ordering long job1Finish = masterJobContext.getFinishTime(); @@ -290,13 +290,13 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { // ensure job 1 is started before deleting it String deletedJob1 = currentJobNames.get(0); String namedSpaceDeletedJob1 = String.format("%s_%s", queueName, deletedJob1); - TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS); // stop the queue LOG.info("Pausing job-queue: " + queueName); _driver.stop(queueName); - TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED); - TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceDeletedJob1, TaskState.STOPPED); + TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.STOPPED); // delete the in-progress job (job 1) and verify it being deleted _driver.deleteJob(queueName, deletedJob1); @@ -307,19 +307,19 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { // ensure job 2 is started TaskTestUtil - .pollForJobState(_manager, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS); + .pollForJobState(_driver, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS); // stop the queue LOG.info("Pausing job-queue: " + queueName); _driver.stop(queueName); TaskTestUtil - .pollForJobState(_manager, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED); - TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED); + .pollForJobState(_driver, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED); + TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.STOPPED); // Ensure job 3 is not started before deleting it String deletedJob2 = currentJobNames.get(2); String namedSpaceDeletedJob2 = String.format("%s_%s", queueName, deletedJob2); - TaskTestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2); + TaskTestUtil.pollForEmptyJobState(_driver, queueName, namedSpaceDeletedJob2); // delete not-started job (job 3) and verify it being deleted _driver.deleteJob(queueName, deletedJob2); @@ -343,9 +343,9 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { long preJobFinish = 0; for (int i = 0; i < currentJobNames.size(); i++) { String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i)); - TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJobName, TaskState.COMPLETED); - JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName); + JobContext jobContext = _driver.getJobContext(namedSpaceJobName); long jobStart = jobContext.getStartTime(); Assert.assertTrue(jobStart >= preJobFinish); preJobFinish = jobContext.getFinishTime(); @@ -391,20 +391,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { _driver.createQueue(queueBuilder.build()); - WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName); + WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); // ensure job 1 is started before deleting it String deletedJob1 = currentJobNames.get(0); String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1); TaskTestUtil - .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS); + .pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS); // stop the queue LOG.info("Pausing job-queue: " + scheduledQueue); _driver.stop(queueName); - TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED); - TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED); + TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED); + TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED); // delete the in-progress job (job 1) and verify it being deleted _driver.deleteJob(queueName, deletedJob1); @@ -415,20 +415,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { _driver.resume(queueName); // ensure job 2 is started - TaskTestUtil.pollForJobState(_manager, scheduledQueue, + TaskTestUtil.pollForJobState(_driver, scheduledQueue, String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS); // stop the queue LOG.info("Pausing job-queue: " + queueName); _driver.stop(queueName); - TaskTestUtil.pollForJobState(_manager, scheduledQueue, + TaskTestUtil.pollForJobState(_driver, scheduledQueue, String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED); - TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED); + TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED); // Ensure job 3 is not started before deleting it String deletedJob2 = currentJobNames.get(2); String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2); - TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2); + TaskTestUtil.pollForEmptyJobState(_driver, scheduledQueue, namedSpaceDeletedJob2); // delete not-started job (job 3) and verify it being deleted _driver.deleteJob(queueName, deletedJob2); @@ -444,9 +444,9 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { long preJobFinish = 0; for (int i = 0; i < currentJobNames.size(); i++) { String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i)); - TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED); - JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName); + JobContext jobContext = _driver.getJobContext(namedSpaceJobName); long jobStart = jobContext.getStartTime(); Assert.assertTrue(jobStart >= preJobFinish); preJobFinish = jobContext.getFinishTime(); @@ -487,12 +487,12 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { String currentLastJob = jobNames.get(JOB_COUNTS - 2); - WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName); + WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); // ensure all jobs are finished String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob); - TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJob, TaskState.COMPLETED); // enqueue the last job LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1)); @@ -535,10 +535,10 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { _driver.enqueueJob(queueName, job2Name, job2); String namespacedJob1 = String.format("%s_%s", queueName, job1Name); - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED); String namespacedJob2 = String.format("%s_%s", queueName, job2Name); - TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED); + TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED); // Stop queue _driver.stop(queueName); @@ -588,11 +588,11 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName))); Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName))); - TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName); + TaskTestUtil.pollForEmptyJobState(_driver, queueName, jobName); } private void verifyJobNotInQueue(String queueName, String namedSpacedJobName) { - WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName); + WorkflowConfig workflowCfg = _driver.getWorkflowConfig(queueName); JobDag dag = workflowCfg.getJobDag(); Assert.assertFalse(dag.getAllNodes().contains(namedSpacedJobName)); Assert.assertFalse(dag.getChildrenToParents().containsKey(namedSpacedJobName)); http://git-wip-us.apache.org/repos/asf/helix/blob/579d82fd/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java index fc93392..964f9e1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java @@ -19,13 +19,10 @@ package org.apache.helix.integration.task; * under the License. */ -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; -import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; -import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.integration.ZkIntegrationTestBase; @@ -35,7 +32,6 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.task.JobConfig; -import org.apache.helix.task.JobContext; import org.apache.helix.task.JobQueue; import org.apache.helix.task.ScheduleConfig; import org.apache.helix.task.Task; @@ -44,8 +40,6 @@ import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskFactory; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskStateModelFactory; -import org.apache.helix.task.TaskUtil; -import org.apache.helix.task.Workflow; import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; import org.apache.helix.tools.ClusterSetup; @@ -58,7 +52,6 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.Calendar; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -180,9 +173,9 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase { _driver.start(queueBuild.build()); - WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName); + WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); - WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_manager, queueName); + WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName); WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowConfig); Calendar startTime = Calendar.getInstance(); @@ -195,18 +188,18 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase { // ensure current schedule is started String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); - TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.IN_PROGRESS); + TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.IN_PROGRESS); _driver.updateWorkflow(queueName, configBuilder.build()); // ensure current schedule is completed - TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.COMPLETED); + TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.COMPLETED); Thread.sleep(1000); - wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName); + wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); - WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, scheduledQueue); + WorkflowConfig wCfg = _driver.getWorkflowConfig(scheduledQueue); Calendar configStartTime = Calendar.getInstance(); configStartTime.setTime(wCfg.getStartTime());