Repository: helix
Updated Branches:
  refs/heads/master 94ac4253b -> 6e047915d
Support batch adding jobs to a JobQueue

To better support compliance event emission requests, we add a new API to
enqueue jobs in a required order. With this API:

1. Batch-added jobs keep the order of the input list.
2. The jobs stay in the same bucket.

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6e047915
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6e047915
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6e047915

Branch: refs/heads/master
Commit: 6e047915d7c4aefb976ca888dcba2b4b684bec90
Parents: 94ac425
Author: Junkai Xue <[email protected]>
Authored: Wed Apr 25 13:39:20 2018 -0700
Committer: Junkai Xue <[email protected]>
Committed: Tue Jun 26 17:53:18 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 119 +++++++++++++------
 .../integration/task/TestBatchAddJobs.java      | 109 +++++++++++++++++
 2 files changed, 193 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/6e047915/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index a628dd3..b04534f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -329,6 +329,20 @@ public class TaskDriver {
    */
   public void enqueueJob(final String queue, final String job, JobConfig.Builder jobBuilder) {
+    enqueueJobs(queue, Collections.singletonList(job), Collections.singletonList(jobBuilder));
+  }
+
+  /**
+   * Batch add jobs to a queue, guaranteeing that the jobs are appended in the
+   * order of the input list and land in the same bucket.
+   *
+   * @param queue the name of the queue to append to
+   * @param jobs the job names, in the desired execution order
+   * @param jobBuilders the job config builders, parallel to {@code jobs}
+   */
+  public void enqueueJobs(final String queue, final List<String> jobs,
+      final List<JobConfig.Builder> jobBuilders) {
+
     // Get the job queue config and capacity
     WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
     if (workflowConfig == null) {
@@ -357,14 +371,29 @@ public class TaskDriver {
     }
 
     validateZKNodeLimitation(1);
-
-    // Create the job to ensure that it validates
-    JobConfig jobConfig = jobBuilder.setWorkflow(queue).build();
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
-
-    // add job config first.
-    addJobConfig(namespacedJobName, jobConfig);
-    final String jobType = jobConfig.getJobType();
+    final List<JobConfig> jobConfigs = new ArrayList<>();
+    final List<String> namespacedJobNames = new ArrayList<>();
+    final List<String> jobTypeList = new ArrayList<>();
+
+    try {
+      for (int i = 0; i < jobBuilders.size(); i++) {
+        // Create the job to ensure that it validates
+        JobConfig jobConfig = jobBuilders.get(i).setWorkflow(queue).build();
+        String namespacedJobName = TaskUtil.getNamespacedJobName(queue, jobs.get(i));
+
+        // add job config first.
+        addJobConfig(namespacedJobName, jobConfig);
+        jobConfigs.add(jobConfig);
+        namespacedJobNames.add(namespacedJobName);
+        jobTypeList.add(jobConfig.getJobType());
+      }
+    } catch (HelixException e) {
+      LOG.error("Failed to add job configs {}. Remove them all!", jobs.toString());
+      for (String job : jobs) {
+        String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
+        TaskUtil.removeJobConfig(_accessor, namespacedJobName);
+      }
+      // Rethrow: otherwise the method would go on to update the DAG even
+      // though the job configs were just rolled back.
+      throw e;
+    }
 
     // update the job dag to append the job to the end of the queue.
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@@ -374,53 +403,73 @@ public class TaskDriver {
       @Override
       public ZNRecord update(ZNRecord currentData) {
         JobDag jobDag = JobDag.fromJson(
             currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
         Set<String> allNodes = jobDag.getAllNodes();
-        if (capacity > 0 && allNodes.size() >= capacity) {
-          throw new IllegalStateException(
-              "Queue " + queue + " already reaches its max capacity, failed to add " + job);
+        // Overflow only when the batch would push the queue past capacity.
+        if (capacity > 0 && allNodes.size() + jobConfigs.size() > capacity) {
+          throw new IllegalStateException(String
+              .format("Queue %s already reaches its max capacity %d, failed to add %s", queue,
+                  capacity, jobs.toString()));
         }
-        if (allNodes.contains(namespacedJobName)) {
-          throw new IllegalStateException(
-              "Could not add to queue " + queue + ", job " + job + " already exists");
-        }
-        jobDag.addNode(namespacedJobName);
-
-        // Add the node to the end of the queue
-        String candidate = null;
-        for (String node : allNodes) {
-          if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
-            candidate = node;
-            break;
+
+        String lastNodeName = null;
+        for (int i = 0; i < namespacedJobNames.size(); i++) {
+          String namespacedJobName = namespacedJobNames.get(i);
+          if (allNodes.contains(namespacedJobName)) {
+            throw new IllegalStateException(String
+                .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
           }
-        }
-        if (candidate != null) {
-          jobDag.addParentToChild(candidate, namespacedJobName);
+          jobDag.addNode(namespacedJobName);
+
+          // Add the node to the end of the queue
+          String candidate = null;
+          if (lastNodeName == null) {
+            for (String node : allNodes) {
+              if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
+                candidate = node;
+                break;
+              }
+            }
+          } else {
+            candidate = lastNodeName;
+          }
+          if (candidate != null) {
+            jobDag.addParentToChild(candidate, namespacedJobName);
+            lastNodeName = namespacedJobName;
+          }
         }
 
         // Add job type if job type is not null
-        if (jobType != null) {
-          Map<String, String> jobTypes =
-              currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
-          if (jobTypes == null) {
-            jobTypes = new HashMap<String, String>();
+        Map<String, String> jobTypes =
+            currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
+        for (String jobType : jobTypeList) {
+          if (jobType != null) {
+            if (jobTypes == null) {
+              jobTypes = new HashMap<>();
+            }
+            jobTypes.put(queue, jobType);
           }
-          jobTypes.put(queue, jobType);
-          currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
         }
+        if (jobTypes != null) {
+          currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
+        }
 
         // Save the updated DAG
         try {
           currentData
               .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
         } catch (Exception e) {
-          throw new IllegalStateException("Could not add job " + job + " to queue " + queue,
-              e);
+          throw new IllegalStateException(
+              String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
         }
         return currentData;
       }
     };
+
     String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
     boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
     if (!status) {
+      LOG.error("Failed to update WorkflowConfig, remove all jobs {}", jobs.toString());
+      for (String job : jobs) {
+        // Remove under the namespaced name the config was stored with.
+        TaskUtil.removeJobConfig(_accessor, TaskUtil.getNamespacedJobName(queue, job));
+      }
       throw new HelixException("Failed to enqueue job");
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/6e047915/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
new file mode 100644
index 0000000..d50845e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
@@ -0,0 +1,109 @@
+package org.apache.helix.integration.task;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestBatchAddJobs extends ZkIntegrationTestBase {
+  private static final String CLUSTER_NAME = CLUSTER_PREFIX + "_TestBatchAddJobs";
+  private static final String QUEUE_NAME = "TestBatchAddJobQueue";
+  private ClusterSetup _setupTool;
+  private List<SubmitJobTask> _submitJobTasks;
+
+  @BeforeClass
+  public void beforeClass() {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _submitJobTasks = new ArrayList<>();
+  }
+
+  @Test
+  public void testBatchAddJobs() throws Exception {
+    TaskDriver driver = new TaskDriver(_gZkClient, CLUSTER_NAME);
+    driver.createQueue(new JobQueue.Builder(QUEUE_NAME).build());
+    for (int i = 0; i < 10; i++) {
+      _submitJobTasks.add(new SubmitJobTask(ZK_ADDR, i));
+      _submitJobTasks.get(i).start();
+    }
+
+    // Poll until all 100 jobs (10 submitters x 10 jobs each) are in the DAG.
+    WorkflowConfig workflowConfig = driver.getWorkflowConfig(QUEUE_NAME);
+    while (workflowConfig.getJobDag().getAllNodes().size() < 100) {
+      Thread.sleep(50);
+      workflowConfig = driver.getWorkflowConfig(QUEUE_NAME);
+    }
+
+    // Walk the chain and verify each batch is contiguous: the job prefix may
+    // only change right after the last job (#9) of a batch.
+    JobDag dag = workflowConfig.getJobDag();
+    String currentJob = dag.getAllNodes().iterator().next();
+    while (dag.getDirectChildren(currentJob).size() > 0) {
+      String childJob = dag.getDirectChildren(currentJob).iterator().next();
+      if (!getPrefix(currentJob).equals(getPrefix(childJob))
+          && currentJob.charAt(currentJob.length() - 1) != '9') {
+        Assert.fail();
+      }
+      currentJob = childJob;
+    }
+  }
+
+  private String getPrefix(String job) {
+    return job.split("#")[0];
+  }
+
+  @AfterClass
+  public void afterClass() {
+    for (SubmitJobTask submitJobTask : _submitJobTasks) {
+      submitJobTask.interrupt();
+    }
+  }
+
+  static class SubmitJobTask extends Thread {
+    private TaskDriver _driver;
+    private String _jobPrefixName;
+
+    public SubmitJobTask(String zkAddress, int index) throws Exception {
+      HelixManager manager = HelixManagerFactory
+          .getZKHelixManager(CLUSTER_NAME, "Administrator", InstanceType.ADMINISTRATOR, zkAddress);
+      manager.connect();
+      _driver = new TaskDriver(manager);
+      _jobPrefixName = "JOB_" + index + "#";
+    }
+
+    // Implement run() (not start()) so each submitter enqueues its batch on
+    // its own thread; the submissions then race, which is what the test
+    // needs in order to exercise the batch-ordering guarantee.
+    @Override
+    public void run() {
+      List<String> jobNames = new ArrayList<>();
+      List<JobConfig.Builder> jobConfigBuilders = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        String jobName = _jobPrefixName + i;
+        jobNames.add(jobName);
+        jobConfigBuilders.add(new JobConfig.Builder().addTaskConfigs(Collections
+            .singletonList(new TaskConfig("CMD", null, UUID.randomUUID().toString(), "TARGET"))));
+      }
+
+      _driver.enqueueJobs(QUEUE_NAME, jobNames, jobConfigBuilders);
+    }
+  }
+}
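
Since a job queue's DAG is a simple chain (each parent has at most one direct
child), the enqueue order can be recovered by walking it, much as the test
above does. A minimal sketch, assuming JobDag also exposes getDirectParents
alongside the getAllNodes/getDirectChildren accessors used above; the class
name is illustrative:

import java.util.Set;

import org.apache.helix.task.JobDag;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;

public class QueueOrderChecker {
  // Prints the jobs of a queue in execution order by walking the DAG chain.
  public static void printQueueOrder(TaskDriver driver, String queueName) {
    WorkflowConfig config = driver.getWorkflowConfig(queueName);
    JobDag dag = config.getJobDag();

    // The head of the chain is the only node without a parent.
    String current = null;
    for (String node : dag.getAllNodes()) {
      if (dag.getDirectParents(node).isEmpty()) {
        current = node;
        break;
      }
    }

    // Follow the single-child links until the tail of the queue.
    while (current != null) {
      System.out.println(current);
      Set<String> children = dag.getDirectChildren(current);
      current = children.isEmpty() ? null : children.iterator().next();
    }
  }
}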
