[HELIX-623] Do not expose internal configuration field name. Client should use JobConfig.Builder to create jobConfig.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/79c490fa Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/79c490fa Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/79c490fa Branch: refs/heads/helix-0.6.x Commit: 79c490fab080494b68a1f52845c1e708b8881439 Parents: 2409601 Author: Lei Xia <l...@linkedin.com> Authored: Wed Feb 10 15:59:37 2016 -0800 Committer: Lei Xia <l...@linkedin.com> Committed: Wed Apr 13 10:43:23 2016 -0700 ---------------------------------------------------------------------- .../org/apache/helix/model/ResourceConfig.java | 61 +++++ .../java/org/apache/helix/task/JobConfig.java | 258 +++++++++++-------- .../java/org/apache/helix/task/TaskDriver.java | 1 + .../java/org/apache/helix/task/TaskUtil.java | 21 +- .../java/org/apache/helix/task/Workflow.java | 37 +-- .../task/TestIndependentTaskRebalancer.java | 93 +++---- .../integration/task/TestRecurringJobQueue.java | 4 +- .../integration/task/TestTaskRebalancer.java | 39 +-- .../task/TestTaskRebalancerRetryLimit.java | 18 +- .../task/TestTaskRebalancerStopResume.java | 10 +- .../integration/task/WorkflowGenerator.java | 56 ++-- 11 files changed, 354 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java index 98433f5..d58126d 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java @@ -23,6 +23,9 @@ import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.log4j.Logger; +import java.util.Collections; +import java.util.Map; + /** * Resource configurations */ @@ -73,6 +76,64 @@ public class ResourceConfig extends HelixProperty { .setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), monitoringDisabled); } + /** + * Put a set of simple configs. + * + * @param configsMap + */ + public void putSimpleConfigs(Map<String, String> configsMap) { + getRecord().getSimpleFields().putAll(configsMap); + } + + /** + * Get all simple configurations. + * + * @return all simple configurations. + */ + public Map<String, String> getSimpleConfigs() { + return Collections.unmodifiableMap(getRecord().getSimpleFields()); + } + + /** + * Put a single simple config value. + * + * @param configKey + * @param configVal + */ + public void putSimpleConfig(String configKey, String configVal) { + getRecord().getSimpleFields().put(configKey, configVal); + } + + /** + * Get a single simple config value. + * + * @param configKey + * @return configuration value, or NULL if not exist. + */ + public String getSimpleConfig(String configKey) { + return getRecord().getSimpleFields().get(configKey); + } + + /** + * Put a single map config. + * + * @param configKey + * @param configValMap + */ + public void putMapConfig(String configKey, Map<String, String> configValMap) { + getRecord().setMapField(configKey, configValMap); + } + + /** + * Get a single map config. + * + * @param configKey + * @return configuration value map, or NULL if not exist. + */ + public Map<String, String> getMapConfig(String configKey) { + return getRecord().getMapField(configKey); + } + @Override public boolean equals(Object obj) { if (obj instanceof ResourceConfig) { http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index c7c2f38..37a2f35 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -36,49 +36,87 @@ import com.google.common.collect.Maps; * Provides a typed interface to job configurations. */ public class JobConfig { - // // Property names //// - - /** The name of the workflow to which the job belongs. */ - public static final String WORKFLOW_ID = "WorkflowID"; - /** The assignment strategy of this job */ - public static final String ASSIGNMENT_STRATEGY = "AssignmentStrategy"; - /** The name of the target resource. */ - public static final String TARGET_RESOURCE = "TargetResource"; - /** - * The set of the target partition states. The value must be a comma-separated list of partition - * states. - */ - public static final String TARGET_PARTITION_STATES = "TargetPartitionStates"; + /** - * The set of the target partition ids. The value must be a comma-separated list of partition ids. + * Do not use this value directly, always use the get/set methods in JobConfig and JobConfig.Builder. */ - public static final String TARGET_PARTITIONS = "TargetPartitions"; - /** The command that is to be run by participants in the case of identical tasks. */ - public static final String COMMAND = "Command"; - /** The command configuration to be used by the tasks. */ - public static final String JOB_COMMAND_CONFIG_MAP = "JobCommandConfig"; - /** The timeout for a task. */ - public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition"; - /** The maximum number of times the task rebalancer may attempt to execute a task. */ - public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask"; - /** The maximum number of times Helix will intentionally move a failing task */ - public static final String MAX_FORCED_REASSIGNMENTS_PER_TASK = "MaxForcedReassignmentsPerTask"; - /** The number of concurrent tasks that are allowed to run on an instance. */ - public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance"; - /** The number of tasks within the job that are allowed to fail. */ - public static final String FAILURE_THRESHOLD = "FailureThreshold"; - /** The amount of time in ms to wait before retrying a task */ - public static final String TASK_RETRY_DELAY = "TaskRetryDelay"; - - /** The individual task configurations, if any **/ - public static final String TASK_CONFIGS = "TaskConfigs"; - - /** Disable external view (not showing) for this job resource */ - public static final String DISABLE_EXTERNALVIEW = "DisableExternalView"; - - - // // Default property values //// + protected enum JobConfigProperty { + /** + * The name of the workflow to which the job belongs. + */ + WORKFLOW_ID("WorkflowID"), + /** + * The assignment strategy of this job + */ + ASSIGNMENT_STRATEGY("AssignmentStrategy"), + /** + * The name of the target resource. + */ + TARGET_RESOURCE("TargetResource"), + /** + * The set of the target partition states. The value must be a comma-separated list of partition + * states. + */ + TARGET_PARTITION_STATES("TargetPartitionStates"), + /** + * The set of the target partition ids. The value must be a comma-separated list of partition ids. + */ + TARGET_PARTITIONS("TargetPartitions"), + /** + * The command that is to be run by participants in the case of identical tasks. + */ + COMMAND("Command"), + /** + * The command configuration to be used by the tasks. + */ + JOB_COMMAND_CONFIG_MAP("JobCommandConfig"), + /** + * The timeout for a task. + */ + TIMEOUT_PER_TASK("TimeoutPerPartition"), + /** + * The maximum number of times the task rebalancer may attempt to execute a task. + */ + MAX_ATTEMPTS_PER_TASK("MaxAttemptsPerTask"), + /** + * The maximum number of times Helix will intentionally move a failing task + */ + MAX_FORCED_REASSIGNMENTS_PER_TASK("MaxForcedReassignmentsPerTask"), + /** + * The number of concurrent tasks that are allowed to run on an instance. + */ + NUM_CONCURRENT_TASKS_PER_INSTANCE("ConcurrentTasksPerInstance"), + /** + * The number of tasks within the job that are allowed to fail. + */ + FAILURE_THRESHOLD("FailureThreshold"), + /** + * The amount of time in ms to wait before retrying a task + */ + TASK_RETRY_DELAY("TaskRetryDelay"), + + /** + * The individual task configurations, if any * + */ + TASK_CONFIGS("TaskConfigs"), + + /** + * Disable external view (not showing) for this job resource + */ + DISABLE_EXTERNALVIEW("DisableExternalView"); + + private final String _value; + + private JobConfigProperty(String val) { + _value = val; + } + + public String value() { + return _value; + } + } + //Default property values public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr. public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10; @@ -106,8 +144,7 @@ public class JobConfig { Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask, int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay, - boolean disableExternalView, - Map<String, TaskConfig> taskConfigMap) { + boolean disableExternalView, Map<String, TaskConfig> taskConfigMap) { _workflow = workflow; _targetResource = targetResource; _targetPartitions = targetPartitions; @@ -190,34 +227,39 @@ public class JobConfig { public Map<String, String> getResourceConfigMap() { Map<String, String> cfgMap = new HashMap<String, String>(); - cfgMap.put(JobConfig.WORKFLOW_ID, _workflow); + cfgMap.put(JobConfigProperty.WORKFLOW_ID.value(), _workflow); if (_command != null) { - cfgMap.put(JobConfig.COMMAND, _command); + cfgMap.put(JobConfigProperty.COMMAND.value(), _command); } if (_jobCommandConfigMap != null) { String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap); if (serializedConfig != null) { - cfgMap.put(JobConfig.JOB_COMMAND_CONFIG_MAP, serializedConfig); + cfgMap.put(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(), serializedConfig); } } if (_targetResource != null) { - cfgMap.put(JobConfig.TARGET_RESOURCE, _targetResource); + cfgMap.put(JobConfigProperty.TARGET_RESOURCE.value(), _targetResource); } if (_targetPartitionStates != null) { - cfgMap.put(JobConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates)); + cfgMap.put(JobConfigProperty.TARGET_PARTITION_STATES.value(), + Joiner.on(",").join(_targetPartitionStates)); } if (_targetPartitions != null) { - cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions)); + cfgMap + .put(JobConfigProperty.TARGET_PARTITIONS.value(), Joiner.on(",").join(_targetPartitions)); } if (_retryDelay > 0) { - cfgMap.put(JobConfig.TASK_RETRY_DELAY, "" + _retryDelay); + cfgMap.put(JobConfigProperty.TASK_RETRY_DELAY.value(), "" + _retryDelay); } - cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask); - cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask); - cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask); - cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold); - cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView)); - cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance); + cfgMap.put(JobConfigProperty.TIMEOUT_PER_TASK.value(), "" + _timeoutPerTask); + cfgMap.put(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(), "" + _maxAttemptsPerTask); + cfgMap.put(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(), + "" + _maxForcedReassignmentsPerTask); + cfgMap.put(JobConfigProperty.FAILURE_THRESHOLD.value(), "" + _failureThreshold); + cfgMap.put(JobConfigProperty.DISABLE_EXTERNALVIEW.value(), + Boolean.toString(_disableExternalView)); + cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(), + "" + _numConcurrentTasksPerInstance); return cfgMap; } @@ -251,54 +293,58 @@ public class JobConfig { /** * Convenience method to build a {@link JobConfig} from a {@code Map<String, String>}. + * * @param cfg A map of property names to their string representations. * @return A {@link Builder}. */ public static Builder fromMap(Map<String, String> cfg) { Builder b = new Builder(); - if (cfg.containsKey(WORKFLOW_ID)) { - b.setWorkflow(cfg.get(WORKFLOW_ID)); + if (cfg.containsKey(JobConfigProperty.WORKFLOW_ID.value())) { + b.setWorkflow(cfg.get(JobConfigProperty.WORKFLOW_ID.value())); } - if (cfg.containsKey(TARGET_RESOURCE)) { - b.setTargetResource(cfg.get(TARGET_RESOURCE)); + if (cfg.containsKey(JobConfigProperty.TARGET_RESOURCE.value())) { + b.setTargetResource(cfg.get(JobConfigProperty.TARGET_RESOURCE.value())); } - if (cfg.containsKey(TARGET_PARTITIONS)) { - b.setTargetPartitions(csvToStringList(cfg.get(TARGET_PARTITIONS))); + if (cfg.containsKey(JobConfigProperty.TARGET_PARTITIONS.value())) { + b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TARGET_PARTITIONS.value()))); } - if (cfg.containsKey(TARGET_PARTITION_STATES)) { - b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get( - TARGET_PARTITION_STATES).split(",")))); + if (cfg.containsKey(JobConfigProperty.TARGET_PARTITION_STATES.value())) { + b.setTargetPartitionStates(new HashSet<String>( + Arrays.asList(cfg.get(JobConfigProperty.TARGET_PARTITION_STATES.value()).split(",")))); } - if (cfg.containsKey(COMMAND)) { - b.setCommand(cfg.get(COMMAND)); + if (cfg.containsKey(JobConfigProperty.COMMAND.value())) { + b.setCommand(cfg.get(JobConfigProperty.COMMAND.value())); } - if (cfg.containsKey(JOB_COMMAND_CONFIG_MAP)) { - Map<String, String> commandConfigMap = - TaskUtil.deserializeJobCommandConfigMap(cfg.get(JOB_COMMAND_CONFIG_MAP)); + if (cfg.containsKey(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value())) { + Map<String, String> commandConfigMap = TaskUtil.deserializeJobCommandConfigMap( + cfg.get(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value())); b.setJobCommandConfigMap(commandConfigMap); } - if (cfg.containsKey(TIMEOUT_PER_TASK)) { - b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK))); + if (cfg.containsKey(JobConfigProperty.TIMEOUT_PER_TASK.value())) { + b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TIMEOUT_PER_TASK.value()))); } - if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) { - b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg - .get(NUM_CONCURRENT_TASKS_PER_INSTANCE))); + if (cfg.containsKey(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())) { + b.setNumConcurrentTasksPerInstance( + Integer.parseInt(cfg.get(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value()))); } - if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) { - b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK))); + if (cfg.containsKey(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())) { + b.setMaxAttemptsPerTask( + Integer.parseInt(cfg.get(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value()))); } - if (cfg.containsKey(MAX_FORCED_REASSIGNMENTS_PER_TASK)) { - b.setMaxForcedReassignmentsPerTask(Integer.parseInt(cfg - .get(MAX_FORCED_REASSIGNMENTS_PER_TASK))); + if (cfg.containsKey(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())) { + b.setMaxForcedReassignmentsPerTask( + Integer.parseInt(cfg.get(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value()))); } - if (cfg.containsKey(FAILURE_THRESHOLD)) { - b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD))); + if (cfg.containsKey(JobConfigProperty.FAILURE_THRESHOLD.value())) { + b.setFailureThreshold( + Integer.parseInt(cfg.get(JobConfigProperty.FAILURE_THRESHOLD.value()))); } - if (cfg.containsKey(TASK_RETRY_DELAY)) { - b.setTaskRetryDelay(Long.parseLong(cfg.get(TASK_RETRY_DELAY))); + if (cfg.containsKey(JobConfigProperty.TASK_RETRY_DELAY.value())) { + b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TASK_RETRY_DELAY.value()))); } - if (cfg.containsKey(DISABLE_EXTERNALVIEW)) { - b.setDisableExternalView(Boolean.valueOf(cfg.get(DISABLE_EXTERNALVIEW))); + if (cfg.containsKey(JobConfigProperty.DISABLE_EXTERNALVIEW.value())) { + b.setDisableExternalView( + Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value()))); } return b; } @@ -384,38 +430,46 @@ public class JobConfig { private void validate() { if (_taskConfigMap.isEmpty() && _targetResource == null) { - throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE)); + throw new IllegalArgumentException( + String.format("%s cannot be null", JobConfigProperty.TARGET_RESOURCE)); } - if (_taskConfigMap.isEmpty() && _targetPartitionStates != null - && _targetPartitionStates.isEmpty()) { - throw new IllegalArgumentException(String.format("%s cannot be an empty set", - TARGET_PARTITION_STATES)); + if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates + .isEmpty()) { + throw new IllegalArgumentException( + String.format("%s cannot be an empty set", JobConfigProperty.TARGET_PARTITION_STATES)); } if (_taskConfigMap.isEmpty() && _command == null) { - throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND)); + throw new IllegalArgumentException( + String.format("%s cannot be null", JobConfigProperty.COMMAND)); } if (_timeoutPerTask < 0) { - throw new IllegalArgumentException(String.format("%s has invalid value %s", - TIMEOUT_PER_TASK, _timeoutPerTask)); + throw new IllegalArgumentException(String + .format("%s has invalid value %s", JobConfigProperty.TIMEOUT_PER_TASK, + _timeoutPerTask)); } if (_numConcurrentTasksPerInstance < 1) { - throw new IllegalArgumentException(String.format("%s has invalid value %s", - NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance)); + throw new IllegalArgumentException(String + .format("%s has invalid value %s", JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE, + _numConcurrentTasksPerInstance)); } if (_maxAttemptsPerTask < 1) { - throw new IllegalArgumentException(String.format("%s has invalid value %s", - MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask)); + throw new IllegalArgumentException(String + .format("%s has invalid value %s", JobConfigProperty.MAX_ATTEMPTS_PER_TASK, + _maxAttemptsPerTask)); } if (_maxForcedReassignmentsPerTask < 0) { - throw new IllegalArgumentException(String.format("%s has invalid value %s", - MAX_FORCED_REASSIGNMENTS_PER_TASK, _maxForcedReassignmentsPerTask)); + throw new IllegalArgumentException(String + .format("%s has invalid value %s", JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK, + _maxForcedReassignmentsPerTask)); } if (_failureThreshold < 0) { - throw new IllegalArgumentException(String.format("%s has invalid value %s", - FAILURE_THRESHOLD, _failureThreshold)); + throw new IllegalArgumentException(String + .format("%s has invalid value %s", JobConfigProperty.FAILURE_THRESHOLD, + _failureThreshold)); } if (_workflow == null) { - throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID)); + throw new IllegalArgumentException( + String.format("%s cannot be null", JobConfigProperty.WORKFLOW_ID)); } } http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 9b64aec..c4986ee 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -55,6 +55,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.IdealState; +import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.builder.CustomModeISBuilder; import org.apache.helix.store.HelixPropertyStore; import org.apache.helix.store.zk.ZkHelixPropertyStore; http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index d804fab..524b889 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -402,10 +402,10 @@ public class TaskUtil { Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields(); JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG)); Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren(); - Workflow.Builder builder = new Workflow.Builder(newWorkflowName); + Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName); // Set the workflow expiry - builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY))); + workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY))); // Set the schedule, if applicable ScheduleConfig scheduleConfig; @@ -415,7 +415,7 @@ public class TaskUtil { scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields); } if (scheduleConfig != null) { - builder.setScheduleConfig(scheduleConfig); + workflowBuilder.setScheduleConfig(scheduleConfig); } // Add each job back as long as the original exists @@ -426,29 +426,30 @@ public class TaskUtil { String job = getDenamespacedJobName(origWorkflowName, namespacedJob); HelixProperty jobConfig = resourceConfigMap.get(namespacedJob); Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields(); - jobSimpleFields.put(JobConfig.WORKFLOW_ID, newWorkflowName); // overwrite workflow name - for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) { - builder.addConfig(job, e.getKey(), e.getValue()); - } + + JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields); + + jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields(); List<TaskConfig> taskConfigs = Lists.newLinkedList(); for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) { TaskConfig taskConfig = TaskConfig.from(rawTaskConfig); taskConfigs.add(taskConfig); } - builder.addTaskConfigs(job, taskConfigs); + jobCfgBuilder.addTaskConfigs(taskConfigs); + workflowBuilder.addJobConfig(job, jobCfgBuilder); // Add dag dependencies Set<String> children = parentsToChildren.get(namespacedJob); if (children != null) { for (String namespacedChild : children) { String child = getDenamespacedJobName(origWorkflowName, namespacedChild); - builder.addParentChildDependency(job, child); + workflowBuilder.addParentChildDependency(job, child); } } } } - return builder.build(); + return workflowBuilder.build(); } private static Map<String, String> getResourceConfigMap(ConfigAccessor cfgAccessor, http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index 8ea2691..3a050c2 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -128,7 +128,9 @@ public class Workflow { return parse(new StringReader(yaml)); } - /** Helper function to parse workflow from a generic {@link Reader} */ + /** + * Helper function to parse workflow from a generic {@link Reader} + */ private static Workflow parse(Reader reader) throws Exception { Yaml yaml = new Yaml(new Constructor(WorkflowBean.class)); WorkflowBean wf = (WorkflowBean) yaml.load(reader); @@ -146,29 +148,32 @@ public class Workflow { } } - builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name); - builder.addConfig(job.name, JobConfig.COMMAND, job.command); + builder.addConfig(job.name, JobConfig.JobConfigProperty.WORKFLOW_ID.value(), wf.name); + builder.addConfig(job.name, JobConfig.JobConfigProperty.COMMAND.value(), job.command); if (job.jobConfigMap != null) { builder.addJobCommandConfigMap(job.name, job.jobConfigMap); } - builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource); + builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_RESOURCE.value(), + job.targetResource); if (job.targetPartitionStates != null) { - builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES, + builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITION_STATES.value(), Joiner.on(",").join(job.targetPartitionStates)); } if (job.targetPartitions != null) { - builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS, + builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITIONS.value(), Joiner.on(",").join(job.targetPartitions)); } - builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK, + builder.addConfig(job.name, JobConfig.JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(), String.valueOf(job.maxAttemptsPerTask)); - builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, + builder.addConfig(job.name, + JobConfig.JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(), String.valueOf(job.maxForcedReassignmentsPerTask)); - builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, + builder.addConfig(job.name, + JobConfig.JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(), String.valueOf(job.numConcurrentTasksPerInstance)); - builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK, + builder.addConfig(job.name, JobConfig.JobConfigProperty.TIMEOUT_PER_TASK.value(), String.valueOf(job.timeoutPerPartition)); - builder.addConfig(job.name, JobConfig.FAILURE_THRESHOLD, + builder.addConfig(job.name, JobConfig.JobConfigProperty.FAILURE_THRESHOLD.value(), String.valueOf(job.failureThreshold)); if (job.tasks != null) { List<TaskConfig> taskConfigs = Lists.newArrayList(); @@ -242,7 +247,7 @@ public class Workflow { _expiry = -1; } - public Builder addConfig(String job, String key, String val) { + private Builder addConfig(String job, String key, String val) { job = namespacify(job); _dag.addNode(job); if (!_jobConfigs.containsKey(job)) { @@ -252,8 +257,8 @@ public class Workflow { return this; } - public Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) { - return addConfig(job, JobConfig.JOB_COMMAND_CONFIG_MAP, + private Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) { + return addConfig(job, JobConfig.JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(), TaskUtil.serializeJobCommandConfigMap(jobConfigMap)); } @@ -268,7 +273,7 @@ public class Workflow { return this; } - public Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) { + private Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) { job = namespacify(job); _dag.addNode(job); if (!_taskConfigs.containsKey(job)) { @@ -322,7 +327,7 @@ public class Workflow { protected WorkflowConfig.Builder buildWorkflowConfig() { for (String task : _jobConfigs.keySet()) { // addConfig(task, TaskConfig.WORKFLOW_ID, _name); - _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name); + _jobConfigs.get(task).put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), _name); } WorkflowConfig.Builder builder; http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index a00a736..40c2485 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -140,8 +140,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { _runCounts.clear(); } - @Test - public void testDifferentTasks() throws Exception { + @Test public void testDifferentTasks() throws Exception { // Create a job with two different tasks String jobName = TestHelper.getTestMethodName(); Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); @@ -150,11 +149,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true); taskConfigs.add(taskConfig1); taskConfigs.add(taskConfig2); - workflowBuilder.addTaskConfigs(jobName, taskConfigs); - workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); - Map<String, String> jobConfigMap = Maps.newHashMap(); - jobConfigMap.put("Timeout", "1000"); - workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap); + Map<String, String> jobCommandMap = Maps.newHashMap(); + jobCommandMap.put("Timeout", "1000"); + JobConfig.Builder jobBuilder = + new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(jobCommandMap); + workflowBuilder.addJobConfig(jobName, jobBuilder); _driver.start(workflowBuilder.build()); // Ensure the job completes @@ -166,8 +166,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName())); } - @Test - public void testThresholdFailure() throws Exception { + @Test public void testThresholdFailure() throws Exception { // Create a job with two different tasks String jobName = TestHelper.getTestMethodName(); Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); @@ -177,12 +176,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false); taskConfigs.add(taskConfig1); taskConfigs.add(taskConfig2); - workflowBuilder.addTaskConfigs(jobName, taskConfigs); - workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); - workflowBuilder.addConfig(jobName, JobConfig.FAILURE_THRESHOLD, "" + 1); Map<String, String> jobConfigMap = Maps.newHashMap(); jobConfigMap.put("Timeout", "1000"); - workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap); + JobConfig.Builder jobBuilder = + new JobConfig.Builder().setCommand("DummyCommand").setFailureThreshold(1) + .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobConfigMap); + workflowBuilder.addJobConfig(jobName, jobBuilder); _driver.start(workflowBuilder.build()); // Ensure the job completes @@ -194,8 +193,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName())); } - @Test - public void testOptionalTaskFailure() throws Exception { + @Test public void testOptionalTaskFailure() throws Exception { // Create a job with two different tasks String jobName = TestHelper.getTestMethodName(); Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); @@ -205,11 +203,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false); taskConfigs.add(taskConfig1); taskConfigs.add(taskConfig2); - workflowBuilder.addTaskConfigs(jobName, taskConfigs); - workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); - Map<String, String> jobConfigMap = Maps.newHashMap(); - jobConfigMap.put("Timeout", "1000"); - workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap); + Map<String, String> jobCommandMap = Maps.newHashMap(); + jobCommandMap.put("Timeout", "1000"); + + JobConfig.Builder jobBuilder = + new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(jobCommandMap); + workflowBuilder.addJobConfig(jobName, jobBuilder); + _driver.start(workflowBuilder.build()); // Ensure the job completes @@ -221,24 +222,23 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName())); } - @Test - public void testReassignment() throws Exception { + @Test public void testReassignment() throws Exception { final int NUM_INSTANCES = 2; String jobName = TestHelper.getTestMethodName(); Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2); - Map<String, String> taskConfigMap = - Maps.newHashMap(ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' - + START_PORT)); + Map<String, String> taskConfigMap = Maps.newHashMap( + ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + START_PORT)); TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false); taskConfigs.add(taskConfig1); - workflowBuilder.addTaskConfigs(jobName, taskConfigs); - workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); - workflowBuilder.addConfig(jobName, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" - + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance - Map<String, String> jobConfigMap = Maps.newHashMap(); - jobConfigMap.put("Timeout", "1000"); - workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap); + Map<String, String> jobCommandMap = Maps.newHashMap(); + jobCommandMap.put("Timeout", "1000"); + + JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand") + .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(jobCommandMap); + workflowBuilder.addJobConfig(jobName, jobBuilder); + _driver.start(workflowBuilder.build()); // Ensure the job completes @@ -251,8 +251,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { // Ensure that this was tried on two different instances, the first of which exhausted the // attempts number, and the other passes on the first try Assert.assertEquals(_runCounts.size(), NUM_INSTANCES); - Assert.assertTrue(_runCounts.values().contains( - JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES)); + Assert.assertTrue( + _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES)); Assert.assertTrue(_runCounts.values().contains(1)); } @@ -264,11 +264,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Map<String, String> taskConfigMap = Maps.newHashMap(); TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false); taskConfigs.add(taskConfig1); - workflowBuilder.addTaskConfigs(jobName, taskConfigs); - workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); - Map<String, String> jobConfigMap = Maps.newHashMap(); - jobConfigMap.put("Timeout", "1000"); - workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap); + Map<String, String> jobCommandMap = Maps.newHashMap(); + jobCommandMap.put("Timeout", "1000"); + + JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand") + .addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(jobCommandMap); + workflowBuilder.addJobConfig(jobName, jobBuilder); + long inFiveSeconds = System.currentTimeMillis() + (5 * 1000); workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds))); _driver.start(workflowBuilder.build()); @@ -295,11 +298,13 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Map<String, String> taskConfigMap = Maps.newHashMap(); TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false); taskConfigs.add(taskConfig1); - workflowBuilder.addTaskConfigs(jobName, taskConfigs); - workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); - workflowBuilder.addConfig(jobName, JobConfig.TASK_RETRY_DELAY, String.valueOf(delay)); - Map<String, String> jobConfigMap = Maps.newHashMap(); - workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap); + Map<String, String> jobCommandMap = Maps.newHashMap(); + + JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand") + .setTaskRetryDelay(delay).addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(jobCommandMap); + workflowBuilder.addJobConfig(jobName, jobBuilder); + SingleFailTask.hasFailed = false; _driver.start(workflowBuilder.build()); http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java index 79adcd5..da13ada 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java @@ -215,12 +215,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { for (int i = 0; i <= 1; i++) { String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; - JobConfig.Builder job = + JobConfig.Builder jobConfig = new JobConfig.Builder().setCommand("Reindex") .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) .setTargetPartitionStates(Sets.newHashSet(targetPartition)); String jobName = targetPartition.toLowerCase() + "Job" + i; - queueBuild.enqueueJob(jobName, job); + queueBuild.enqueueJob(jobName, jobConfig); currentJobNames.add(jobName); } http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java index f402b82..3352d1c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java @@ -163,9 +163,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { String jobName = "Expiry"; long expiry = 1000; Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100)); - Workflow flow = - WorkflowGenerator - .generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobName, commandConfig) + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG); + jobBuilder.setJobCommandConfigMap(commandConfig); + + Workflow flow = WorkflowGenerator + .generateSingleJobWorkflowBuilder(jobName, jobBuilder) .setExpiry(expiry).build(); _driver.start(flow); @@ -204,9 +206,12 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { final String jobResource = "basic" + jobCompletionTime; Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(jobCompletionTime)); + + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG); + jobBuilder.setJobCommandConfigMap(commandConfig); + Workflow flow = - WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource, - commandConfig).build(); + WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build(); _driver.start(flow); // Wait for job completion @@ -220,18 +225,20 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { } } - @Test - public void partitionSet() throws Exception { + @Test public void partitionSet() throws Exception { final String jobResource = "partitionSet"; ImmutableList<String> targetPartitions = ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13"); // construct and submit our basic workflow Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100)); + + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG); + jobBuilder.setJobCommandConfigMap(commandConfig).setMaxAttemptsPerTask(1) + .setTargetPartitions(targetPartitions); + Workflow flow = - WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource, - commandConfig, JobConfig.MAX_ATTEMPTS_PER_TASK, String.valueOf(1), - JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build(); + WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build(); _driver.start(flow); // wait for job completeness/timeout @@ -268,13 +275,15 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { } } - @Test - public void timeouts() throws Exception { + @Test public void timeouts() throws Exception { final String jobResource = "timeouts"; + + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG); + jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG) + .setMaxAttemptsPerTask(2).setTimeoutPerTask(100); + Workflow flow = - WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource, - WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK, - String.valueOf(2), JobConfig.TIMEOUT_PER_TASK, String.valueOf(100)).build(); + WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build(); _driver.start(flow); // Wait until the job reports failure. http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java index b678d7e..efe90b0 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java @@ -125,18 +125,15 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase { _manager.disconnect(); } - @Test - public void test() throws Exception { + @Test public void test() throws Exception { String jobResource = TestHelper.getTestMethodName(); + + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG); + jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG) + .setMaxAttemptsPerTask(2).setCommand("ErrorTask").setFailureThreshold(Integer.MAX_VALUE); + Workflow flow = - WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource, - WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK, - String.valueOf(2)).build(); - Map<String, Map<String, String>> jobConfigs = flow.getJobConfigs(); - for (Map<String, String> jobConfig : jobConfigs.values()) { - jobConfig.put(JobConfig.FAILURE_THRESHOLD, String.valueOf(Integer.MAX_VALUE)); - jobConfig.put(JobConfig.COMMAND, "ErrorTask"); - } + WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build(); _driver.start(flow); @@ -151,7 +148,6 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase { Assert.assertEquals(ctx.getPartitionNumAttempts(i), 2); } } - } private static class ErrorTask implements Task { http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java index 8a44672..7437b72 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java @@ -162,12 +162,14 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { _manager.disconnect(); } - @Test - public void stopAndResume() throws Exception { + @Test public void stopAndResume() throws Exception { Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100)); + + JobConfig.Builder jobBuilder = + JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG); + jobBuilder.setJobCommandConfigMap(commandConfig); Workflow flow = - WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(JOB_RESOURCE, - commandConfig).build(); + WorkflowGenerator.generateSingleJobWorkflowBuilder(JOB_RESOURCE, jobBuilder).build(); LOG.info("Starting flow " + flow.getName()); _driver.start(flow); http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java index a414f5c..23c35af 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java @@ -56,58 +56,34 @@ public class WorkflowGenerator { DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap); } - public static Workflow.Builder generateDefaultSingleJobWorkflowBuilderWithExtraConfigs( - String jobName, Map<String, String> commandConfig, String... cfgs) { - if (cfgs.length % 2 != 0) { - throw new IllegalArgumentException( - "Additional configs should have even number of keys and values"); - } - Workflow.Builder bldr = generateSingleJobWorkflowBuilder(jobName, commandConfig, DEFAULT_JOB_CONFIG); - for (int i = 0; i < cfgs.length; i += 2) { - bldr.addConfig(jobName, cfgs[i], cfgs[i + 1]); - } - - return bldr; + private static final JobConfig.Builder DEFAULT_JOB_BUILDER; + static { + JobConfig.Builder builder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG); + builder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG); + DEFAULT_JOB_BUILDER = builder; } public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) { - return generateSingleJobWorkflowBuilder(jobName, DEFAULT_COMMAND_CONFIG, DEFAULT_JOB_CONFIG); + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG); + jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG); + return generateSingleJobWorkflowBuilder(jobName, jobBuilder); } public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName, - Map<String, String> commandConfig, Map<String, String> config) { - Workflow.Builder builder = new Workflow.Builder(jobName); - for (String key : config.keySet()) { - builder.addConfig(jobName, key, config.get(key)); - } - if (commandConfig != null) { - ObjectMapper mapper = new ObjectMapper(); - try { - String serializedMap = mapper.writeValueAsString(commandConfig); - builder.addConfig(jobName, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap); - } catch (IOException e) { - LOG.error("Error serializing " + commandConfig, e); - } - } - return builder; + JobConfig.Builder jobBuilder) { + return new Workflow.Builder(jobName).addJobConfig(jobName, jobBuilder); } public static Workflow.Builder generateDefaultRepeatedJobWorkflowBuilder(String workflowName) { Workflow.Builder builder = new Workflow.Builder(workflowName); builder.addParentChildDependency(JOB_NAME_1, JOB_NAME_2); - for (String key : DEFAULT_JOB_CONFIG.keySet()) { - builder.addConfig(JOB_NAME_1, key, DEFAULT_JOB_CONFIG.get(key)); - builder.addConfig(JOB_NAME_2, key, DEFAULT_JOB_CONFIG.get(key)); - } - ObjectMapper mapper = new ObjectMapper(); - try { - String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG); - builder.addConfig(JOB_NAME_1, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap); - builder.addConfig(JOB_NAME_2, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap); - } catch (IOException e) { - LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e); - } + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG); + jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG); + + builder.addJobConfig(JOB_NAME_1, jobBuilder); + builder.addJobConfig(JOB_NAME_2, jobBuilder); + return builder; } }