This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 678a3af  [GOBBLIN-1542] Integrate with Helix API to add/remove task 
from a running helix job (#3393)
678a3af is described below

commit 678a3afa19b6f0ba88d370171993a04a518503d4
Author: Zihan Li <[email protected]>
AuthorDate: Wed Oct 13 14:09:36 2021 -0700

    [GOBBLIN-1542] Integrate with Helix API to add/remove task from a running 
helix job (#3393)
    
    * [GOBBLIN-1542] Integrate with Helix API to add/remove task from a running 
helix job
    
    * fix error when deleting task
    
    * fix some flaky test
    
    * remove unintentional change
    
    * fix test class path for gobblin-service
---
 .../org/apache/gobblin/source/InfiniteSource.java  |  40 ++++
 .../apache/gobblin/stream/WorkUnitChangeEvent.java |  37 +++
 gobblin-cluster/build.gradle                       |   4 +-
 .../gobblin/cluster/GobblinHelixJobLauncher.java   | 257 ++++++++++++++-------
 .../org/apache/gobblin/cluster/HelixUtils.java     |  28 +++
 .../gobblin/cluster/ClusterIntegrationTest.java    |   6 +-
 .../gobblin/cluster/GobblinTaskRunnerTest.java     |   3 +-
 .../cluster/HelixAssignedParticipantCheckTest.java |   8 +-
 .../org/apache/gobblin/metastore/ZkStateStore.java |   2 +-
 .../extract/kafka/KafkaStreamingExtractor.java     |   4 +-
 .../extract/kafka/UniversalKafkaSource.java        |  22 +-
 .../gobblin/runtime/AbstractJobLauncher.java       |  36 ++-
 .../apache/gobblin/runtime/SourceDecorator.java    |  11 +
 gobblin-service/build.gradle                       |   4 +-
 gobblin-yarn/build.gradle                          |   4 +-
 .../yarn/YarnServiceTestWithExpiration.java        |  20 +-
 gradle/scripts/dependencyDefinitions.gradle        |   2 +-
 17 files changed, 376 insertions(+), 112 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/source/InfiniteSource.java 
b/gobblin-api/src/main/java/org/apache/gobblin/source/InfiniteSource.java
new file mode 100644
index 0000000..51eaf68
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/InfiniteSource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * An interface for infinite source, where source should be able to detect the 
work unit change
+ * and post the change through eventBus
+ *
+ * @author Zihan Li
+ *
+ * @param <S> output schema type
+ * @param <D> output record type
+ */
+@Alpha
+public interface InfiniteSource<S, D> extends Source<S, D>{
+
+  /**
+   * Return the eventBus where it will post {@link 
org.apache.gobblin.stream.WorkUnitChangeEvent} when workUnit change
+   */
+  EventBus getEventBus();
+
+}
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/stream/WorkUnitChangeEvent.java 
b/gobblin-api/src/main/java/org/apache/gobblin/stream/WorkUnitChangeEvent.java
new file mode 100644
index 0000000..f977a4c
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/stream/WorkUnitChangeEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.stream;
+
+import java.util.List;
+import lombok.Getter;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * The event for {@link org.apache.gobblin.source.InfiniteSource} to indicate 
there is a change in work units
+ * Job launcher should then be able to handle this event
+ */
+public class WorkUnitChangeEvent {
+  @Getter
+  private final List<String> oldTaskIds;
+  @Getter
+  private final List<WorkUnit> newWorkUnits;
+  public WorkUnitChangeEvent(List<String> oldTaskIds, List<WorkUnit> 
newWorkUnits) {
+    this.oldTaskIds = oldTaskIds;
+    this.newWorkUnits = newWorkUnits;
+  }
+}
diff --git a/gobblin-cluster/build.gradle b/gobblin-cluster/build.gradle
index c49be50..5b574eb 100644
--- a/gobblin-cluster/build.gradle
+++ b/gobblin-cluster/build.gradle
@@ -47,7 +47,9 @@ dependencies {
   compile externalDependency.hadoopCommon
   compile externalDependency.avroMapredH2
   compile externalDependency.findBugsAnnotations
-  compile externalDependency.helix
+  compile (externalDependency.helix) {
+    exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
+  }
 
   runtimeOnly project(":gobblin-modules:gobblin-service-kafka")
 
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index f50dfb4..4d48fcd 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -17,14 +17,20 @@
 
 package org.apache.gobblin.cluster;
 
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -127,13 +133,12 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
   private final Config jobConfig;
   private final long workFlowExpiryTimeSeconds;
   private final long helixJobStopTimeoutSeconds;
+  private Map<String, TaskConfig> workUnitToHelixConfig;
+  private Retryer<Boolean> taskRetryer;
 
-  public GobblinHelixJobLauncher (Properties jobProps,
-                                  final HelixManager helixManager,
-                                  Path appWorkDir,
-                                  List<? extends Tag<?>> metadataTags,
-                                  ConcurrentHashMap<String, Boolean> 
runningMap,
-                                  Optional<GobblinHelixMetrics> helixMetrics) 
throws Exception {
+  public GobblinHelixJobLauncher(Properties jobProps, final HelixManager 
helixManager, Path appWorkDir,
+      List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> 
runningMap,
+      Optional<GobblinHelixMetrics> helixMetrics) throws Exception {
 
     super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags));
     LOGGER.debug("GobblinHelixJobLauncher: jobProps {}, appWorkDir {}", 
jobProps, appWorkDir);
@@ -142,8 +147,8 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     this.runningMap = runningMap;
     this.appWorkDir = appWorkDir;
     this.inputWorkUnitDir = new Path(appWorkDir, 
GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
-    this.outputTaskStateDir = new Path(this.appWorkDir, 
GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME
-        + Path.SEPARATOR + this.jobContext.getJobId());
+    this.outputTaskStateDir = new Path(this.appWorkDir,
+        GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME + 
Path.SEPARATOR + this.jobContext.getJobId());
 
     this.helixWorkFlowName = this.jobContext.getJobId();
     this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER);
@@ -152,36 +157,36 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
         Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
     jobConfig = ConfigUtils.propertiesToConfig(jobProps);
 
-    this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(jobConfig,
-        GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
-        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
+    this.workFlowExpiryTimeSeconds =
+        ConfigUtils.getLong(jobConfig, 
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
+            
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
 
-    this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(jobConfig,
-        GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
-        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
+    this.helixJobStopTimeoutSeconds =
+        ConfigUtils.getLong(jobConfig, 
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
+            
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
 
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
         .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-            new URI(appWorkDir.toUri().getScheme(), null, 
appWorkDir.toUri().getHost(),
-                appWorkDir.toUri().getPort(), "/", null, null).toString()));
+            new URI(appWorkDir.toUri().getScheme(), null, 
appWorkDir.toUri().getHost(), appWorkDir.toUri().getPort(),
+                "/", null, null).toString()));
 
-    this.stateStores = new StateStores(stateStoreJobConfig, appWorkDir,
-        GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, appWorkDir,
-        GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, appWorkDir,
-        GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
+    this.stateStores =
+        new StateStores(stateStoreJobConfig, appWorkDir, 
GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME,
+            appWorkDir, 
GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, appWorkDir,
+            GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
 
     URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI));
     this.fs = FileSystem.get(fsUri, new Configuration());
 
-    this.taskStateCollectorService = new TaskStateCollectorService(jobProps,
-        this.jobContext.getJobState(),
-        this.eventBus,
-        this.eventSubmitter,
-        this.stateStores.getTaskStateStore(),
-        this.outputTaskStateDir,
-        this.getIssueRepository());
+    this.taskStateCollectorService =
+        new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), 
this.eventBus, this.eventSubmitter,
+            this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
 
     this.helixMetrics = helixMetrics;
+    this.workUnitToHelixConfig = new HashMap<>();
+    this.taskRetryer = RetryerBuilder.<Boolean>newBuilder()
+        .retryIfException()
+        .withStopStrategy(StopStrategies.stopAfterAttempt(3)).build();
     startCancellationExecutor();
   }
 
@@ -269,6 +274,64 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     }
   }
 
