This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f8880ed06 [GOBBLIN-1947] Send workUnitChange event when helix task 
consistently fail (#3832)
f8880ed06 is described below

commit f8880ed06079c354d5cdaa0dcd361a3c3856a5b3
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Thu Jan 4 13:38:19 2024 -0800

    [GOBBLIN-1947] Send workUnitChange event when helix task consistently fail 
(#3832)
    
    * Send WorkUnitChangeEvent when helix task consistently fail
    
    * make launcher and scheduler correctly process work unit change event
    
    * change back pack config key
    
    * correctly process workunit stream before run
    
    * only use helix task map
    
    * update WorkUnitPreparator for job launcher
    
    * update log
    
    * use workunit id for state store
---
 .../gobblin/cluster/GobblinHelixJobLauncher.java   | 136 ++++++++++++++++++---
 .../gobblin/cluster/GobblinHelixJobScheduler.java  |   5 +-
 .../cluster/HelixRetriggeringJobCallableTest.java  |   5 +-
 .../extractor/extract/kafka/KafkaSource.java       |  19 ++-
 .../packer/KafkaTopicGroupingWorkUnitPacker.java   |  14 ++-
 .../gobblin/runtime/AbstractJobLauncher.java       |  61 +++++----
 .../org/apache/gobblin/runtime/JobContext.java     |   2 +-
 .../apache/gobblin/runtime/SourceDecorator.java    |   2 +
 .../gobblin/yarn/YarnAutoScalingManager.java       |  26 +++-
 .../java/org/apache/gobblin/yarn/YarnService.java  |   1 +
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   |  30 +++--
 11 files changed, 239 insertions(+), 62 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index f0fcc258f..15fff870b 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -17,10 +17,14 @@
 
 package org.apache.gobblin.cluster;
 
+import com.google.common.eventbus.Subscribe;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -30,6 +34,14 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
+import java.util.stream.Collectors;
+import org.apache.gobblin.runtime.SourceDecorator;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -78,6 +90,8 @@ import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.SerializationUtils;
 
+import static org.apache.gobblin.util.JobLauncherUtils.*;
+
 
 /**
  * An implementation of {@link JobLauncher} that launches a Gobblin job using 
the Helix task framework.
@@ -131,7 +145,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
   private final long workFlowExpiryTimeSeconds;
   private final long helixJobStopTimeoutSeconds;
   private final long helixWorkflowSubmissionTimeoutSeconds;
-  private Map<String, TaskConfig> workUnitToHelixConfig;
+  private Map<String, TaskConfig> helixIdTaskConfigMap;
   private Retryer<Boolean> taskRetryer;
 
   public GobblinHelixJobLauncher(Properties jobProps, final HelixManager 
helixManager, Path appWorkDir,
@@ -185,7 +199,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
 
     this.helixMetrics = helixMetrics;
-    this.workUnitToHelixConfig = new HashMap<>();
+    this.helixIdTaskConfigMap = new HashMap<>();
     this.taskRetryer = RetryerBuilder.<Boolean>newBuilder()
         .retryIfException()
         .withStopStrategy(StopStrategies.stopAfterAttempt(3)).build();
@@ -254,6 +268,76 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     }
   }
 
+  /**
+   * The implementation of this method has the assumption that work unit 
change should never delete without adding new
+   * work units, which will cause starvation. Thus, process {@link 
WorkUnitChangeEvent} for two scenario:
+   * 1. workUnitChangeEvent only contains old tasks and no new tasks given: 
recalculate new work unit through kafka
+   * source and pack with a min container setting.
+   * 2. workUnitChangeEvent contains both valid old and new work unit: respect 
the information and directly remove
+   * old tasks and start new work units
+   *
+   * @param workUnitChangeEvent Event post by EventBus to specify old tasks to 
be removed and new tasks to be run
+   * @throws InvocationTargetException
+   */
+  @Override
+  @Subscribe
+  public void handleWorkUnitChangeEvent(WorkUnitChangeEvent 
workUnitChangeEvent)
+      throws InvocationTargetException {
+    log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+        workUnitChangeEvent.getOldTaskIds(), 
workUnitChangeEvent.getNewWorkUnits());
+    final JobState jobState = this.jobContext.getJobState();
+    List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+    // Use old task Id to recalculate new work units
+    if(workUnits == null || workUnits.isEmpty()) {
+      workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds());
+      // If no new valid work units can be generated, dismiss the 
WorkUnitChangeEvent
+      if(workUnits == null || workUnits.isEmpty()) {
+        log.info("Not able to update work unit meaningfully, dismiss the 
WorkUnitChangeEvent");
+        return;
+      }
+    }
+
+    // Follow how AbstractJobLauncher handles work units to make sure 
consistent behaviour
+    WorkUnitStream workUnitStream = new 
BasicWorkUnitStream.Builder(workUnits).build();
+    // For streaming use case, this might be a necessary step to find dataset 
specific namespace so that each workUnit
+    // can create staging and temp directories with the correct determination 
of shard-path
+    workUnitStream = this.executeHandlers(workUnitStream, 
this.destDatasetHandlerService);
+    workUnitStream = this.processWorkUnitStream(workUnitStream, jobState);
+    workUnits = materializeWorkUnitList(workUnitStream);
+    try {
+      this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
+      this.addTasksToCurrentJob(workUnits);
+    } catch (Exception e) {
+      //todo: emit some event to indicate there is an error handling this 
event that may cause starvation
+      log.error("Failed to process WorkUnitChangeEvent with old tasks {} and 
new workunits {}.",
+          workUnitChangeEvent.getOldTaskIds(), workUnits, e);
+      throw new InvocationTargetException(e);
+    }
+  }
+
+  private List<WorkUnit> recalculateWorkUnit(List<String> oldHelixTaskIDs) {
+    JobState jobState = this.jobContext.getJobState();
+    Map<String, List<Integer>> filteredTopicPartition = new HashMap<>();
+    for(String id : oldHelixTaskIDs) {
+      WorkUnit workUnit = getWorkUnitFromStateStoreByHelixId(id);
+      String topicName = workUnit.getProp(KafkaSource.TOPIC_NAME);
+      List<Integer> partitions = filteredTopicPartition.getOrDefault(topicName,
+          new LinkedList<>());
+      
partitions.addAll(KafkaUtils.getPartitions(workUnit).stream().map(KafkaPartition::getId).collect(Collectors.toList()));
+      filteredTopicPartition.put(topicName, partitions);
+    }
+    // If a topic contains less than 2 filtered partition, it can't be further 
split so remove from map
+    filteredTopicPartition.values().removeIf(list -> list == null || 
list.size() < 2);
+    if(filteredTopicPartition.isEmpty()) {
+      return new ArrayList<>();
+    }
+    KafkaSource<?, ?> source = (KafkaSource<?, ?>) ((SourceDecorator<?, ?>) 
this.jobContext.getSource()).getSource();
+    //TODO: having a smarter way to calculate the new work unit size to 
replace the current static approach to simply double
+    int newWorkUnitSize = oldHelixTaskIDs.size() * 2;
+    return source.getWorkunitsForFilteredPartitions(jobState,
+        com.google.common.base.Optional.of(filteredTopicPartition), 
com.google.common.base.Optional.of(newWorkUnitSize));
+  }
+
   @Override
   protected void executeCancellation() {
     if (this.jobSubmitted) {
@@ -275,31 +359,31 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     }
   }
 
-  protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) 
throws IOException, ExecutionException,
+  protected void removeTasksFromCurrentJob(List<String> helixTaskIdsToRemove) 
throws IOException, ExecutionException,
                                                                                
     RetryException {
     String jobName = this.jobContext.getJobId();
     try (ParallelRunner stateSerDeRunner = new 
ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
-      for (String workUnitId : workUnitIdsToRemove) {
+      for (String helixTaskId : helixTaskIdsToRemove) {
         taskRetryer.call(new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
-            String taskId = workUnitToHelixConfig.get(workUnitId).getId();
+            String workUnitId = 
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
             boolean remove =
-                HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName, 
taskId, helixTaskDriver);
+                HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName, 
helixTaskId, helixTaskDriver);
             if (remove) {
-              log.info(String.format("Removed helix task %s with gobblin task 
id  %s from helix job %s:%s ", taskId,
+              log.info(String.format("Removed helix task %s with gobblin task 
id %s from helix job %s:%s ", helixTaskId,
                   workUnitId, helixWorkFlowName, jobName));
             } else {
               throw new IOException(
-                  String.format("Cannot remove task %s from helix job %s:%s", 
workUnitId,
+                  String.format("Cannot remove Helix task %s from helix job 
%s:%s", helixTaskId,
                       helixWorkFlowName, jobName));
             }
             return true;
           }
         });
-        deleteWorkUnitFromStateStore(workUnitId, stateSerDeRunner);
-        log.info(String.format("remove task state for %s in state store", 
workUnitId));
-        this.workUnitToHelixConfig.remove(workUnitId);
+        deleteWorkUnitFromStateStoreByHelixId(helixTaskId, stateSerDeRunner);
+        log.info(String.format("Removed task state for Helix task %s in state 
store", helixTaskId));
+        this.helixIdTaskConfigMap.remove(helixTaskId);
       }
     }
   }
@@ -513,7 +597,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     rawConfigMap.put(ConfigurationKeys.TASK_ID_KEY, workUnit.getId());
     
rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, 
"true");
     TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap);
