Repository: incubator-gobblin Updated Branches: refs/heads/master 4b5f55d08 -> 81898c07a
[GOBBLIN-518] Add option to cancel a running job in service Closes #2425 from arjun4084346/cancelHelixJob Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/81898c07 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/81898c07 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/81898c07 Branch: refs/heads/master Commit: 81898c07a79440738ccf8892c3079bc0cfa89f0f Parents: 4b5f55d Author: Arjun <[email protected]> Authored: Tue Aug 14 17:17:13 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Aug 14 17:17:13 2018 -0700 ---------------------------------------------------------------------- ...blinHelixDistributeJobExecutionLauncher.java | 91 ++++++++++++++----- .../cluster/GobblinHelixJobLauncher.java | 39 ++++---- .../gobblin/cluster/GobblinHelixJobTask.java | 7 +- .../org/apache/gobblin/cluster/HelixUtils.java | 94 +++++++++++++++++--- .../cluster/GobblinHelixJobLauncherTest.java | 5 +- .../runtime/GobblinMultiTaskAttempt.java | 2 +- 6 files changed, 184 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index f887f0b..b5f8928 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -103,6 +103,15 @@ class 
GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher private boolean jobSubmitted; + // A conditional variable for which the condition is satisfied if a cancellation is requested + private final Object cancellationRequest = new Object(); + // A flag indicating whether a cancellation has been requested or not + private volatile boolean cancellationRequested = false; + // A flag indicating whether a cancellation has been executed or not + private volatile boolean cancellationExecuted = false; + @Getter + private DistributeJobMonitor jobMonitor; + public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception { this.helixManager = builder.manager; this.helixTaskDriver = new TaskDriver(this.helixManager); @@ -128,13 +137,23 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } @Override - public void close() - throws IOException { - // we should delete the planning job at the end. + public void close() throws IOException { + } + + private void executeCancellation() { + String planningName = getPlanningJobId(this.jobProperties); if (this.jobSubmitted) { - String planningName = getPlanningJobName(this.jobProperties); - log.info("[DELETE] workflow {} in the close.", planningName); - this.helixTaskDriver.delete(planningName); + try { + if (this.cancellationRequested && !this.cancellationExecuted) { + // TODO : fix this when HELIX-1180 is completed + // work flow should never be deleted explicitly because it has an expiry time + // If cancellation is requested, we should set the job state to CANCELLED/ABORT + this.helixTaskDriver.waitToStop(planningName, 10000L); + log.info("Stopped the workflow ", planningName); + } + } catch (InterruptedException e) { + throw new RuntimeException("Failed to stop workflow " + planningName + " in Helix", e); + } } } @@ -210,7 +229,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher @Override public DistributeJobMonitor 
launchJob(JobSpec jobSpec) { - return new DistributeJobMonitor(new DistributeJobCallable(this.jobProperties)); + this.jobMonitor = new DistributeJobMonitor(new DistributeJobCallable(this.jobProperties)); + return this.jobMonitor; } @AllArgsConstructor @@ -219,20 +239,19 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher @Override public DistributeJobResult call() throws Exception { - String planningName = getPlanningJobName(this.jobProps); String planningId = getPlanningJobId(this.jobProps); JobConfig.Builder builder = createPlanningJob(this.jobProps); try { - submitJobToHelix(planningName, planningId, builder); - return waitForJobCompletion(planningName, planningId); + submitJobToHelix(planningId, planningId, builder); + return waitForJobCompletion(planningId, planningId); } catch (Exception e) { - log.error(planningName + " is not able to submit."); + log.error(planningId + " is not able to submit."); return new DistributeJobResult(Optional.empty(), Optional.of(e)); } } } - private DistributeJobResult waitForJobCompletion(String planningName, String planningId) throws InterruptedException { + private DistributeJobResult waitForJobCompletion(String workFlowName, String jobName) throws InterruptedException { boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); long timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, @@ -241,16 +260,13 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher try { HelixUtils.waitJobCompletion( GobblinHelixDistributeJobExecutionLauncher.this.helixManager, - planningName, - planningId, + workFlowName, + jobName, timeoutEnabled ? 
Optional.of(timeoutInSeconds) : Optional.empty()); return getResultFromUserContent(); } catch (TimeoutException te) { - helixTaskDriver.waitToStop(planningName, 10L); - log.info("[DELETE] workflow {} timeout.", planningName); - this.helixTaskDriver.delete(planningName); - this.helixTaskDriver.resume(planningName); - log.info("stopped the queue, deleted the job"); + HelixUtils.handleJobTimeout(workFlowName, jobName, + helixManager, this, null); return new DistributeJobResult(Optional.empty(), Optional.of(te)); } } @@ -282,9 +298,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } } - static class DistributeJobMonitor extends FutureTask<ExecutionResult> implements JobExecutionMonitor { + private class DistributeJobMonitor extends FutureTask<ExecutionResult> implements JobExecutionMonitor { private ExecutorService executor = Executors.newSingleThreadExecutor(); - public DistributeJobMonitor (Callable<ExecutionResult> c) { + DistributeJobMonitor (Callable<ExecutionResult> c) { super(c); this.executor.execute(this); } @@ -293,6 +309,39 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher public MonitoredObject getRunningState() { throw new UnsupportedOperationException(); } + + /** + * We override Future's cancel method, which means we do not send the interrupt to the underlying thread. + * Instead of that, we submit a STOP request to handle, and the underlying thread is supposed to gracefully accept + * the STOPPED state and exit in {@link #waitForJobCompletion} method. + * @param mayInterruptIfRunning this is ignored. + * @return true always + */ + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + GobblinHelixDistributeJobExecutionLauncher.this.executeCancellation(); + return true; + } + } + + /** + * This method calls the underlying {@link DistributeJobMonitor}'s cancel method. 
+ * It uses a conditional variable {@link GobblinHelixDistributeJobExecutionLauncher#cancellationRequest} + * and a flag {@link GobblinHelixDistributeJobExecutionLauncher#cancellationRequested} to avoid double cancellation. + */ + public void cancel() { + DistributeJobMonitor jobMonitor = getJobMonitor(); + if (jobMonitor != null) { + synchronized (GobblinHelixDistributeJobExecutionLauncher.this.cancellationRequest) { + if (GobblinHelixDistributeJobExecutionLauncher.this.cancellationRequested) { + // Return immediately if a cancellation has already been requested + return; + } + GobblinHelixDistributeJobExecutionLauncher.this.cancellationRequested = true; + } + jobMonitor.cancel(true); + GobblinHelixDistributeJobExecutionLauncher.this.cancellationExecuted = true; + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index e2447a5..6672923 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -45,6 +45,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; import javax.annotation.Nullable; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; @@ -108,6 +109,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final TaskDriver helixTaskDriver; private final String helixWorkFlowName; private final String jobResourceName; + @Getter private JobListener jobListener; private final FileSystem fs; @@ -170,6 +172,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher { this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir); + startCancellationExecutor(); } @@ -193,10 +196,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { TimingEvent jobSubmissionTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_SUBMISSION); - submitJobToHelix(createJob(workUnits)); - jobSubmissionTimer.stop(); - LOGGER.info(String.format("Submitted job %s to Helix", this.jobContext.getJobId())); - this.jobSubmitted = true; + + synchronized (this.cancellationRequest) { + if (!this.cancellationRequested) { + submitJobToHelix(createJob(workUnits)); + jobSubmissionTimer.stop(); + LOGGER.info(String.format("Submitted job %s to Helix", this.jobContext.getJobId())); + this.jobSubmitted = true; + } else { + LOGGER.warn("Job {} not submitted to Helix as it was requested to be cancelled.", this.jobContext.getJobId()); + } + } TimingEvent jobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_RUN); waitForJobCompletion(); @@ -214,15 +224,15 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { protected void executeCancellation() { if (this.jobSubmitted) { try { - log.info("[DELETE] workflow {}", this.helixWorkFlowName); - if (this.cancellationRequested) { + if (this.cancellationRequested && !this.cancellationExecuted) { // TODO : fix this when HELIX-1180 is completed // work flow should never be deleted explicitly because it has a expiry time // If cancellation is requested, we should set the job state to CANCELLED/ABORT - this.helixTaskDriver.delete(this.helixWorkFlowName); + this.helixTaskDriver.waitToStop(this.helixWorkFlowName, 10000L); + log.info("stopped the workflow ", this.helixWorkFlowName); } - } catch (IllegalArgumentException e) { - LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e); + } catch 
(InterruptedException e) { + throw new RuntimeException("Failed to stop workflow " + helixWorkFlowName + " in Helix", e); } } } @@ -367,7 +377,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { } private void waitForJobCompletion() throws InterruptedException { - LOGGER.info("Waiting for job to complete..."); boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, @@ -380,14 +389,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { this.jobContext.getJobId(), timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty()); } catch (TimeoutException te) { - helixTaskDriver.waitToStop(helixWorkFlowName, 10L); - try { - cancelJob(this.jobListener); - } catch (JobException e) { - throw new RuntimeException("Unable to cancel job " + jobContext.getJobName() + ": ", e); - } - this.helixTaskDriver.resume(this.helixWorkFlowName); - LOGGER.info("stopped the queue, deleted the job"); + HelixUtils.handleJobTimeout(helixWorkFlowName, jobContext.getJobId(), + helixManager, this, this.jobListener); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java index 9ede090..4fb198c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java @@ -41,6 +41,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; 
import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.extractor.partition.Partitioner; @@ -133,7 +134,11 @@ public class GobblinHelixJobTask implements Task { public void cancel() { log.info("Cancelling planning job {}", this.planningJobId); if (launcher != null) { - launcher.executeCancellation(); + try { + launcher.cancelJob(launcher.getJobListener()); + } catch (JobException e) { + throw new RuntimeException("Unable to cancel planning job " + this.planningJobId + ": ", e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 1a11ee3..452e421 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -25,8 +25,8 @@ import org.apache.helix.HelixManager; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.task.JobConfig; -import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskUtil; import org.apache.helix.task.Workflow; import org.apache.helix.task.WorkflowConfig; @@ -36,6 +36,11 @@ import org.apache.helix.tools.ClusterSetup; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.JobException; +import org.apache.gobblin.runtime.listeners.JobListener; + +import static 
org.apache.helix.task.TaskState.STOPPED; + /** * A utility class for working with Gobblin on Helix. @@ -115,15 +120,26 @@ public class HelixUtils { // start the workflow helixTaskDriver.start(workFlow); log.info("Created a work flow {}", workFlowName); + WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName); + + // If the helix job is deleted from some other thread or a completely external process, + // method waitJobCompletion() needs to differentiate between the cases where + // 1) workflowContext did not get initialized ever, in which case we need to keep waiting, or + // 2) it did get initialized but deleted soon after, in which case we should stop waiting + // To overcome this issue, we wait here till workflowContext gets initialized + + while (workflowContext == null || workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) == null) { + workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName); + Thread.sleep(1000); + log.info("Waiting for work flow initialization."); + } + log.info("Work flow {} initialized", workFlowName); } - public static void waitJobCompletion( - HelixManager helixManager, - String workFlowName, - String jobName, + static void waitJobCompletion(HelixManager helixManager, String workFlowName, String jobName, Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException { - log.info("Waiting for job to complete..."); + log.info("Waiting for job {} to complete...", jobName); long endTime = 0; if (timeoutInSeconds.isPresent()) { endTime = System.currentTimeMillis() + timeoutInSeconds.get() * 1000; @@ -132,16 +148,70 @@ public class HelixUtils { while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) { WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName); if (workflowContext != null) { - org.apache.helix.task.TaskState helixJobState = 
workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)); - if (helixJobState == org.apache.helix.task.TaskState.COMPLETED || - helixJobState == org.apache.helix.task.TaskState.FAILED || - helixJobState == org.apache.helix.task.TaskState.STOPPED) { + TaskState jobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)); + switch (jobState) { + case STOPPED: + // user requested cancellation, which is executed by executeCancellation() + log.info("Job {} is cancelled, it will be deleted now.", jobName); + HelixUtils.deleteStoppedHelixJob(helixManager, workFlowName, jobName); + return; + case FAILED: + case COMPLETED: return; + default: + log.info("Waiting for job {} to complete...", jobName); + Thread.sleep(1000); } + } else { + // We have waited for WorkflowContext to get initialized, + // so if it is found null here, it must have been deleted in the job cancellation process. + log.info("WorkflowContext not found. Job is probably cancelled."); + return; } - Thread.sleep(1000); } throw new TimeoutException("task driver wait time [" + timeoutInSeconds + " sec] is expired."); } -} + + static void handleJobTimeout(String workFlowName, String jobName, HelixManager helixManager, Object jobLauncher, + JobListener jobListener) throws InterruptedException { + try { + if (jobLauncher instanceof GobblinHelixJobLauncher) { + ((GobblinHelixJobLauncher) jobLauncher).cancelJob(jobListener); + } else if (jobLauncher instanceof GobblinHelixDistributeJobExecutionLauncher) { + ((GobblinHelixDistributeJobExecutionLauncher) jobLauncher).cancel(); + } else { + log.warn("Timeout occured for unknown job launcher {}", jobLauncher.getClass()); + } + } catch (JobException e) { + throw new RuntimeException("Unable to cancel job " + jobName + ": ", e); + } + // TODO : fix this when HELIX-1180 is completed + // We should not be deleting a workflow explicitly. 
+ // Workflow state should be set to a final state, which will remove it automatically because expiry time is set. + // After that, all delete calls can be replaced by something like HelixUtils.setStateToFinal(); + HelixUtils.deleteStoppedHelixJob(helixManager, workFlowName, jobName); + log.info("Stopped and deleted the workflow {}", workFlowName); + } + + /** + * Deletes the stopped Helix Workflow. + * Caller should stop the Workflow before calling this method. + * @param helixManager helix manager + * @param workFlowName workflow needed to be deleted + * @param jobName helix job name + * @throws InterruptedException + */ + private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName) + throws InterruptedException { + WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName); + while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) { + log.info("Waiting for job {} to stop...", jobName); + workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName); + Thread.sleep(1000); + } + // deleting the entire workflow, as one workflow contains only one job + new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L); + log.info("Workflow deleted."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index c66a9e8..1e1db0e 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -186,7 +186,9 @@ public class GobblinHelixJobLauncherTest { properties.setProperty(ConfigurationKeys.WRITER_FILE_PATH, jobName); - properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, "2"); + // expiry time should be more than the time needed for the job to complete + // otherwise JobContext will become null. This is how Helix work flow works. + properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, "5"); return properties; } @@ -299,6 +301,7 @@ public class GobblinHelixJobLauncherTest { final String jobIdKey1 = properties.getProperty(ConfigurationKeys.JOB_ID_KEY); final String jobIdKey2 = properties2.getProperty(ConfigurationKeys.JOB_ID_KEY); + org.apache.helix.task.JobContext jobContext1 = taskDriver.getJobContext(jobIdKey1); org.apache.helix.task.JobContext jobContext2 = taskDriver.getJobContext(jobIdKey2); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/81898c07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java index e06f338..5784e61 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java @@ -115,6 +115,7 @@ public class GobblinMultiTaskAttempt { this.log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() + "-" + containerIdOptional.or("noattempt")); this.jobBroker = jobBroker; + this.tasks = new ArrayList<>(); } /** @@ -126,7 +127,6 @@ public class GobblinMultiTaskAttempt { throws IOException, InterruptedException { if 
(!this.workUnits.hasNext()) { log.warn("No work units to run in container " + containerIdOptional.or("")); - this.tasks = new ArrayList<>(); return; }