+  protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) 
throws IOException, ExecutionException,
+                                                                               
     RetryException {
+    String jobName = this.jobContext.getJobId();
+    try (ParallelRunner stateSerDeRunner = new 
ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
+      for (String workUnitId : workUnitIdsToRemove) {
+        taskRetryer.call(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            String taskId = workUnitToHelixConfig.get(workUnitId).getId();
+            boolean remove =
+                HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName, 
taskId, helixTaskDriver);
+            if (remove) {
+              log.info(String.format("Removed helix task %s with gobblin task 
id  %s from helix job %s:%s ", taskId,
+                  workUnitId, helixWorkFlowName, jobName));
+            } else {
+              throw new IOException(
+                  String.format("Cannot remove task %s from helix job %s:%s", 
workUnitId,
+                      helixWorkFlowName, jobName));
+            }
+            return true;
+          }
+        });
+        deleteWorkUnitFromStateStore(workUnitId, stateSerDeRunner);
+        log.info(String.format("remove task state for %s in state store", 
workUnitId));
+        this.workUnitToHelixConfig.remove(workUnitId);
+      }
+    }
+  }
+
+  protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) throws 
IOException, ExecutionException,
+                                                                            
RetryException {
+    String jobName = this.jobContext.getJobId();
+    try (ParallelRunner stateSerDeRunner = new 
ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
+      for (WorkUnit workunit : workUnitsToAdd) {
+        TaskConfig taskConfig = getTaskConfig(workunit, stateSerDeRunner);
+        this.taskRetryer.call(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            boolean added =
+                HelixUtils.addTaskToHelixJob(helixWorkFlowName, jobName, 
taskConfig, helixTaskDriver);
+            if (added) {
+              log.info(
+                  String.format("Added task %s to helix job %s:%s ", 
workunit.getId(), helixWorkFlowName, jobName));
+            } else {
+              log.error(
+                  String.format("Failed to add task %s to helix job %s:%s ", 
workunit.getId(), helixWorkFlowName,
+                      jobName));
+              throw new IOException(
+                  String.format("Cannot add task %s to helix job %s:%s", 
workunit.getId(),
+                      helixWorkFlowName, jobName));
+            }
+            return true;
+          }
+        });
+      }
+    }
+  }
+
   /**
    * Create a job from a given batch of {@link WorkUnit}s.
    */