-    workUnitToHelixConfig.put(workUnit.getId(), taskConfig);
+    helixIdTaskConfigMap.put(taskConfig.getId(), taskConfig);
     return taskConfig;
   }
 
@@ -526,12 +610,36 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     taskConfigMap.put(workUnit.getId(), getTaskConfig(workUnit, 
stateSerDeRunner));
   }
 
+  /**
+   * get a single {@link WorkUnit} (flattened) from state store.
+   */
+  private WorkUnit getWorkUnitFromStateStoreByHelixId(String helixTaskId) {
+    String workUnitFilePath =
+        
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
+    final StateStore stateStore;
+    Path workUnitFile = new Path(workUnitFilePath);
+    String workUnitId = 
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
+    final String fileName = workUnitFile.getName();
+    final String storeName = workUnitFile.getParent().getName();
+    if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
+      stateStore = stateStores.getMwuStateStore();
+    } else {
+      stateStore = stateStores.getWuStateStore();
+    }
+    try {
+      return (WorkUnit) stateStore.get(storeName, fileName, workUnitId);
+    } catch (IOException ioException) {
+      log.error("Failed to fetch workUnit for helix task {} from path {}", 
helixTaskId, workUnitFilePath);
+    }
+    return null;
+  }
+
   /**
    * Delete a single {@link WorkUnit} (flattened) from state store.
    */
