More cleanup on workflow and workflowConfig builders.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1f683b86 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1f683b86 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1f683b86 Branch: refs/heads/helix-0.6.x Commit: 1f683b863df23f16bd893fc675f88ed8b7f3d3b8 Parents: b6b89de Author: Lei Xia <l...@linkedin.com> Authored: Wed Mar 30 13:59:59 2016 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Tue Jul 5 14:58:18 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobConfig.java | 40 ++++++++++++++++ .../java/org/apache/helix/task/Workflow.java | 50 ++------------------ .../org/apache/helix/task/WorkflowConfig.java | 11 +++++ .../apache/helix/task/WorkflowRebalancer.java | 2 +- .../org/apache/helix/task/beans/JobBean.java | 4 +- .../task/TestIndependentTaskRebalancer.java | 12 ++--- .../integration/task/WorkflowGenerator.java | 4 +- 7 files changed, 67 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index d423d38..4d5aa94 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -30,7 +30,10 @@ import java.util.Set; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.helix.task.beans.JobBean; +import org.apache.helix.task.beans.TaskBean; /** * Provides a typed interface to job configurations. @@ -485,6 +488,43 @@ public class JobConfig { } } + public static Builder from(JobBean jobBean) { + Builder b = new Builder(); + + b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask) + .setMaxForcedReassignmentsPerTask(jobBean.maxForcedReassignmentsPerTask) + .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance) + .setTimeoutPerTask(jobBean.timeoutPerPartition) + .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay) + .setDisableExternalView(jobBean.disableExternalView) + .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure); + + if (jobBean.jobCommandConfigMap != null) { + b.setJobCommandConfigMap(jobBean.jobCommandConfigMap); + } + if (jobBean.command != null) { + b.setCommand(jobBean.command); + } + if (jobBean.targetResource != null) { + b.setTargetResource(jobBean.targetResource); + } + if (jobBean.targetPartitionStates != null) { + b.setTargetPartitionStates(new HashSet<String>(jobBean.targetPartitionStates)); + } + if (jobBean.targetPartitions != null) { + b.setTargetPartitions(jobBean.targetPartitions); + } + if (jobBean.tasks != null) { + List<TaskConfig> taskConfigs = Lists.newArrayList(); + for (TaskBean task : jobBean.tasks) { + taskConfigs.add(TaskConfig.Builder.from(task)); + } + b.addTaskConfigs(taskConfigs); + } + + return b; + } + private static List<String> csvToStringList(String csv) { String[] vals = csv.split(","); return Arrays.asList(vals); http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index e077f47..a7060c3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -34,14 +34,10 @@ import java.util.TreeMap; import org.apache.helix.HelixException; import org.apache.helix.task.beans.JobBean; -import org.apache.helix.task.beans.TaskBean; import org.apache.helix.task.beans.WorkflowBean; import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.Constructor; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; - /** * Houses a job dag and config set to fully describe a job workflow */ @@ -150,56 +146,18 @@ public class Workflow { if (job.name == null) { throw new IllegalArgumentException("A job must have a name."); } - + JobConfig.Builder jobConfigBuilder = JobConfig.Builder.from(job); + jobConfigBuilder.setWorkflow(wf.name); + workflowBuilder.addJob(job.name, jobConfigBuilder); if (job.parents != null) { for (String parent : job.parents) { workflowBuilder.addParentChildDependency(parent, job.name); } } - - workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.WorkflowID.name(), wf.name); - workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.Command.name(), job.command); - if (job.jobConfigMap != null) { - workflowBuilder.addJobCommandConfigMap(job.name, job.jobConfigMap); - } - workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetResource.name(), - job.targetResource); - if (job.targetPartitionStates != null) { - workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitionStates.name(), - Joiner.on(",").join(job.targetPartitionStates)); - } - if (job.targetPartitions != null) { - workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitions.name(), - Joiner.on(",").join(job.targetPartitions)); - } - workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.MaxAttemptsPerTask.name(), - String.valueOf(job.maxAttemptsPerTask)); - workflowBuilder.addConfig(job.name, - JobConfig.JobConfigProperty.MaxForcedReassignmentsPerTask.name(), - String.valueOf(job.maxForcedReassignmentsPerTask)); - workflowBuilder.addConfig(job.name, - JobConfig.JobConfigProperty.ConcurrentTasksPerInstance.name(), - String.valueOf(job.numConcurrentTasksPerInstance)); - workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TimeoutPerPartition.name(), - String.valueOf(job.timeoutPerPartition)); - workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.FailureThreshold.name(), - String.valueOf(job.failureThreshold)); - if (job.tasks != null) { - List<TaskConfig> taskConfigs = Lists.newArrayList(); - for (TaskBean task : job.tasks) { - taskConfigs.add(TaskConfig.Builder.from(task)); - } - workflowBuilder.addTaskConfigs(job.name, taskConfigs); - } } } - WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder(); - if (wf.schedule != null) { - workflowCfgBuilder.setScheduleConfig(ScheduleConfig.from(wf.schedule)); - } - workflowCfgBuilder.setExpiry(wf.expiry); - workflowBuilder.setWorkflowConfig(workflowCfgBuilder.build()); + workflowBuilder.setWorkflowConfig(WorkflowConfig.Builder.from(wf).build()); return workflowBuilder.build(); } http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java index db9fdba..844bdf0 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.TimeZone; import java.util.concurrent.TimeUnit; import org.apache.helix.HelixException; +import org.apache.helix.task.beans.WorkflowBean; import org.apache.log4j.Logger; /** @@ -377,6 +378,16 @@ public class WorkflowConfig { return _taskDag; } + public static Builder from(WorkflowBean workflowBean) { + WorkflowConfig.Builder b = new WorkflowConfig.Builder(); + if (workflowBean.schedule != null) { + b.setScheduleConfig(ScheduleConfig.from(workflowBean.schedule)); + } + b.setExpiry(workflowBean.expiry); + + return b; + } + private void validate() { if (_expiry < 0) { throw new IllegalArgumentException(String http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 8f97cce..2d4ca75 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -372,7 +372,7 @@ public class WorkflowRebalancer extends TaskRebalancer { taskConfigs.add(taskConfig); } jobCfgBuilder.addTaskConfigs(taskConfigs); - workflowBuilder.addJobConfig(job, jobCfgBuilder); + workflowBuilder.addJob(job, jobCfgBuilder); // Add dag dependencies Set<String> children = parentsToChildren.get(namespacedJob); http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index 32fd5ac..a570026 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -34,7 +34,7 @@ public class JobBean { public List<String> targetPartitionStates; public List<String> targetPartitions; public String command; - public Map<String, String> jobConfigMap; + public Map<String, String> jobCommandConfigMap; public List<TaskBean> tasks; public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK; public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; @@ -42,4 +42,6 @@ public class JobBean { public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK; public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD; public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY; + public boolean disableExternalView = JobConfig.DEFAULT_DISABLE_EXTERNALVIEW; + public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE; } http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index 1c58776..046281e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -152,7 +152,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs) .setJobCommandConfigMap(jobCommandMap); - workflowBuilder.addJobConfig(jobName, jobBuilder); + workflowBuilder.addJob(jobName, jobBuilder); _driver.start(workflowBuilder.build()); // Ensure the job completes @@ -179,7 +179,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand").setFailureThreshold(1) .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobConfigMap); - workflowBuilder.addJobConfig(jobName, jobBuilder); + workflowBuilder.addJob(jobName, jobBuilder); _driver.start(workflowBuilder.build()); // Ensure the job completes @@ -207,7 +207,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs) .setJobCommandConfigMap(jobCommandMap); - workflowBuilder.addJobConfig(jobName, jobBuilder); + workflowBuilder.addJob(jobName, jobBuilder); _driver.start(workflowBuilder.build()); @@ -235,7 +235,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand") .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs) .setJobCommandConfigMap(jobCommandMap); - workflowBuilder.addJobConfig(jobName, jobBuilder); + workflowBuilder.addJob(jobName, jobBuilder); _driver.start(workflowBuilder.build()); @@ -268,7 +268,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand") .addTaskConfigs(taskConfigs) .setJobCommandConfigMap(jobCommandMap); - workflowBuilder.addJobConfig(jobName, jobBuilder); + workflowBuilder.addJob(jobName, jobBuilder); long inFiveSeconds = System.currentTimeMillis() + (5 * 1000); workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds))); @@ -301,7 +301,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand") .setTaskRetryDelay(delay).addTaskConfigs(taskConfigs) .setJobCommandConfigMap(jobCommandMap); - workflowBuilder.addJobConfig(jobName, jobBuilder); + workflowBuilder.addJob(jobName, jobBuilder); SingleFailTask.hasFailed = false; _driver.start(workflowBuilder.build()); http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java index 639cdff..d428460 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java @@ -70,8 +70,8 @@ public class WorkflowGenerator { JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG); jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG); - builder.addJobConfig(JOB_NAME_1, jobBuilder); - builder.addJobConfig(JOB_NAME_2, jobBuilder); + builder.addJob(JOB_NAME_1, jobBuilder); + builder.addJob(JOB_NAME_2, jobBuilder); return builder; }