@@ -289,8 +352,8 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       // write the job.state using the state store if present, otherwise 
serialize directly to the file
       if (this.stateStores.haveJobStateStore()) {
         jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(true, 
this.appWorkDir, this.jobContext.getJobId());
-        
this.stateStores.getJobStateStore().put(jobStateFilePath.getParent().getName(), 
jobStateFilePath.getName(),
-            this.jobContext.getJobState());
+        this.stateStores.getJobStateStore()
+            .put(jobStateFilePath.getParent().getName(), 
jobStateFilePath.getName(), this.jobContext.getJobState());
       } else {
         jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false, 
this.appWorkDir, this.jobContext.getJobId());
         SerializationUtils.serializeState(this.fs, jobStateFilePath, 
this.jobContext.getJobState());
@@ -315,23 +378,22 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
 
     // Helix task attempts = retries + 1 (fallback to general task retry for 
backward compatibility)
-    jobConfigBuilder.setMaxAttemptsPerTask(gobblinJobState.getPropAsInt(
-        GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY, 
gobblinJobState.getPropAsInt(
-            ConfigurationKeys.MAX_TASK_RETRIES_KEY,
-            ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1);
+    jobConfigBuilder.setMaxAttemptsPerTask(
+        
gobblinJobState.getPropAsInt(GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY,
+            
gobblinJobState.getPropAsInt(ConfigurationKeys.MAX_TASK_RETRIES_KEY,
+                ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1);
 
     // Helix task timeout (fallback to general task timeout for backward 
compatibility)
-    jobConfigBuilder.setTimeoutPerTask(gobblinJobState.getPropAsLong(
-        GobblinClusterConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
-        gobblinJobState.getPropAsLong(
-            ConfigurationKeys.TASK_TIMEOUT_SECONDS,
-            ConfigurationKeys.DEFAULT_TASK_TIMEOUT_SECONDS)) * 1000);
+    jobConfigBuilder.setTimeoutPerTask(
+        
gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
+            
gobblinJobState.getPropAsLong(ConfigurationKeys.TASK_TIMEOUT_SECONDS,
+                ConfigurationKeys.DEFAULT_TASK_TIMEOUT_SECONDS)) * 1000);
 
     jobConfigBuilder.setFailureThreshold(workUnits.size());
     
jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME);
-    
jobConfigBuilder.setNumConcurrentTasksPerInstance(ConfigUtils.getInt(jobConfig,
-        GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
-        
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
+    jobConfigBuilder.setNumConcurrentTasksPerInstance(
+        ConfigUtils.getInt(jobConfig, 
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
+            
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
 
     if 
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY)) {
       String jobTag = 
this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY);
@@ -349,8 +411,9 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       jobConfigBuilder.setRebalanceRunningTask(true);
     }
 
-    jobConfigBuilder.setExpiry(gobblinJobState.getPropAsLong(
-        GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, 
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
+    jobConfigBuilder.setExpiry(
+        
gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
+            
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
 
     return jobConfigBuilder;
   }
@@ -359,16 +422,11 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
    * Submit a job to run.
    */
   private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws 
Exception {
-    HelixUtils.submitJobToWorkFlow(jobConfigBuilder,
-        this.helixWorkFlowName,
-        this.jobContext.getJobId(),
-        this.helixTaskDriver,
-        this.helixManager,
-        this.workFlowExpiryTimeSeconds);
+    HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, 
this.jobContext.getJobId(),
+        this.helixTaskDriver, this.helixManager, 
this.workFlowExpiryTimeSeconds);
   }
 
-  public void launchJob(@Nullable JobListener jobListener)
-      throws JobException {
+  public void launchJob(@Nullable JobListener jobListener) throws JobException 
{
     this.jobListener = jobListener;
     boolean isLaunched = false;
     this.runningMap.putIfAbsent(this.jobContext.getJobName(), false);
@@ -376,11 +434,11 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     Throwable errorInJobLaunching = null;
     try {
       if (this.runningMap.replace(this.jobContext.getJobName(), false, true)) {
-        LOGGER.info ("Job {} will be executed, add into running map.", 
this.jobContext.getJobId());
+        LOGGER.info("Job {} will be executed, add into running map.", 
this.jobContext.getJobId());
         isLaunched = true;
         super.launchJob(jobListener);
       } else {
-        LOGGER.warn ("Job {} will not be executed because other jobs are still 
running.", this.jobContext.getJobId());
+        LOGGER.warn("Job {} will not be executed because other jobs are still 
running.", this.jobContext.getJobId());
       }
       // TODO: Better error handling
     } catch (Throwable t) {
@@ -388,23 +446,19 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     } finally {
       if (isLaunched) {
         if (this.runningMap.replace(this.jobContext.getJobName(), true, 
false)) {
-          LOGGER.info ("Job {} is done, remove from running map.", 
this.jobContext.getJobId());
+          LOGGER.info("Job {} is done, remove from running map.", 
this.jobContext.getJobId());
         } else {
-          throw errorInJobLaunching == null ? new IllegalStateException("A 
launched job should have running state equal to true in the running map.")
+          throw errorInJobLaunching == null ? new IllegalStateException(
+              "A launched job should have running state equal to true in the 
running map.")
               : new RuntimeException("Failure in launching job:", 
errorInJobLaunching);
         }
       }
     }
   }
 
-  /**
-   * Add a single {@link WorkUnit} (flattened) to persistent storage so that 
worker can fetch that based on information
-   * fetched in Helix task.
-   */
-  private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner,
-      Map<String, TaskConfig> taskConfigMap) throws IOException {
-    String workUnitFilePath = persistWorkUnit(
-        new Path(this.inputWorkUnitDir, this.jobContext.getJobId()), workUnit, 
stateSerDeRunner);
+  private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner 
stateSerDeRunner) throws IOException {
+    String workUnitFilePath =
+        persistWorkUnit(new Path(this.inputWorkUnitDir, 
this.jobContext.getJobId()), workUnit, stateSerDeRunner);
 
     Map<String, String> rawConfigMap = Maps.newHashMap();
     rawConfigMap.put(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH, 
workUnitFilePath);
@@ -412,8 +466,42 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     rawConfigMap.put(ConfigurationKeys.JOB_ID_KEY, this.jobContext.getJobId());
     rawConfigMap.put(ConfigurationKeys.TASK_ID_KEY, workUnit.getId());
     
rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, 
"true");
+    TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap);
+    workUnitToHelixConfig.put(workUnit.getId(), taskConfig);
+    return taskConfig;
+  }
+
+  /**
+   * Add a single {@link WorkUnit} (flattened) to persistent storage so that 
worker can fetch that based on information
+   * fetched in Helix task.
+   */
+  private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, 
Map<String, TaskConfig> taskConfigMap)
+      throws IOException {
+    taskConfigMap.put(workUnit.getId(), getTaskConfig(workUnit, 
stateSerDeRunner));
+  }
 
-    taskConfigMap.put(workUnit.getId(), TaskConfig.Builder.from(rawConfigMap));
+  /**
+   * Delete a single {@link WorkUnit} (flattened) from state store.
+   */
+  private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner 
stateSerDeRunner) {
+    String workUnitFilePath =
+        
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
+    final StateStore stateStore;
+    Path workUnitFile = new Path(workUnitFilePath);
+    final String fileName = workUnitFile.getName();
+    final String storeName = workUnitFile.getParent().getName();
+    if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
+      stateStore = stateStores.getMwuStateStore();
+    } else {
+      stateStore = stateStores.getWuStateStore();
+    }
+    stateSerDeRunner.submitCallable(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        stateStore.delete(storeName, fileName);
+        return null;
+      }
+    }, "Delete state " + fileName + " from store " + storeName);
   }
 
   /**
@@ -446,28 +534,23 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     return workUnitFile.toString();
   }
 
-  private void waitForJobCompletion()  throws InterruptedException {
-    boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(
-        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
-        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
-    long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(
-        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
-        GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+  private void waitForJobCompletion() throws InterruptedException {
+    boolean timeoutEnabled = Boolean.parseBoolean(
+        
this.jobProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+            
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
+    long timeoutInSeconds = Long.parseLong(
+        
this.jobProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+            
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
 
-    long stoppingStateTimeoutInSeconds = PropertiesUtils
-        .getPropAsLong(this.jobProps, 
GobblinClusterConfigurationKeys.HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS,
-            
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS);
+    long stoppingStateTimeoutInSeconds = 
PropertiesUtils.getPropAsLong(this.jobProps,
+        
GobblinClusterConfigurationKeys.HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS,
+        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS);
 
     try {
-      HelixUtils.waitJobCompletion(
-          this.helixManager,
-          this.helixWorkFlowName,
-          this.jobContext.getJobId(),
-          timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty(),
-          stoppingStateTimeoutInSeconds);
+      HelixUtils.waitJobCompletion(this.helixManager, this.helixWorkFlowName, 
this.jobContext.getJobId(),
+          timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty(), 
stoppingStateTimeoutInSeconds);
     } catch (TimeoutException te) {
-      HelixUtils.handleJobTimeout(helixWorkFlowName, jobContext.getJobId(),
-          helixManager, this, this.jobListener);
+      HelixUtils.handleJobTimeout(helixWorkFlowName, jobContext.getJobId(), 
helixManager, this, this.jobListener);
     }
   }
 
@@ -486,7 +569,8 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     if (this.stateStores.haveJobStateStore()) {
       this.stateStores.getJobStateStore().delete(this.jobContext.getJobId());
     } else {
-      Path jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false, 
this.appWorkDir, this.jobContext.getJobId());
+      Path jobStateFilePath =
+          GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir, 
this.jobContext.getJobId());
       this.fs.delete(jobStateFilePath, false);
     }
   }
@@ -497,7 +581,8 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
    * @param inputTags list of metadata tags
    * @return
    */