-  private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner 
stateSerDeRunner) {
+  private void deleteWorkUnitFromStateStoreByHelixId(String helixTaskId, 
ParallelRunner stateSerDeRunner) {
     String workUnitFilePath =
-        
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
+        
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
     Path workUnitFile = new Path(workUnitFilePath);
     final String fileName = workUnitFile.getName();
     final String storeName = workUnitFile.getParent().getName();
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index d30554d2b..4049ee3d1 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -275,13 +275,14 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
     Properties combinedProps = new Properties();
     combinedProps.putAll(properties);
     combinedProps.putAll(jobProps);
-
-    return new GobblinHelixJobLauncher(combinedProps,
+    GobblinHelixJobLauncher gobblinHelixJobLauncher = new 
GobblinHelixJobLauncher(combinedProps,
         this.jobHelixManager,
         this.appWorkDir,
         this.metadataTags,
         this.jobRunningMap,
         Optional.of(this.helixMetrics));
+    this.eventBus.register(gobblinHelixJobLauncher);
+    return gobblinHelixJobLauncher;
   }
 
   public Future<?> scheduleJobImmediately(Properties jobProps, JobListener 
jobListener) {
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
index 779e7dd0d..f2572d18f 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.cluster;
 
+import com.google.common.eventbus.EventBus;
 import java.io.File;
 import java.util.Optional;
 import java.util.Properties;
@@ -59,8 +60,8 @@ public class HelixRetriggeringJobCallableTest {
     MutableJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     SchedulerService schedulerService = new SchedulerService(new Properties());
     Path appWorkDir = new Path(TMP_DIR);
-    GobblinHelixJobScheduler jobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(), 
Optional.empty(), null,
-        appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+    GobblinHelixJobScheduler jobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(), 
Optional.empty(),
+        new EventBus("Test"), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
     GobblinHelixJobLauncher jobLauncher = 
HelixRetriggeringJobCallable.buildJobLauncherForCentralizedMode(jobScheduler, 
getDummyJob());
     String jobId = jobLauncher.getJobId();
     Assert.assertNotNull(jobId);
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 80ef8c09d..3eb096765 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -244,10 +244,17 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
 
       Collection<KafkaTopic> topics;
       if(filteredTopicPartition.isPresent()) {
-        // If filteredTopicPartition present, use it to construct the 
whitelist pattern while leave blacklist empty
-        topics = 
this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(),
-            
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
+        if(filteredTopicPartition.get().isEmpty()) {
+          // return an empty list as filteredTopicPartition is present but 
contains no valid entry
+          return new ArrayList<>();
+        } else {
+          // If filteredTopicPartition present, use it to construct the 
whitelist pattern while leave blacklist empty
+          topics = this.kafkaConsumerClient.get()
+              .getFilteredTopics(Collections.emptyList(),
+                  
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
+        }
       } else {
+        // get topics based on job level config
         topics = getValidTopics(getFilteredTopics(state), state);
       }
       this.topicsToProcess = 
topics.stream().map(KafkaTopic::getName).collect(toSet());
@@ -263,6 +270,8 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
 
       int numOfThreads = 
state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
           
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
+      // No need to allocate more thread than the topic size, but minimum 
should 1
+      numOfThreads = Math.max(Math.min(numOfThreads, topics.size()), 1);
       ExecutorService threadPool =
           Executors.newFixedThreadPool(numOfThreads, 
ExecutorsUtils.newThreadFactory(Optional.of(LOG)));
 
@@ -277,7 +286,7 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
       Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
 
       for (KafkaTopic topic : topics) {
-        LOG.info("Discovered topic " + topic);
+        LOG.info("Discovered topic {} with {} number of partitions", 
topic.getName(), topic.getPartitions().size());
         if (topic.getTopicSpecificState().isPresent()) {
           topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new 
State())
               .addAllIfNotExist(topic.getTopicSpecificState().get());
@@ -343,6 +352,8 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
   protected void populateClientPool(int count,
       GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory,
       Config config) {
+    // Clear the pool first as clients within may already be close
+    kafkaConsumerClientPool.clear();
     for (int i = 0; i < count; i++) {
       kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
     }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 9c22d47cc..764eeae09 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.hadoop.fs.Path;
 
@@ -135,6 +136,10 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
 
   private static final String NUM_CONTAINERS_EVENT_NAME = "NumContainers";
 
+  // id to append to the task output directory to make it unique to avoid 
multiple flush publishers attempting to move
+  // the same file. Make it static and atomic as during runtime packing can 
happen multiple times simultaneously
+  private static AtomicInteger uniqueId = new AtomicInteger(0);
+
   private final long packingStartTimeMillis;
   private final double minimumContainerCapacity;
   private final Optional<StateStoreBasedWatermarkStorage> watermarkStorage;
@@ -305,7 +310,8 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
 
   private Double getDefaultWorkUnitSize() {
     return 
state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
-        KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / 
state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, 
DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
+        KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) /
+        state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, 
DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
   }
 
   /**
@@ -339,9 +345,6 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
     if (state.getPropAsBoolean(INDEXING_ENABLED, DEFAULT_INDEXING_ENABLED)) {
       List<WorkUnit> indexedWorkUnitList = new ArrayList<>();
 
-      // id to append to the task output directory to make it unique to avoid 
multiple flush publishers
-      // attempting to move the same file.
-      int uniqueId = 0;
       for (MultiWorkUnit mwu : multiWorkUnits) {
         // Select a sample WU.
         WorkUnit indexedWorkUnit = mwu.getWorkUnits().get(0);
@@ -355,7 +358,8 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
 
         // Need to make the task output directory unique to file move 
conflicts in the flush publisher.
         String outputDir = state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR);
-        indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new 
Path(outputDir, Integer.toString(uniqueId++)));
+        indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR,
+            new Path(outputDir, Integer.toString(uniqueId.getAndIncrement())));
         indexedWorkUnitList.add(indexedWorkUnit);
       }
       return indexedWorkUnitList;
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index bf86e628e..fbd4294de 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -32,6 +32,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Getter;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,6 +147,9 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   // This contains all job context information
   protected final JobContext jobContext;
 
+  // Helper to prepare WorkUnit with necessary information. This final object 
can make sure the uniqueness of task IDs
+  protected final WorkUnitPreparator workUnitPreparator;
+
   // This (optional) JobLock is used to prevent the next scheduled run
   // of the job from starting if the current run has not finished yet
   protected Optional<JobLock> jobLockOptional = Optional.absent();
@@ -171,6 +176,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   protected final EventSubmitter eventSubmitter;
 
   // This is for dispatching events related to job launching and execution to 
registered subscribers
+  @Getter
   protected final EventBus eventBus = new 
EventBus(AbstractJobLauncher.class.getSimpleName());
 
   // A list of JobListeners that will be injected into the user provided 
JobListener
@@ -183,6 +189,9 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
 
   protected final GobblinJobMetricReporter gobblinJobMetricsReporter;
 
+  protected Boolean canCleanUpStagingData = false;
+  protected DestinationDatasetHandlerService destDatasetHandlerService;
+
   public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> 
metadataTags)
       throws Exception {
     this(jobProps, metadataTags, null);
@@ -221,6 +230,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
 
       this.jobContext = new JobContext(this.jobProps, LOG, instanceBroker, 
troubleshooter.getIssueRepository());
       this.eventBus.register(this.jobContext);
+      this.workUnitPreparator = new 
WorkUnitPreparator(this.jobContext.getJobId());
 
       this.cancellationExecutor = Executors.newSingleThreadExecutor(
           ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOG), 
Optional.of("CancellationExecutor")));
@@ -507,9 +517,10 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         }
 
         // Perform work needed before writing is done
-        Boolean canCleanUp = 
this.canCleanStagingData(this.jobContext.getJobState());
-        workUnitStream = closer.register(new 
DestinationDatasetHandlerService(jobState, canCleanUp, this.eventSubmitter))
-            .executeHandlers(workUnitStream);
+        this.canCleanUpStagingData = 
this.canCleanStagingData(this.jobContext.getJobState());
+        this.destDatasetHandlerService = new 
DestinationDatasetHandlerService(jobState, canCleanUpStagingData, 
this.eventSubmitter);
+        closer.register(this.destDatasetHandlerService);
+        workUnitStream = this.executeHandlers(workUnitStream, 
this.destDatasetHandlerService);
 
         //Initialize writer and converter(s)
         closer.register(WriterInitializerFactory.newInstace(jobState, 
workUnitStream)).initialize();
@@ -541,19 +552,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
 
           TimingEvent workUnitsPreparationTimer =
               
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
-          // Add task ids
-          workUnitStream = prepareWorkUnits(workUnitStream, jobState);
-          // Remove skipped workUnits from the list of work units to execute.
-          workUnitStream = workUnitStream.filter(new 
SkippedWorkUnitsFilter(jobState));
-          // Add surviving tasks to jobState
-          workUnitStream = workUnitStream.transform(new MultiWorkUnitForEach() 
{
-            @Override
-            public void forWorkUnit(WorkUnit workUnit) {
-              jobState.incrementTaskCount();
-              jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, 
jobState)));
-            }
-          });
-
+          workUnitStream = processWorkUnitStream(workUnitStream, jobState);
           // If it is a streaming source, workunits cannot be counted
           this.jobContext.getJobState().setProp(NUM_WORKUNITS,
               workUnitStream.isSafeToMaterialize() ? 
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
@@ -711,6 +710,26 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
     }
   }
 
+  protected WorkUnitStream executeHandlers (WorkUnitStream workUnitStream, 
DestinationDatasetHandlerService datasetHandlerService){
+    return datasetHandlerService.executeHandlers(workUnitStream);
+  }
+
+  protected WorkUnitStream processWorkUnitStream(WorkUnitStream 
workUnitStream, JobState jobState) {
+    // Add task ids
+    workUnitStream = prepareWorkUnits(workUnitStream);
+    // Remove skipped workUnits from the list of work units to execute.
+    workUnitStream = workUnitStream.filter(new 
SkippedWorkUnitsFilter(jobState));
+    // Add surviving tasks to jobState
+    workUnitStream = workUnitStream.transform(new MultiWorkUnitForEach() {
+      @Override
+      public void forWorkUnit(WorkUnit workUnit) {
+        jobState.incrementTaskCount();
+        jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, 
jobState)));
+      }
+    });
+    return workUnitStream;
+  }
+
   /**
    * Subclasses can override this method to do whatever processing on the 
{@link TaskState}s,
    * e.g., aggregate task-level metrics into job-level metrics.
@@ -817,7 +836,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   /**
    * Materialize a {@link WorkUnitStream} into an in-memory list. Note that 
infinite work unit streams cannot be materialized.
    */
-  private List<WorkUnit> materializeWorkUnitList(WorkUnitStream 
workUnitStream) {
+  protected List<WorkUnit> materializeWorkUnitList(WorkUnitStream 
workUnitStream) {
     if (!workUnitStream.isFiniteStream()) {
       throw new UnsupportedOperationException("Cannot materialize an infinite 
work unit stream.");
     }
@@ -888,8 +907,8 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   /**
    * Prepare the flattened {@link WorkUnit}s for execution by populating the 
job and task IDs.
    */
-  private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits, JobState 
jobState) {
-    return workUnits.transform(new 
WorkUnitPreparator(this.jobContext.getJobId()));
+  private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits) {
+    return workUnits.transform(workUnitPreparator);
   }
 
   private static abstract class MultiWorkUnitForEach implements 
Function<WorkUnit, WorkUnit> {
@@ -912,13 +931,13 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
 
   @RequiredArgsConstructor
   private static class WorkUnitPreparator extends MultiWorkUnitForEach {
-    private int taskIdSequence = 0;
+    private final AtomicInteger taskIdSequence = new AtomicInteger(0);
     private final String jobId;
 
     @Override
     protected void forWorkUnit(WorkUnit workUnit) {
       workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, this.jobId);
-      String taskId = JobLauncherUtils.newTaskId(this.jobId, 
this.taskIdSequence++);
+      String taskId = JobLauncherUtils.newTaskId(this.jobId, 
taskIdSequence.getAndIncrement());
       workUnit.setId(taskId);
       workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, taskId);
       workUnit.setProp(ConfigurationKeys.TASK_KEY_KEY, 
Long.toString(Id.Task.parse(taskId).getSequence()));
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index c2bdc592d..a7b7fd0f5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -302,7 +302,7 @@ public class JobContext implements Closeable {
    *
    * @return an instance of the {@link Source} class specified in the job configuration
    */
-  Source<?, ?> getSource() {
+  public Source<?, ?> getSource() {
     return this.source;
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
index f1e8928be..2dc167c0a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import lombok.Getter;
 import org.apache.gobblin.source.InfiniteSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ import org.apache.gobblin.source.workunit.WorkUnitStream;
 public class SourceDecorator<S, D> implements WorkUnitStreamSource<S, D>, Decorator {
   private static final Logger LOG = LoggerFactory.getLogger(SourceDecorator.class);
 
+  @Getter
   private final Source<S, D> source;
   private final String jobId;
   private final Logger logger;
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index c447af99e..21fc47c4a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.yarn;
 
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.commons.compress.utils.Sets;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -71,7 +73,10 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   private final String AUTO_SCALING_PREFIX = GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling.";
   private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
       AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
-  private static final int THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING = 20;
+  private final String TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = AUTO_SCALING_PREFIX + "taskAttemptsThreshold";
+  private final int DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = 20;
+  private final String SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD = AUTO_SCALING_PREFIX + "splitWorkUnitReachThreshold";
+  private final boolean DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD = false;
   private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60;
   // Only one container will be requested for each N partitions of work
   private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = AUTO_SCALING_PREFIX + "partitionsPerContainer";
@@ -85,6 +90,8 @@ public class YarnAutoScalingManager extends AbstractIdleService {
 
   private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
   private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
+  private int taskAttemptsThreshold;
+  private final boolean splitWorkUnitReachThreshold;
 
   private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize";
 
@@ -125,6 +132,10 @@ public class YarnAutoScalingManager extends AbstractIdleService {
         GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
     this.defaultContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
     this.defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
+    this.taskAttemptsThreshold = ConfigUtils.getInt(this.config, TASK_NUMBER_OF_ATTEMPTS_THRESHOLD,
+        DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD);
+    this.splitWorkUnitReachThreshold = ConfigUtils.getBoolean(this.config, SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD,
+        DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD);
   }
 
   @Override
@@ -139,7 +150,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
             this.yarnService, this.partitionsPerContainer, this.overProvisionFactor,
             this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
-            this.defaultContainerMemoryMbs, this.defaultContainerCores),
+            this.defaultContainerMemoryMbs, this.defaultContainerCores, this.taskAttemptsThreshold, this.splitWorkUnitReachThreshold),
         initialDelay, scheduleInterval, TimeUnit.SECONDS);
   }
 
@@ -166,6 +177,8 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     private final String defaultHelixInstanceTags;
     private final int defaultContainerMemoryMbs;
     private final int defaultContainerCores;
+    private final int taskAttemptsThreshold;
+    private final boolean splitWorkUnitReachThreshold;
 
     /**
      * A static map that keep track of an idle instance and its latest beginning idle time.
@@ -185,10 +198,17 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     }
 
     private String getInuseParticipantForHelixPartition(JobContext jobContext, int partition) {
-      if (jobContext.getPartitionNumAttempts(partition) > THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
+      if (jobContext.getPartitionNumAttempts(partition) > taskAttemptsThreshold) {
         log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better",
             jobContext.getTaskIdForPartition(partition), jobContext.getPartitionNumAttempts(partition));
+        if(splitWorkUnitReachThreshold) {
+          String helixTaskID = jobContext.getTaskIdForPartition(partition);
+          log.info("Sending WorkUnitChangeEvent to split helix task:{}", helixTaskID);
+          this.yarnService.getEventBus().post(new WorkUnitChangeEvent(
+              Collections.singletonList(helixTaskID), null));
+        }
       }
+
       if (!UNUSUAL_HELIX_TASK_STATES.contains(jobContext.getPartitionState(partition))) {
         return jobContext.getAssignedParticipant(partition);
       }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index e1da50d94..370b8ba87 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -134,6 +134,7 @@ public class YarnService extends AbstractIdleService {
 
   private final Config config;
 
+  @Getter
   private final EventBus eventBus;
 
   private final Configuration yarnConfiguration;
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 10563c2bb..d21d530d7 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -78,7 +78,8 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+            defaultContainerCores, 20, false);
 
     runnable.run();
     ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
@@ -108,7 +109,8 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+            defaultContainerCores, 20, false);
 
     runnable.run();
 
@@ -145,7 +147,8 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+            defaultContainerCores, 20, false);
 
     runnable.run();
 
@@ -184,7 +187,8 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+            defaultContainerCores, 20, false);
 
     runnable.run();
 
@@ -211,7 +215,8 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 2,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+            defaultContainerCores, 20, false);
 
     runnable.run();
 
@@ -234,7 +239,8 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            1.2, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+            1.2, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+            defaultContainerCores, 20, false);
 
     runnable1.run();
 
@@ -246,7 +252,8 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            0.1, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+            0.1, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+            defaultContainerCores, 20, false);
 
     runnable2.run();
 
@@ -258,7 +265,8 @@ public class YarnAutoScalingManagerTest {
     Mockito.reset(mockYarnService);
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            6.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+            6.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+            defaultContainerCores, 20, false);
 
     runnable3.run();
 
@@ -384,7 +392,8 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+            defaultContainerCores, 20, false);
 
     runnable.run();
 
@@ -469,7 +478,8 @@ public class YarnAutoScalingManagerTest {
     public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int partitionsPerContainer,
         HelixDataAccessor helixDataAccessor) {
       super(taskDriver, yarnService, partitionsPerContainer, 1.0,
-          noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+          noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory,
+          defaultContainerCores, 20, false);
     }
 
     @Override


Reply via email to