This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit a8e2cf7b76a802a5b005d32082699d4c9e39875a Author: Hunter Lee <[email protected]> AuthorDate: Fri Mar 29 12:08:07 2019 -0700 Task Framework code style change This diff includes style changes using Java 8 features. RB=1613441 BUG=HELIX-1742 G=helix-reviewers A=jxue Signed-off-by: Hunter Lee <[email protected]> --- .../apache/helix/task/AbstractTaskDispatcher.java | 110 ++++----- .../java/org/apache/helix/task/JobDispatcher.java | 21 +- .../java/org/apache/helix/task/TaskDriver.java | 263 ++++++++++----------- .../org/apache/helix/task/WorkflowDispatcher.java | 25 +- .../integration/task/TestQuotaBasedScheduling.java | 45 ++-- .../helix/integration/task/TestTaskRebalancer.java | 27 +-- 6 files changed, 224 insertions(+), 267 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index 78a7419..698730e 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -79,7 +79,7 @@ public abstract class AbstractTaskDispatcher { // Iterate through all instances for (String instance : prevInstanceToTaskAssignments.keySet()) { - assignedPartitions.put(instance, new HashSet<Integer>()); + assignedPartitions.put(instance, new HashSet<>()); // Set all dropping transitions first. These are tasks coming from Participant disconnects // that have some active current state (INIT or RUNNING) and the requestedState of DROPPED. @@ -709,7 +709,7 @@ public abstract class AbstractTaskDispatcher { private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions, Set<Integer> excluded, Set<Integer> throttled, int n) { - List<Integer> result = new ArrayList<Integer>(); + List<Integer> result = new ArrayList<>(); for (Integer pId : candidatePartitions) { if (!excluded.contains(pId)) { if (result.size() < n) { @@ -932,8 +932,8 @@ public abstract class AbstractTaskDispatcher { TaskConfig taskConfig = taskEntry.getValue(); for (String assignableInstanceName : assignableInstanceManager .getAssignableInstanceNames()) { - assignableInstanceManager - .release(assignableInstanceName, taskConfig, quotaType); + assignableInstanceManager.release(assignableInstanceName, taskConfig, + quotaType); } } } @@ -1120,9 +1120,9 @@ public abstract class AbstractTaskDispatcher { } if (!assignableInstanceManager.hasGlobalCapacity(quotaType)) { - LOG.info(String - .format("Job %s not ready to schedule due to not having enough quota for quota type %s", - job, quotaType)); + LOG.info(String.format( + "Job %s not ready to schedule due to not having enough quota for quota type %s", job, + quotaType)); return false; } @@ -1210,9 +1210,8 @@ public abstract class AbstractTaskDispatcher { LOG.debug( String.format("Finish job %s of workflow %s for runtime job DAG", jobName, workflowName)); } else { - LOG.warn(String - .format("Failed to find runtime job DAG for workflow %s and job %s", workflowName, - jobName)); + LOG.warn(String.format("Failed to find runtime job DAG for workflow %s and job %s", + workflowName, jobName)); } } @@ -1228,21 +1227,18 @@ public abstract class AbstractTaskDispatcher { protected static void reportSubmissionToProcessDelay(BaseControllerDataProvider dataProvider, final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig, final JobConfig jobConfig, final long currentTimestamp) { - AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() { - @Override - public Object call() { - // Asynchronously update the appropriate JobMonitor - JobMonitor jobMonitor = clusterStatusMonitor - .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig)); - if (jobMonitor == null) { - return null; - } - - // Compute SubmissionToProcessDelay - long submissionToProcessDelay = currentTimestamp - jobConfig.getStat().getCreationTime(); - jobMonitor.updateSubmissionToProcessDelayGauge(submissionToProcessDelay); + AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> { + // Asynchronously update the appropriate JobMonitor + JobMonitor jobMonitor = clusterStatusMonitor + .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig)); + if (jobMonitor == null) { return null; } + + // Compute SubmissionToProcessDelay + long submissionToProcessDelay = currentTimestamp - jobConfig.getStat().getCreationTime(); + jobMonitor.updateSubmissionToProcessDelayGauge(submissionToProcessDelay); + return null; }); } @@ -1258,21 +1254,18 @@ public abstract class AbstractTaskDispatcher { private static void reportSubmissionToScheduleDelay(BaseControllerDataProvider dataProvider, final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig, final JobConfig jobConfig, final long currentTimestamp) { - AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() { - @Override - public Object call() { - // Asynchronously update the appropriate JobMonitor - JobMonitor jobMonitor = clusterStatusMonitor - .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig)); - if (jobMonitor == null) { - return null; - } - - // Compute SubmissionToScheduleDelay - long submissionToStartDelay = currentTimestamp - jobConfig.getStat().getCreationTime(); - jobMonitor.updateSubmissionToScheduleDelayGauge(submissionToStartDelay); + AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> { + // Asynchronously update the appropriate JobMonitor + JobMonitor jobMonitor = clusterStatusMonitor + .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig)); + if (jobMonitor == null) { return null; } + + // Compute SubmissionToScheduleDelay + long submissionToStartDelay = currentTimestamp - jobConfig.getStat().getCreationTime(); + jobMonitor.updateSubmissionToScheduleDelayGauge(submissionToStartDelay); + return null; }); } @@ -1288,32 +1281,29 @@ public abstract class AbstractTaskDispatcher { private static void reportControllerInducedDelay(BaseControllerDataProvider dataProvider, final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig, final JobConfig jobConfig, final long currentTimestamp) { - AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() { - @Override - public Object call() { - // Asynchronously update the appropriate JobMonitor - JobMonitor jobMonitor = clusterStatusMonitor - .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig)); - if (jobMonitor == null) { - return null; - } - - // Compute ControllerInducedDelay only if the workload is a test load - // NOTE: this metric cannot be computed for general user-submitted workloads because - // the actual runtime of the tasks vary, and there could exist multiple tasks per - // job - // NOTE: a test workload will have the "latency" field in the mapField of the - // JobConfig (taskConfig) - String firstTask = jobConfig.getTaskConfigMap().keySet().iterator().next(); - if (jobConfig.getTaskConfig(firstTask).getConfigMap().containsKey(TASK_LATENCY_TAG)) { - long taskDuration = - Long.valueOf(jobConfig.getTaskConfig(firstTask).getConfigMap().get(TASK_LATENCY_TAG)); - long controllerInducedDelay = - currentTimestamp - jobConfig.getStat().getCreationTime() - taskDuration; - jobMonitor.updateControllerInducedDelayGauge(controllerInducedDelay); - } + AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> { + // Asynchronously update the appropriate JobMonitor + JobMonitor jobMonitor = clusterStatusMonitor + .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig)); + if (jobMonitor == null) { return null; } + + // Compute ControllerInducedDelay only if the workload is a test load + // NOTE: this metric cannot be computed for general user-submitted workloads because + // the actual runtime of the tasks vary, and there could exist multiple tasks per + // job + // NOTE: a test workload will have the "latency" field in the mapField of the + // JobConfig (taskConfig) + String firstTask = jobConfig.getTaskConfigMap().keySet().iterator().next(); + if (jobConfig.getTaskConfig(firstTask).getConfigMap().containsKey(TASK_LATENCY_TAG)) { + long taskDuration = + Long.valueOf(jobConfig.getTaskConfig(firstTask).getConfigMap().get(TASK_LATENCY_TAG)); + long controllerInducedDelay = + currentTimestamp - jobConfig.getStat().getCreationTime() - taskDuration; + jobMonitor.updateControllerInducedDelayGauge(controllerInducedDelay); + } + return null; }); } } diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 56e04ac..c8cf09a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -29,10 +29,8 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.Callable; import org.apache.helix.ZNRecord; import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; -import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.Message; import org.apache.helix.model.Partition; @@ -157,8 +155,9 @@ public class JobDispatcher extends AbstractTaskDispatcher { jobState = workflowCtx.getJobState(jobName); workflowState = workflowCtx.getWorkflowState(); - if (INTERMEDIATE_STATES.contains(jobState) && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout()) - || TaskState.TIMED_OUT.equals(workflowState))) { + if (INTERMEDIATE_STATES.contains(jobState) + && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout()) + || TaskState.TIMED_OUT.equals(workflowState))) { jobState = TaskState.TIMING_OUT; workflowCtx.setJobState(jobName, TaskState.TIMING_OUT); } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) { @@ -272,7 +271,8 @@ public class JobDispatcher extends AbstractTaskDispatcher { paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name())); } Partition partition = new Partition(pName(jobResource, pId)); - Message pendingMessage = currStateOutput.getPendingMessage(jobResource, partition, instance); + Message pendingMessage = + currStateOutput.getPendingMessage(jobResource, partition, instance); // While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT, // so that Helix will cancel the transition. if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) { @@ -303,7 +303,8 @@ public class JobDispatcher extends AbstractTaskDispatcher { // can be dropped(note that Helix doesn't track whether the drop is success or not). if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) { handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg); - finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(), jobResource); + finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(), + jobResource); return buildEmptyAssignment(jobResource, currStateOutput); } @@ -340,8 +341,8 @@ public class JobDispatcher extends AbstractTaskDispatcher { TaskPartitionState state = jobContext.getPartitionState(pId); Partition partition = new Partition(pName(jobResource, pId)); String instance = jobContext.getAssignedParticipant(pId); - Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition, - instance); + Message pendingMessage = + currentStateOutput.getPendingMessage(jobResource, partition, instance); // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished if (state == TaskPartitionState.RUNNING || (state == TaskPartitionState.INIT && pendingMessage != null)) { @@ -389,7 +390,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { Map<String, Set<Integer>> tasksToDrop) { Map<String, SortedSet<Integer>> result = new HashMap<>(); for (String instance : liveInstances) { - result.put(instance, new TreeSet<Integer>()); + result.put(instance, new TreeSet<>()); } // First, add all task partitions from JobContext @@ -438,7 +439,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { // Check if this is a dropping transition if (requestedState != null && requestedState.equals(TaskPartitionState.DROPPED.name())) { if (!tasksToDrop.containsKey(instance)) { - tasksToDrop.put(instance, new HashSet<Integer>()); + tasksToDrop.put(instance, new HashSet<>()); } tasksToDrop.get(instance).add(pId); } diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 5f4ac14..2f0be85 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -71,7 +71,6 @@ public class TaskDriver { /** For logging */ private static final Logger LOG = LoggerFactory.getLogger(TaskDriver.class); - /** Default time out for monitoring workflow or job state */ private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */ @@ -93,20 +92,21 @@ public class TaskDriver { private final HelixAdmin _admin; private final String _clusterName; - public TaskDriver(HelixManager manager) { this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), manager.getClusterName()); } public TaskDriver(HelixZkClient client, String clusterName) { - this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName); + this(client, new ZkBaseDataAccessor<>(client), clusterName); } - public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) { + public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, + String clusterName) { this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor), - new ZkHelixPropertyStore<>(baseAccessor, - PropertyPathBuilder.propertyStore(clusterName), null), clusterName); + new ZkHelixPropertyStore<>(baseAccessor, PropertyPathBuilder.propertyStore(clusterName), + null), + clusterName); } @Deprecated @@ -123,9 +123,8 @@ public class TaskDriver { _clusterName = clusterName; } - - /** Schedules a new workflow - * + /** + * Schedules a new workflow * @param flow */ public void start(Workflow flow) { @@ -137,7 +136,7 @@ public class TaskDriver { WorkflowConfig newWorkflowConfig = new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build(); - Map<String, String> jobTypes = new HashMap<String, String>(); + Map<String, String> jobTypes = new HashMap<>(); // add all job configs. for (String job : flow.getJobConfigs().keySet()) { JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job)); @@ -196,13 +195,12 @@ public class TaskDriver { .setSimpleField(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name(), workflow); } if (workflow == null || !workflow.equals(newWorkflowConfig.getWorkflowId())) { - throw new HelixException(String - .format("Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}", - workflow, newWorkflowConfig.getWorkflowId())); + throw new HelixException(String.format( + "Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}", workflow, + newWorkflowConfig.getWorkflowId())); } - WorkflowConfig currentConfig = - TaskUtil.getWorkflowConfig(_accessor, workflow); + WorkflowConfig currentConfig = TaskUtil.getWorkflowConfig(_accessor, workflow); if (currentConfig == null) { throw new HelixException("Workflow " + workflow + " does not exist!"); } @@ -223,7 +221,6 @@ public class TaskDriver { /** * Creates a new named job queue (workflow) - * * @param queue */ public void createQueue(JobQueue queue) { @@ -233,7 +230,6 @@ public class TaskDriver { /** * Remove all completed or failed jobs in a job queue * Same as {@link #cleanupQueue(String)} - * * @param queue name of the queue * @throws Exception */ @@ -256,7 +252,8 @@ public class TaskDriver { * the queue has to be stopped prior to this call * @param queue queue name * @param job job name, denamespaced - * @param forceDelete + * @param forceDelete CAUTION: if set true, all job's related zk nodes will + * be clean up from zookeeper even if its workflow information can not be found. */ public void deleteJob(final String queue, final String job, boolean forceDelete) { deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), forceDelete); @@ -344,20 +341,17 @@ public class TaskDriver { /** * Adds a new job to the end an existing named queue. - * * @param queue * @param job * @param jobBuilder * @throws Exception */ - public void enqueueJob(final String queue, final String job, - JobConfig.Builder jobBuilder) { + public void enqueueJob(final String queue, final String job, JobConfig.Builder jobBuilder) { enqueueJobs(queue, Collections.singletonList(job), Collections.singletonList(jobBuilder)); } /** * Batch add jobs to queues that garantee - * * @param queue * @param jobs * @param jobBuilders @@ -365,7 +359,6 @@ public class TaskDriver { public void enqueueJobs(final String queue, final List<String> jobs, final List<JobConfig.Builder> jobBuilders) { - // Get the job queue config and capacity WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue); if (workflowConfig == null) { @@ -419,78 +412,75 @@ public class TaskDriver { } // update the job dag to append the job to the end of the queue. - DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData == null) { - // For some reason, the WorkflowConfig for this JobQueue doesn't exist - // In this case, we cannot proceed and must alert the user - throw new HelixException( - String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue)); - } + DataUpdater<ZNRecord> updater = currentData -> { + if (currentData == null) { + // For some reason, the WorkflowConfig for this JobQueue doesn't exist + // In this case, we cannot proceed and must alert the user + throw new HelixException( + String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue)); + } + + // Add the node to the existing DAG + JobDag jobDag = JobDag + .fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name())); + Set<String> allNodes = jobDag.getAllNodes(); + if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) { + throw new IllegalStateException( + String.format("Queue %s already reaches its max capacity %f, failed to add %s", queue, + capacity, jobs.toString())); + } - // Add the node to the existing DAG - JobDag jobDag = JobDag.fromJson( - currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name())); - Set<String> allNodes = jobDag.getAllNodes(); - if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) { + String lastNodeName = null; + for (int i = 0; i < namespacedJobNames.size(); i++) { + String namespacedJobName = namespacedJobNames.get(i); + if (allNodes.contains(namespacedJobName)) { throw new IllegalStateException(String - .format("Queue %s already reaches its max capacity %f, failed to add %s", queue, - capacity, jobs.toString())); + .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i))); } - - String lastNodeName = null; - for (int i = 0; i < namespacedJobNames.size(); i++) { - String namespacedJobName = namespacedJobNames.get(i); - if (allNodes.contains(namespacedJobName)) { - throw new IllegalStateException(String - .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i))); - } - jobDag.addNode(namespacedJobName); - - // Add the node to the end of the queue - String candidate = null; - if (lastNodeName == null) { - for (String node : allNodes) { - if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) { - candidate = node; - break; - } + jobDag.addNode(namespacedJobName); + + // Add the node to the end of the queue + String candidate = null; + if (lastNodeName == null) { + for (String node : allNodes) { + if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) { + candidate = node; + break; } - } else { - candidate = lastNodeName; - } - if (candidate != null) { - jobDag.addParentToChild(candidate, namespacedJobName); - lastNodeName = namespacedJobName; } + } else { + candidate = lastNodeName; + } + if (candidate != null) { + jobDag.addParentToChild(candidate, namespacedJobName); + lastNodeName = namespacedJobName; } + } - // Add job type if job type is not null - Map<String, String> jobTypes = - currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name()); - for (String jobType : jobTypeList) { - if (jobType != null) { - if (jobTypes == null) { - jobTypes = new HashMap<>(); - } - jobTypes.put(queue, jobType); + // Add job type if job type is not null + Map<String, String> jobTypes = + currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name()); + for (String jobType : jobTypeList) { + if (jobType != null) { + if (jobTypes == null) { + jobTypes = new HashMap<>(); } + jobTypes.put(queue, jobType); } + } - if (jobTypes != null) { - currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes); - } - // Save the updated DAG - try { - currentData - .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson()); - } catch (Exception e) { - throw new IllegalStateException( - String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e); - } - return currentData; + if (jobTypes != null) { + currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes); } + // Save the updated DAG + try { + currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), + jobDag.toJson()); + } catch (Exception e) { + throw new IllegalStateException( + String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e); + } + return currentData; }; String path = _accessor.keyBuilder().resourceConfig(queue).getPath(); @@ -540,11 +530,11 @@ public class TaskDriver { throw new IllegalStateException("Queue " + queue + " does not have a valid work state!"); } - Set<String> jobs = new HashSet<String>(); + Set<String> jobs = new HashSet<>(); for (String jobNode : workflowConfig.getJobDag().getAllNodes()) { TaskState curState = wCtx.getJobState(jobNode); - if (curState != null && (curState == TaskState.ABORTED || curState == TaskState.COMPLETED - || curState == TaskState.FAILED)) { + if (curState != null && curState == TaskState.ABORTED || curState == TaskState.COMPLETED + || curState == TaskState.FAILED) { jobs.add(jobNode); } } @@ -558,8 +548,7 @@ public class TaskDriver { _admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME); IdealState is = buildWorkflowIdealState(workflow); - TaskUtil - .createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE)); + TaskUtil.createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE)); _admin.setResourceIdealState(_clusterName, workflow, is); } @@ -576,12 +565,12 @@ public class TaskDriver { private IdealState buildWorkflowIdealState(String workflow) { CustomModeISBuilder IsBuilder = new CustomModeISBuilder(workflow); - IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK).setNumReplica(1) - .setNumPartitions(1).setStateModel(TaskConstants.STATE_MODEL_NAME).disableExternalView(); + IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK).setNumReplica(1).setNumPartitions(1) + .setStateModel(TaskConstants.STATE_MODEL_NAME).disableExternalView(); IdealState is = IsBuilder.build(); - is.getRecord().setListField(workflow, new ArrayList<String>()); - is.getRecord().setMapField(workflow, new HashMap<String, String>()); + is.getRecord().setListField(workflow, new ArrayList<>()); + is.getRecord().setMapField(workflow, new HashMap<>()); is.setRebalancerClassName(WorkflowRebalancer.class.getName()); return is; @@ -611,23 +600,19 @@ public class TaskDriver { /** * Public async method to stop a workflow/queue. - * * This call only send STOP command to Helix, it does not check * whether the workflow (all jobs) has been stopped yet. - * * @param workflow */ - public void stop(String workflow) throws InterruptedException { + public void stop(String workflow) { setWorkflowTargetState(workflow, TargetState.STOP); } /** * Public sync method to stop a workflow/queue with timeout - * * Basically the workflow and all of its jobs has been stopped if this method return success. - * - * @param workflow The workflow name - * @param timeout The timeout for stopping workflow/queue in milisecond + * @param workflow The workflow name + * @param timeout The timeout for stopping workflow/queue in milisecond */ public void waitToStop(String workflow, long timeout) throws InterruptedException { setWorkflowTargetState(workflow, TargetState.STOP); @@ -636,7 +621,8 @@ public class TaskDriver { while (System.currentTimeMillis() <= endTime) { WorkflowContext workflowContext = getWorkflowContext(workflow); - if (workflowContext == null || TaskState.IN_PROGRESS.equals(workflowContext.getWorkflowState())) { + if (workflowContext == null + || TaskState.IN_PROGRESS.equals(workflowContext.getWorkflowState())) { Thread.sleep(1000); } else { // Successfully stopped @@ -651,7 +637,6 @@ public class TaskDriver { /** * Public method to delete a workflow/queue. - * * @param workflow */ public void delete(String workflow) { @@ -660,11 +645,10 @@ public class TaskDriver { /** * Public method to delete a workflow/queue. - * * @param workflow * @param forceDelete, CAUTION: if set true, workflow and all of its jobs' related zk nodes will - * be clean up immediately from zookeeper, no matter whether there are jobs - * are running or not. + * be clean up immediately from zookeeper, no matter whether there are jobs + * are running or not. */ public void delete(String workflow, boolean forceDelete) { WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow); @@ -693,7 +677,8 @@ public class TaskDriver { private void removeWorkflowFromZK(String workflow) { Set<String> jobSet = new HashSet<>(); - // Note that even WorkflowConfig is null, if WorkflowContext exists, still need to remove workflow + // Note that even WorkflowConfig is null, if WorkflowContext exists, still need to remove + // workflow WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, workflow); if (wCfg != null) { jobSet.addAll(wCfg.getJobDag().getAllNodes()); @@ -709,11 +694,11 @@ public class TaskDriver { * Public synchronized method to wait for a delete operation to fully complete with timeout. * When this method returns, it means that a queue (workflow) has been completely deleted, meaning * its IdealState, WorkflowConfig, and WorkflowContext have all been deleted. - * * @param workflow workflow/jobqueue name * @param timeout duration to give to delete operation to completion */ - public void deleteAndWaitForCompletion(String workflow, long timeout) throws InterruptedException { + public void deleteAndWaitForCompletion(String workflow, long timeout) + throws InterruptedException { delete(workflow); long endTime = System.currentTimeMillis() + timeout; @@ -746,9 +731,11 @@ public class TaskDriver { if (baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) { failed.append("WorkflowContext "); } - throw new HelixException(String - .format("Failed to delete the workflow/queue %s within %d milliseconds. " - + "The following components still remain: %s", workflow, timeout, failed.toString())); + throw new HelixException( + String.format( + "Failed to delete the workflow/queue %s within %d milliseconds. " + + "The following components still remain: %s", + workflow, timeout, failed.toString())); } /** @@ -780,30 +767,27 @@ public class TaskDriver { } WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, workflow); - if (state != TargetState.DELETE && workflowContext != null && - workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) { + if (state != TargetState.DELETE && workflowContext != null + && workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) { // Should not update target state for completed workflow LOG.info("Workflow " + workflow + " is already completed, skip to update its target state " + state); return; } - DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { - @Override public ZNRecord update(ZNRecord currentData) { - if (currentData != null) { - currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(), - state.name()); - } else { - LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is " - + currentData); - } - return currentData; + DataUpdater<ZNRecord> updater = currentData -> { + if (currentData != null) { + currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(), + state.name()); + } else { + LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is null."); } + return currentData; }; PropertyKey workflowConfigKey = TaskUtil.getWorkflowConfigKey(_accessor, workflow); - _accessor.getBaseDataAccessor() - .update(workflowConfigKey.getPath(), updater, AccessOption.PERSISTENT); + _accessor.getBaseDataAccessor().update(workflowConfigKey.getPath(), updater, + AccessOption.PERSISTENT); RebalanceScheduler.invokeRebalance(_accessor, workflow); } @@ -841,11 +825,10 @@ public class TaskDriver { /** * Batch get the configurations of all workflows in this cluster. - * * @return */ public Map<String, WorkflowConfig> getWorkflows() { - Map<String, WorkflowConfig> workflowConfigMap = new HashMap<String, WorkflowConfig>(); + Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>(); Map<String, ResourceConfig> resourceConfigMap = _accessor.getChildValuesMap(_accessor.keyBuilder().resourceConfigs()); @@ -865,7 +848,6 @@ public class TaskDriver { * This call will be blocked until either workflow reaches to one of the state specified * in the arguments, or timeout happens. If timeout happens, then it will throw a HelixException * Otherwise, it will return current workflow state - * * @param workflowName The workflow to be monitored * @param timeout A long integer presents the time out, in milliseconds * @param targetStates Specified states that user would like to stop monitoring @@ -877,14 +859,15 @@ public class TaskDriver { // Wait for completion. long st = System.currentTimeMillis(); WorkflowContext ctx; - Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates)); + Set<TaskState> allowedStates = new HashSet<>(Arrays.asList(targetStates)); long timeToSleep = timeout > 100L ? 100L : timeout; do { Thread.sleep(timeToSleep); ctx = getWorkflowContext(workflowName); - } while ((ctx == null || ctx.getWorkflowState() == null || !allowedStates - .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + timeout); + } while ((ctx == null || ctx.getWorkflowState() == null + || !allowedStates.contains(ctx.getWorkflowState())) + && System.currentTimeMillis() < st + timeout); if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) { throw new HelixException(String.format( @@ -900,7 +883,6 @@ public class TaskDriver { * This is a wrapper function that set default time out for monitoring workflow in 2 MINUTES. * If timeout happens, then it will throw a HelixException, Otherwise, it will return * current job state. - * * @param workflowName The workflow to be monitored * @param targetStates Specified states that user would like to stop monitoring * @return A TaskState, which is current workflow state @@ -915,7 +897,6 @@ public class TaskDriver { * This call will be blocked until either specified job reaches to one of the state * in the arguments, or timeout happens. If timeout happens, then it will throw a HelixException * Otherwise, it will return current job state - * * @param workflowName The workflow that contains the job to monitor * @param jobName The specified job to monitor * @param timeout A long integer presents the time out, in milliseconds @@ -952,8 +933,9 @@ public class TaskDriver { do { Thread.sleep(timeToSleep); ctx = getWorkflowContext(workflowName); - } while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates - .contains(ctx.getJobState(jobName))) && System.currentTimeMillis() < st + timeout); + } while ((ctx == null || ctx.getJobState(jobName) == null + || !allowedStates.contains(ctx.getJobState(jobName))) + && System.currentTimeMillis() < st + timeout); if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) { throw new HelixException( @@ -968,7 +950,6 @@ public class TaskDriver { * This is a wrapper function for monitoring job state with default timeout 2 MINUTES. * If timeout happens, then it will throw a HelixException, Otherwise, it will return * current job state - * * @param workflowName The workflow that contains the job to monitor * @param jobName The specified job to monitor * @param states Specified states that user would like to stop monitoring @@ -981,12 +962,12 @@ public class TaskDriver { } /** - * This function returns the timestamp of the very last task that was scheduled. It is provided to help determine + * This function returns the timestamp of the very last task that was scheduled. It is provided to + * help determine * whether a given Workflow/Job/Task is stuck. - * * @param workflowName The name of the workflow * @return timestamp of the most recent job scheduled. - * -1L if timestamp is not set (either nothing is scheduled or no start time recorded). + * -1L if timestamp is not set (either nothing is scheduled or no start time recorded). */ public long getLastScheduledTaskTimestamp(String workflowName) { return getLastScheduledTaskExecutionInfo(workflowName).getStartTimeStamp(); @@ -998,7 +979,6 @@ public class TaskDriver { Integer taskPartitionIndex = null; TaskPartitionState state = null; - WorkflowContext workflowContext = getWorkflowContext(workflowName); if (workflowContext != null) { Map<String, TaskState> allJobStates = workflowContext.getJobStates(); @@ -1035,9 +1015,8 @@ public class TaskDriver { * @param taskName name of task. Optional if scope is WORKFLOW or JOB * @return null if key-value pair not found or this content store does not exist. Otherwise, * return a String - * * @deprecated use the following equivalents: {@link #getWorkflowUserContentMap(String)}, - * {@link #getJobUserContentMap(String, String)}, + * {@link #getJobUserContentMap(String, String)}, * @{{@link #getTaskContentMap(String, String, String)}} */ @Deprecated diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java index 51b21eb..83a790f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java @@ -69,7 +69,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { // Split it into status update and assign. But there are couple of data need // to pass around. - public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) { + public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, + WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput, + BestPossibleStateOutput bestPossibleOutput) { // Fetch workflow configuration and context if (workflowCfg == null) { @@ -93,7 +95,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { // If timeout point has already been passed, it will not be scheduled scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout()); - if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) { + if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) + && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) { workflowCtx.setWorkflowState(TaskState.TIMED_OUT); _clusterDataCache.updateWorkflowContext(workflow, workflowCtx); } @@ -107,7 +110,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { // Step 3: handle workflow that should STOP // For workflows that already reached final states, STOP should not take into effect - if (!finalStates.contains(workflowCtx.getWorkflowState()) && TargetState.STOP.equals(targetState)) { + if (!finalStates.contains(workflowCtx.getWorkflowState()) + && TargetState.STOP.equals(targetState)) { LOG.info("Workflow " + workflow + "is marked as stopped."); if (isWorkflowStopped(workflowCtx, workflowCfg)) { workflowCtx.setWorkflowState(TaskState.STOPPED); @@ -202,8 +206,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { _clusterDataCache.updateWorkflowContext(workflow, workflowCtx); } - public WorkflowContext getOrInitializeWorkflowContext( - String workflowName, TaskDataCache cache) { + public WorkflowContext getOrInitializeWorkflowContext(String workflowName, TaskDataCache cache) { WorkflowContext workflowCtx = cache.getWorkflowContext(workflowName); if (workflowCtx == null) { workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW)); @@ -380,8 +383,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { jobIS = builder.build(); for (int i = 0; i < numPartitions; i++) { - jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>()); - jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>()); + jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<>()); + jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<>()); } jobIS.setRebalancerClassName(JobRebalancer.class.getName()); admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS); @@ -443,7 +446,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { if (LOG.isDebugEnabled()) { LOG.debug("Ready to start workflow " + newWorkflowName); } - if (lastScheduled == null || !newWorkflowName.equals(lastScheduled)) { + if (!newWorkflowName.equals(lastScheduled)) { Workflow clonedWf = cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule)); TaskDriver driver = new TaskDriver(_manager); @@ -453,7 +456,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { driver.start(clonedWf); } catch (Exception e) { LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e); - _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED); + _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), + TaskState.FAILED); } } // Persist workflow start regardless of success to avoid retrying and failing @@ -582,7 +586,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { // and jobs will rescheduled again. removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache()); } - } else { + } else { LOG.info("Did not clean up workflow " + workflow + " because neither the workflow is non-terminable nor is set to DELETE."); } @@ -598,5 +602,4 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { } cache.removeContext(workflow); } - } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java index 6080399..1e06269 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java @@ -37,7 +37,6 @@ import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobQueue; -import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskDriver; @@ -59,8 +58,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { private static final String JOB_COMMAND = "DummyCommand"; private Map<String, String> _jobCommandMap; private Map<String, Integer> _quotaTypeExecutionCount = new ConcurrentHashMap<>(); - private Set<String> _availableQuotaTypes = - Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + private Set<String> _availableQuotaTypes = Collections.newSetFromMap(new ConcurrentHashMap<>()); private boolean _finishTask = false; @BeforeClass @@ -87,24 +85,9 @@ public class TestQuotaBasedScheduling extends TaskTestBase { // Set task callbacks Map<String, TaskFactory> taskFactoryReg = new HashMap<>(); - TaskFactory shortTaskFactory = new TaskFactory() { - @Override - public Task createNewTask(TaskCallbackContext context) { - return new ShortTask(context, instanceName); - } - }; - TaskFactory longTaskFactory = new TaskFactory() { - @Override - public Task createNewTask(TaskCallbackContext context) { - return new LongTask(context, instanceName); - } - }; - TaskFactory failTaskFactory = new TaskFactory() { - @Override - public Task createNewTask(TaskCallbackContext context) { - return new FailTask(context, instanceName); - } - }; + TaskFactory shortTaskFactory = context -> new ShortTask(context, instanceName); + TaskFactory longTaskFactory = context -> new LongTask(context, instanceName); + TaskFactory failTaskFactory = context -> new FailTask(context, instanceName); taskFactoryReg.put("ShortTask", shortTaskFactory); taskFactoryReg.put("LongTask", longTaskFactory); taskFactoryReg.put("FailTask", failTaskFactory); @@ -155,7 +138,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { for (int i = 0; i < 10; i++) { List<TaskConfig> taskConfigs = new ArrayList<>(); - taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>())); + taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>())); JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND) .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap); workflowBuilder.addJob("JOB" + i, jobConfigBulider); @@ -193,7 +176,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { for (int i = 0; i < 10; i++) { List<TaskConfig> taskConfigs = new ArrayList<>(); - taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>())); + taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>())); JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND).addTaskConfigs(taskConfigs) .setJobCommandConfigMap(_jobCommandMap).setJobType("UNDEFINED"); @@ -231,7 +214,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { for (int i = 0; i < 5; i++) { List<TaskConfig> taskConfigs = new ArrayList<>(); - taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>())); + taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>())); JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND) .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("A"); workflowBuilder.addJob("JOB" + i, jobConfigBulider); @@ -239,7 +222,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { for (int i = 5; i < 10; i++) { List<TaskConfig> taskConfigs = new ArrayList<>(); - taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>())); + taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>())); JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND) .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("B"); workflowBuilder.addJob("JOB" + i, jobConfigBulider); @@ -283,8 +266,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase { } // Test that the next two are not executing - JobContext context_2 = _driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0"); - JobContext context_3 = _driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0"); + JobContext context_2 = + _driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0"); + JobContext context_3 = + _driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0"); Assert.assertNull(context_2.getPartitionState(0)); Assert.assertNull(context_3.getPartitionState(0)); @@ -493,7 +478,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { for (int i = 0; i < numWorkflows; i++) { String workflowName = workflowNames.get(i); TaskState state = (i % 3 == 1) ? TaskState.FAILED : TaskState.COMPLETED; - Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowName).getWorkflowState(), + Assert.assertEquals(TaskDriver.getWorkflowContext(_manager, workflowName).getWorkflowState(), state); } @@ -536,7 +521,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { // First run some jobs with quotaType A List<TaskConfig> taskConfigs = new ArrayList<>(); - taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>())); + taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>())); JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND) .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("A"); @@ -553,7 +538,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { // Run some jobs with quotaType B // First run some jobs with quotaType A taskConfigs = new ArrayList<>(); - taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>())); + taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>())); jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND).addTaskConfigs(taskConfigs) .setJobCommandConfigMap(_jobCommandMap).setJobType("B"); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java index e12b4a9..6de1d3f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java @@ -63,8 +63,7 @@ public class TestTaskRebalancer extends TaskTestBase { JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG); jobBuilder.setJobCommandConfigMap(commandConfig); - Workflow flow = WorkflowGenerator - .generateSingleJobWorkflowBuilder(jobName, jobBuilder) + Workflow flow = WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName, jobBuilder) .setExpiry(expiry).build(); _driver.start(flow); @@ -77,8 +76,8 @@ public class TestTaskRebalancer extends TaskTestBase { Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobName); // Ensure context and config exist - Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey, - AccessOption.PERSISTENT)); + Assert.assertTrue( + _manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT)); Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null); // Wait for job to finish and expire @@ -86,8 +85,8 @@ public class TestTaskRebalancer extends TaskTestBase { Thread.sleep(expiry + 100); // Ensure workflow config and context were cleaned up by now - Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey, - AccessOption.PERSISTENT)); + Assert.assertFalse( + _manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT)); Assert.assertNull(accessor.getProperty(workflowCfgKey)); } @@ -120,7 +119,8 @@ public class TestTaskRebalancer extends TaskTestBase { } } - @Test public void partitionSet() throws Exception { + @Test + public void partitionSet() throws Exception { final String jobResource = "partitionSet"; ImmutableList<String> targetPartitions = ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13"); @@ -170,7 +170,8 @@ public class TestTaskRebalancer extends TaskTestBase { } } - @Test public void timeouts() throws Exception { + @Test + public void timeouts() throws Exception { final String jobResource = "timeouts"; JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG); @@ -216,12 +217,10 @@ public class TestTaskRebalancer extends TaskTestBase { // Enqueue jobs Set<String> master = Sets.newHashSet("MASTER"); Set<String> slave = Sets.newHashSet("SLAVE"); - JobConfig.Builder job1 = - new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master); - JobConfig.Builder job2 = - new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave); + JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master); + JobConfig.Builder job2 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave); _driver.enqueueJob(queueName, "masterJob", job1); _driver.enqueueJob(queueName, "slaveJob", job2);