-  private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties 
jobProps, List<? extends Tag<?>> inputTags) {
+  private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties 
jobProps,
+      List<? extends Tag<?>> inputTags) {
     List<Tag<?>> metadataTags = Lists.newArrayList(inputTags);
     String jobId;
 
@@ -515,8 +600,8 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
           jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "")));
-      metadataTags.add(
-          new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
+      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
 
       // use job execution id if flow execution id is not present
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index fe23f20..80962b4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -36,6 +36,7 @@ import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
@@ -145,6 +146,33 @@ public class HelixUtils {
     log.info("Work flow {} initialized", workFlowName);
   }
 
+  protected static boolean deleteTaskFromHelixJob(String workFlowName,
+      String jobName, String taskID, TaskDriver helixTaskDriver) {
+    try {
+      log.info(String.format("try to delete task %s from workflow %s, job %s", 
taskID, workFlowName, jobName));
+      helixTaskDriver.deleteTask(workFlowName, jobName, taskID);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return 
!helixTaskDriver.getJobConfig(TaskUtil.getNamespacedJobName(workFlowName, 
jobName)).getMapConfigs().containsKey(taskID);
+    }
+    return true;
+  }
+
+  protected static boolean addTaskToHelixJob(String workFlowName,
+      String jobName, TaskConfig taskConfig, TaskDriver helixTaskDriver) {
+    String taskId = taskConfig.getId();
+    try {
+      log.info(String.format("try to add task %s to workflow %s, job %s", 
taskId, workFlowName, jobName));
+      helixTaskDriver.addTask(workFlowName, jobName, taskConfig);
+    } catch (Exception e) {
+      e.printStackTrace();
+      JobContext jobContext =
+          
helixTaskDriver.getJobContext(TaskUtil.getNamespacedJobName(workFlowName, 
jobName));
+      return jobContext.getTaskIdPartitionMap().containsKey(taskId);
+    }
+    return true;
+  }
+
   public static void submitJobToWorkFlow(JobConfig.Builder jobConfigBuilder,
       String workFlowName,
       String jobName,
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index fb5b8e5..a24e627 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -29,12 +29,12 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ChainedPathZkSerializer;
-import org.apache.helix.manager.zk.PathBasedZkSerializer;
 import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.zookeeper.datamodel.serializer.ChainedPathZkSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -89,8 +89,8 @@ public class ClusterIntegrationTest {
         IntegrationJobCancelSuite.TASK_STATE_FILE)
         .withValue(SleepingTask.SLEEP_TIME_IN_SECONDS, 
ConfigValueFactory.fromAnyRef(100));
     this.suite = new IntegrationJobCancelSuite(jobConfigOverrides);
-    HelixManager helixManager = getHelixManager();
     suite.startCluster();
+    HelixManager helixManager = getHelixManager();
     helixManager.connect();
 
     ExecutorService executor = Executors.newSingleThreadExecutor();
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 448a91d..904c552 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -210,12 +210,13 @@ public class GobblinTaskRunnerTest {
     Config jobConfigOverrides = 
ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE);
     this.suite = new TaskAssignmentAfterConnectionRetry(jobConfigOverrides);
 
+    suite.startCluster();
+
     String zkConnectString = 
suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
     String clusterName = 
suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
     //A test manager instance for observing the state of the cluster
     HelixManager helixManager = 
HelixManagerFactory.getZKHelixManager(clusterName, "TestManager", 
InstanceType.SPECTATOR, zkConnectString);
 
-    suite.startCluster();
 
     helixManager.connect();
 
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
index 57d9fd6..f38631a 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
@@ -50,16 +50,16 @@ public class HelixAssignedParticipantCheckTest {
     suite = new IntegrationBasicSuite(jobConfigOverrides);
 
     helixConfig = suite.getManagerConfig();
-    String clusterName = 
helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
-    String zkConnectString = 
helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    helixManager = HelixManagerFactory.getZKHelixManager(clusterName, 
"TestManager",
-        InstanceType.SPECTATOR, zkConnectString);
   }
 
   @Test (groups = {"disabledOnCI"})
   //Test disabled on Travis because cluster integration tests are generally 
flaky on Travis.
   public void testExecute() throws Exception {
     suite.startCluster();
+    String clusterName = 
helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    String zkConnectString = 
helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    helixManager = HelixManagerFactory.getZKHelixManager(clusterName, 
"TestManager",
+        InstanceType.SPECTATOR, zkConnectString);
 
     //Connect to the previously started Helix cluster
     helixManager.connect();
diff --git 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
index e383060..ea321a3 100644
--- 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
+++ 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
-import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.helix.AccessOption;
@@ -47,6 +46,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.io.StreamUtils;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 
 
 /**
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index bc2904b..a29bd0f 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -291,7 +291,7 @@ public class KafkaStreamingExtractor<S> extends 
FlushingExtractor<S, DecodeableK
     return longWatermarkMap;
   }
 
-  private List<KafkaPartition> getTopicPartitionsFromWorkUnit(WorkUnitState 
state) {
+  public static List<KafkaPartition> 
getTopicPartitionsFromWorkUnit(WorkUnitState state) {
     // what topic partitions are we responsible for?
     List<KafkaPartition> topicPartitions = new ArrayList<>();
 
@@ -302,7 +302,7 @@ public class KafkaStreamingExtractor<S> extends 
FlushingExtractor<S, DecodeableK
 
     for (int i = 0; i < numOfPartitions; ++i) {
       if (workUnit.getProp(topicNameProp, null) == null) {
-        log.warn("There's no topic.name property being set in workunt which 
could be an illegal state");
+        log.warn("There's no topic.name property being set in workunit which 
could be an illegal state");
         break;
       }
       String topicName = workUnit.getProp(topicNameProp);
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
index b227636..ad80334 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
@@ -17,11 +17,17 @@
 
 package org.apache.gobblin.source.extractor.extract.kafka;
 
+import com.google.common.eventbus.EventBus;
 import java.io.IOException;
 
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.InfiniteSource;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -32,9 +38,11 @@ import com.google.common.base.Preconditions;
  * A {@link KafkaSource} to use with arbitrary {@link EventBasedExtractor}. 
Specify the extractor to use with key
  * {@link #EXTRACTOR_TYPE}.
  */
-public class UniversalKafkaSource<S, D> extends KafkaSource<S, D> {
+@Slf4j
+public class UniversalKafkaSource<S, D> extends KafkaSource<S, D> implements 
InfiniteSource<S, D> {
 
   public static final String EXTRACTOR_TYPE = 
"gobblin.source.kafka.extractorType";
+  private final EventBus eventBus = new 
EventBus(this.getClass().getSimpleName());
 
   @Override
   public Extractor<S, D> getExtractor(WorkUnitState state)
@@ -50,4 +58,16 @@ public class UniversalKafkaSource<S, D> extends 
KafkaSource<S, D> {
       throw new RuntimeException(e);
     }
   }
+
+  public void onWorkUnitUpdate(List<String> oldTaskIds, List<WorkUnit> 
newWorkUnits) {
+    if (this.eventBus != null) {
+      log.info("post workunit change event");
+      this.eventBus.post(new WorkUnitChangeEvent(oldTaskIds, newWorkUnits));
+    }
+  }
+
+  @Override
+  public EventBus getEventBus() {
+    return this.eventBus;
+  }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 684a00a..aa40bce 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -17,7 +17,10 @@
 
 package org.apache.gobblin.runtime;
 
+import com.github.rholder.retry.RetryException;
+import com.google.common.eventbus.Subscribe;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.Authenticator;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -27,11 +30,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.source.InfiniteSource;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -92,7 +99,6 @@ import 
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
 import 
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
 import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
 import org.apache.gobblin.runtime.util.JobMetrics;
-import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.source.Source;
 import org.apache.gobblin.source.WorkUnitStreamSource;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
@@ -260,6 +266,27 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   }
 
   /**
+   * Handle {@link WorkUnitChangeEvent}, by default it will do nothing
+   */
+  @Subscribe
+  public void handleWorkUnitChangeEvent(WorkUnitChangeEvent 
workUnitChangeEvent)
+      throws InvocationTargetException {
+    LOG.info("start to handle workunit change event");
+    try {
+      this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
+      this.addTasksToCurrentJob(workUnitChangeEvent.getNewWorkUnits());
+    } catch (Exception e) {
+      //todo: emit some event to indicate there is an error handling this 
event that may cause starvation
+      throw new InvocationTargetException(e);
+    }
+  }
+
+  protected void removeTasksFromCurrentJob(List<String> taskIdsToRemove) 
throws IOException, ExecutionException,
+                                                                               
 RetryException {}
+  protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) throws 
IOException, ExecutionException,
+                                                                            
RetryException {}
+
+  /**
    * To supporting 'gobblin.template.uri' in any types of jobLauncher, place 
this resolution as a public-static method
    * to make it accessible for all implementation of JobLauncher and 
**AzkabanJobLauncher**.
    *
@@ -427,6 +454,13 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         TimingEvent workUnitsCreationTimer =
             
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
         Source<?, ?> source = this.jobContext.getSource();
+        if (source instanceof InfiniteSource) {
+          ((InfiniteSource) source).getEventBus().register(this);
+        } else if (source instanceof SourceDecorator) {
+          if (((SourceDecorator<?, ?>) source).getEventBus() != null) {
+            ((SourceDecorator<?, ?>) source).getEventBus().register(this);
+          }
+        }
         WorkUnitStream workUnitStream;
         if (source instanceof WorkUnitStreamSource) {
           workUnitStream = ((WorkUnitStreamSource) 
source).getWorkunitStream(jobState);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
index 4558325..f1e8928 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
@@ -17,10 +17,12 @@
 
 package org.apache.gobblin.runtime;
 
+import com.google.common.eventbus.EventBus;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.gobblin.source.InfiniteSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +74,15 @@ public class SourceDecorator<S, D> implements 
WorkUnitStreamSource<S, D>, Decora
     }
   }
 
+  public EventBus getEventBus() {
+    if (this.getDecoratedObject() instanceof InfiniteSource) {
+      return ((InfiniteSource) this.getDecoratedObject()).getEventBus();
+    } else if (this.getDecoratedObject() instanceof SourceDecorator) {
+      return ((SourceDecorator) this.getDecoratedObject()).getEventBus();
+    }
+    return null;
+  }
+
   @Override
   public WorkUnitStream getWorkunitStream(SourceState state) {
     try {
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 68f3ac7..6fb986e 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -53,7 +53,9 @@ dependencies {
   compile externalDependency.guiceMultibindings
   compile externalDependency.hadoopClientCommon
   compile externalDependency.hadoopCommon
-  compile externalDependency.helix
+  compile (externalDependency.helix) {
+    exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
+  }
   compile externalDependency.hiveCommon
   compile externalDependency.httpclient
   compile externalDependency.httpcore
diff --git a/gobblin-yarn/build.gradle b/gobblin-yarn/build.gradle
index 290dab3..1c01e55 100644
--- a/gobblin-yarn/build.gradle
+++ b/gobblin-yarn/build.gradle
@@ -56,7 +56,9 @@ dependencies {
   compile externalDependency.hadoopYarnClient
   compile externalDependency.avroMapredH2
   compile externalDependency.findBugsAnnotations
-  compile externalDependency.helix
+  compile (externalDependency.helix) {
+    exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
+  }
 
   testCompile project(path: ':gobblin-cluster', configuration: 'tests')
   testCompile project(":gobblin-example")
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
index 124d0f5..1934ece 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
@@ -203,14 +203,16 @@ public class YarnServiceTestWithExpiration {
     Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64, 
1).isEmpty());
     Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(), 
10);
 
-    try {
-      Thread.sleep(20000);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    //Since it may retry to request the container and start again, so the 
number may lager than 10
-    Assert.assertTrue(this.expiredYarnService.completedContainers.size() >= 
10);
-    Assert.assertTrue(this.expiredYarnService.startErrorContainers.size() >= 
10);
+    
AssertWithBackoff.create().logger(LOG).timeoutMs(60000).maxSleepMs(2000).backoffFactor(1.5)
+        .assertTrue(new Predicate<Void>() {
+          @Override
+          public boolean apply(Void input) {
+            //Since it may retry to request the container and start again, 
the number may be larger than 10
+            return expiredYarnService.completedContainers.size() >= 10
+                && expiredYarnService.startErrorContainers.size() >= 10;
+          }
+        }, "Waiting for container completed");
+
   }
 
   private static class TestExpiredYarnService extends 
YarnServiceTest.TestYarnService {
@@ -240,7 +242,7 @@ public class YarnServiceTestWithExpiration {
         Thread.currentThread().interrupt();
       }
       return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), 
Collections.emptyMap(),
-          Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, 
Collections.emptyMap());
+          Arrays.asList("sleep", "600"), Collections.emptyMap(), null, 
Collections.emptyMap());
     }
     private class TestNMClientCallbackHandler extends 
YarnService.NMClientCallbackHandler {
       @Override
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index cfe9646..56562e5 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -67,7 +67,7 @@ ext.externalDependency = [
     "hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + 
hadoopVersion,
     "hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
     "hdrHistogram": "org.hdrhistogram:HdrHistogram:2.1.11",
-    "helix": "org.apache.helix:helix-core:0.9.4",
+    "helix": "org.apache.helix:helix-core:1.0.2",
     "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
     "hiveService": "org.apache.hive:hive-service:" + hiveVersion,
     "hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,

Reply via email to