This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f8880ed06 [GOBBLIN-1947] Send workUnitChange event when helix task
consistently fail (#3832)
f8880ed06 is described below
commit f8880ed06079c354d5cdaa0dcd361a3c3856a5b3
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Thu Jan 4 13:38:19 2024 -0800
[GOBBLIN-1947] Send workUnitChange event when helix task consistently fail
(#3832)
* Send WorkUnitChangeEvent when helix task consistently fail
* make launcher and scheduler correctly process work unit change event
* change back pack config key
* correctly process workunit stream before run
* only use helix task map
* update WorkUnitPreparator for job launcher
* update log
* use workunit id for state store
---
.../gobblin/cluster/GobblinHelixJobLauncher.java | 136 ++++++++++++++++++---
.../gobblin/cluster/GobblinHelixJobScheduler.java | 5 +-
.../cluster/HelixRetriggeringJobCallableTest.java | 5 +-
.../extractor/extract/kafka/KafkaSource.java | 19 ++-
.../packer/KafkaTopicGroupingWorkUnitPacker.java | 14 ++-
.../gobblin/runtime/AbstractJobLauncher.java | 61 +++++----
.../org/apache/gobblin/runtime/JobContext.java | 2 +-
.../apache/gobblin/runtime/SourceDecorator.java | 2 +
.../gobblin/yarn/YarnAutoScalingManager.java | 26 +++-
.../java/org/apache/gobblin/yarn/YarnService.java | 1 +
.../gobblin/yarn/YarnAutoScalingManagerTest.java | 30 +++--
11 files changed, 239 insertions(+), 62 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index f0fcc258f..15fff870b 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -17,10 +17,14 @@
package org.apache.gobblin.cluster;
+import com.google.common.eventbus.Subscribe;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -30,6 +34,14 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.gobblin.runtime.SourceDecorator;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -78,6 +90,8 @@ import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.SerializationUtils;
+import static org.apache.gobblin.util.JobLauncherUtils.*;
+
/**
* An implementation of {@link JobLauncher} that launches a Gobblin job using
the Helix task framework.
@@ -131,7 +145,7 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
private final long workFlowExpiryTimeSeconds;
private final long helixJobStopTimeoutSeconds;
private final long helixWorkflowSubmissionTimeoutSeconds;
- private Map<String, TaskConfig> workUnitToHelixConfig;
+ private Map<String, TaskConfig> helixIdTaskConfigMap;
private Retryer<Boolean> taskRetryer;
public GobblinHelixJobLauncher(Properties jobProps, final HelixManager
helixManager, Path appWorkDir,
@@ -185,7 +199,7 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
this.stateStores.getTaskStateStore(), this.outputTaskStateDir,
this.getIssueRepository());
this.helixMetrics = helixMetrics;
- this.workUnitToHelixConfig = new HashMap<>();
+ this.helixIdTaskConfigMap = new HashMap<>();
this.taskRetryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfException()
.withStopStrategy(StopStrategies.stopAfterAttempt(3)).build();
@@ -254,6 +268,76 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
}
}
+ /**
+ * The implementation of this method has the assumption that work unit
change should never delete without adding new
work units, which would cause starvation. Thus, process {@link
WorkUnitChangeEvent} for two scenarios:
+ * 1. workUnitChangeEvent only contains old tasks and no new tasks given:
recalculate new work unit through kafka
+ * source and pack with a min container setting.
+ * 2. workUnitChangeEvent contains both valid old and new work unit: respect
the information and directly remove
+ * old tasks and start new work units
+ *
+ * @param workUnitChangeEvent Event post by EventBus to specify old tasks to
be removed and new tasks to be run
+ * @throws InvocationTargetException
+ */
+ @Override
+ @Subscribe
+ public void handleWorkUnitChangeEvent(WorkUnitChangeEvent
workUnitChangeEvent)
+ throws InvocationTargetException {
+ log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+ workUnitChangeEvent.getOldTaskIds(),
workUnitChangeEvent.getNewWorkUnits());
+ final JobState jobState = this.jobContext.getJobState();
+ List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+ // Use old task Id to recalculate new work units
+ if(workUnits == null || workUnits.isEmpty()) {
+ workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds());
+ // If no new valid work units can be generated, dismiss the
WorkUnitChangeEvent
+ if(workUnits == null || workUnits.isEmpty()) {
+ log.info("Not able to update work unit meaningfully, dismiss the
WorkUnitChangeEvent");
+ return;
+ }
+ }
+
+ // Follow how AbstractJobLauncher handles work units to make sure
consistent behaviour
+ WorkUnitStream workUnitStream = new
BasicWorkUnitStream.Builder(workUnits).build();
+ // For streaming use case, this might be a necessary step to find dataset
specific namespace so that each workUnit
+ // can create staging and temp directories with the correct determination
of shard-path
+ workUnitStream = this.executeHandlers(workUnitStream,
this.destDatasetHandlerService);
+ workUnitStream = this.processWorkUnitStream(workUnitStream, jobState);
+ workUnits = materializeWorkUnitList(workUnitStream);
+ try {
+ this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
+ this.addTasksToCurrentJob(workUnits);
+ } catch (Exception e) {
+ //todo: emit some event to indicate there is an error handling this
event that may cause starvation
+ log.error("Failed to process WorkUnitChangeEvent with old tasks {} and
new workunits {}.",
+ workUnitChangeEvent.getOldTaskIds(), workUnits, e);
+ throw new InvocationTargetException(e);
+ }
+ }
+
+ private List<WorkUnit> recalculateWorkUnit(List<String> oldHelixTaskIDs) {
+ JobState jobState = this.jobContext.getJobState();
+ Map<String, List<Integer>> filteredTopicPartition = new HashMap<>();
+ for(String id : oldHelixTaskIDs) {
+ WorkUnit workUnit = getWorkUnitFromStateStoreByHelixId(id);
+ String topicName = workUnit.getProp(KafkaSource.TOPIC_NAME);
+ List<Integer> partitions = filteredTopicPartition.getOrDefault(topicName,
+ new LinkedList<>());
+
partitions.addAll(KafkaUtils.getPartitions(workUnit).stream().map(KafkaPartition::getId).collect(Collectors.toList()));
+ filteredTopicPartition.put(topicName, partitions);
+ }
+ // If a topic contains fewer than 2 filtered partitions, it can't be further
split, so remove it from the map
+ filteredTopicPartition.values().removeIf(list -> list == null ||
list.size() < 2);
+ if(filteredTopicPartition.isEmpty()) {
+ return new ArrayList<>();
+ }
+ KafkaSource<?, ?> source = (KafkaSource<?, ?>) ((SourceDecorator<?, ?>)
this.jobContext.getSource()).getSource();
+ //TODO: have a smarter way to calculate the new work unit size,
replacing the current static approach of simply doubling
+ int newWorkUnitSize = oldHelixTaskIDs.size() * 2;
+ return source.getWorkunitsForFilteredPartitions(jobState,
+ com.google.common.base.Optional.of(filteredTopicPartition),
com.google.common.base.Optional.of(newWorkUnitSize));
+ }
+
@Override
protected void executeCancellation() {
if (this.jobSubmitted) {
@@ -275,31 +359,31 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
}
}
- protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove)
throws IOException, ExecutionException,
+ protected void removeTasksFromCurrentJob(List<String> helixTaskIdsToRemove)
throws IOException, ExecutionException,
RetryException {
String jobName = this.jobContext.getJobId();
try (ParallelRunner stateSerDeRunner = new
ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
- for (String workUnitId : workUnitIdsToRemove) {
+ for (String helixTaskId : helixTaskIdsToRemove) {
taskRetryer.call(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- String taskId = workUnitToHelixConfig.get(workUnitId).getId();
+ String workUnitId =
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
boolean remove =
- HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName,
taskId, helixTaskDriver);
+ HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName,
helixTaskId, helixTaskDriver);
if (remove) {
- log.info(String.format("Removed helix task %s with gobblin task
id %s from helix job %s:%s ", taskId,
+ log.info(String.format("Removed helix task %s with gobblin task
id %s from helix job %s:%s ", helixTaskId,
workUnitId, helixWorkFlowName, jobName));
} else {
throw new IOException(
- String.format("Cannot remove task %s from helix job %s:%s",
workUnitId,
+ String.format("Cannot remove Helix task %s from helix job
%s:%s", helixTaskId,
helixWorkFlowName, jobName));
}
return true;
}
});
- deleteWorkUnitFromStateStore(workUnitId, stateSerDeRunner);
- log.info(String.format("remove task state for %s in state store",
workUnitId));
- this.workUnitToHelixConfig.remove(workUnitId);
+ deleteWorkUnitFromStateStoreByHelixId(helixTaskId, stateSerDeRunner);
+ log.info(String.format("Removed task state for Helix task %s in state
store", helixTaskId));
+ this.helixIdTaskConfigMap.remove(helixTaskId);
}
}
}
@@ -513,7 +597,7 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
rawConfigMap.put(ConfigurationKeys.TASK_ID_KEY, workUnit.getId());
rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY,
"true");
TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap);
- workUnitToHelixConfig.put(workUnit.getId(), taskConfig);
+ helixIdTaskConfigMap.put(taskConfig.getId(), taskConfig);
return taskConfig;
}
@@ -526,12 +610,36 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
taskConfigMap.put(workUnit.getId(), getTaskConfig(workUnit,
stateSerDeRunner));
}
+ /**
+ * get a single {@link WorkUnit} (flattened) from state store.
+ */
+ private WorkUnit getWorkUnitFromStateStoreByHelixId(String helixTaskId) {
+ String workUnitFilePath =
+
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
+ final StateStore stateStore;
+ Path workUnitFile = new Path(workUnitFilePath);
+ String workUnitId =
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
+ final String fileName = workUnitFile.getName();
+ final String storeName = workUnitFile.getParent().getName();
+ if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
+ stateStore = stateStores.getMwuStateStore();
+ } else {
+ stateStore = stateStores.getWuStateStore();
+ }
+ try {
+ return (WorkUnit) stateStore.get(storeName, fileName, workUnitId);
+ } catch (IOException ioException) {
+ log.error("Failed to fetch workUnit for helix task {} from path {}",
helixTaskId, workUnitFilePath);
+ }
+ return null;
+ }
+
/**
* Delete a single {@link WorkUnit} (flattened) from state store.
*/
- private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner
stateSerDeRunner) {
+ private void deleteWorkUnitFromStateStoreByHelixId(String helixTaskId,
ParallelRunner stateSerDeRunner) {
String workUnitFilePath =
-
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
+
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
Path workUnitFile = new Path(workUnitFilePath);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index d30554d2b..4049ee3d1 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -275,13 +275,14 @@ public class GobblinHelixJobScheduler extends
JobScheduler implements StandardMe
Properties combinedProps = new Properties();
combinedProps.putAll(properties);
combinedProps.putAll(jobProps);
-
- return new GobblinHelixJobLauncher(combinedProps,
+ GobblinHelixJobLauncher gobblinHelixJobLauncher = new
GobblinHelixJobLauncher(combinedProps,
this.jobHelixManager,
this.appWorkDir,
this.metadataTags,
this.jobRunningMap,
Optional.of(this.helixMetrics));
+ this.eventBus.register(gobblinHelixJobLauncher);
+ return gobblinHelixJobLauncher;
}
public Future<?> scheduleJobImmediately(Properties jobProps, JobListener
jobListener) {
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
index 779e7dd0d..f2572d18f 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.cluster;
+import com.google.common.eventbus.EventBus;
import java.io.File;
import java.util.Optional;
import java.util.Properties;
@@ -59,8 +60,8 @@ public class HelixRetriggeringJobCallableTest {
MutableJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
SchedulerService schedulerService = new SchedulerService(new Properties());
Path appWorkDir = new Path(TMP_DIR);
- GobblinHelixJobScheduler jobScheduler = new
GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(),
Optional.empty(), null,
- appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+ GobblinHelixJobScheduler jobScheduler = new
GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(),
Optional.empty(),
+ new EventBus("Test"), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
GobblinHelixJobLauncher jobLauncher =
HelixRetriggeringJobCallable.buildJobLauncherForCentralizedMode(jobScheduler,
getDummyJob());
String jobId = jobLauncher.getJobId();
Assert.assertNotNull(jobId);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 80ef8c09d..3eb096765 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -244,10 +244,17 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
Collection<KafkaTopic> topics;
if(filteredTopicPartition.isPresent()) {
- // If filteredTopicPartition present, use it to construct the
whitelist pattern while leave blacklist empty
- topics =
this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(),
-
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
+ if(filteredTopicPartition.get().isEmpty()) {
+ // return an empty list as filteredTopicPartition is present but
contains no valid entry
+ return new ArrayList<>();
+ } else {
+ // If filteredTopicPartition present, use it to construct the
whitelist pattern while leave blacklist empty
+ topics = this.kafkaConsumerClient.get()
+ .getFilteredTopics(Collections.emptyList(),
+
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
+ }
} else {
+ // get topics based on job level config
topics = getValidTopics(getFilteredTopics(state), state);
}
this.topicsToProcess =
topics.stream().map(KafkaTopic::getName).collect(toSet());
@@ -263,6 +270,8 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
int numOfThreads =
state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
+ // No need to allocate more threads than the topic size, but the minimum
should be 1
+ numOfThreads = Math.max(Math.min(numOfThreads, topics.size()), 1);
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads,
ExecutorsUtils.newThreadFactory(Optional.of(LOG)));
@@ -277,7 +286,7 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
for (KafkaTopic topic : topics) {
- LOG.info("Discovered topic " + topic);
+ LOG.info("Discovered topic {} with {} number of partitions",
topic.getName(), topic.getPartitions().size());
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new
State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
@@ -343,6 +352,8 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
protected void populateClientPool(int count,
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory,
Config config) {
+ // Clear the pool first as the clients within may already be closed
+ kafkaConsumerClientPool.clear();
for (int i = 0; i < count; i++) {
kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 9c22d47cc..764eeae09 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.hadoop.fs.Path;
@@ -135,6 +136,10 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
private static final String NUM_CONTAINERS_EVENT_NAME = "NumContainers";
+ // id to append to the task output directory to make it unique to avoid
multiple flush publishers attempting to move
+ // the same file. Make it static and atomic as during runtime packing can
happen multiple times simultaneously
+ private static AtomicInteger uniqueId = new AtomicInteger(0);
+
private final long packingStartTimeMillis;
private final double minimumContainerCapacity;
private final Optional<StateStoreBasedWatermarkStorage> watermarkStorage;
@@ -305,7 +310,8 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
private Double getDefaultWorkUnitSize() {
return
state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
- KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) /
state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY,
DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
+ KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) /
+ state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY,
DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
}
/**
@@ -339,9 +345,6 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
if (state.getPropAsBoolean(INDEXING_ENABLED, DEFAULT_INDEXING_ENABLED)) {
List<WorkUnit> indexedWorkUnitList = new ArrayList<>();
- // id to append to the task output directory to make it unique to avoid
multiple flush publishers
- // attempting to move the same file.
- int uniqueId = 0;
for (MultiWorkUnit mwu : multiWorkUnits) {
// Select a sample WU.
WorkUnit indexedWorkUnit = mwu.getWorkUnits().get(0);
@@ -355,7 +358,8 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
// Need to make the task output directory unique to file move
conflicts in the flush publisher.
String outputDir = state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR);
- indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new
Path(outputDir, Integer.toString(uniqueId++)));
+ indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR,
+ new Path(outputDir, Integer.toString(uniqueId.getAndIncrement())));
indexedWorkUnitList.add(indexedWorkUnit);
}
return indexedWorkUnitList;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index bf86e628e..fbd4294de 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -32,6 +32,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Getter;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,6 +147,9 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
// This contains all job context information
protected final JobContext jobContext;
+ // Helper to prepare WorkUnit with necessary information. This final object
can make sure the uniqueness of task IDs
+ protected final WorkUnitPreparator workUnitPreparator;
+
// This (optional) JobLock is used to prevent the next scheduled run
// of the job from starting if the current run has not finished yet
protected Optional<JobLock> jobLockOptional = Optional.absent();
@@ -171,6 +176,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
protected final EventSubmitter eventSubmitter;
// This is for dispatching events related to job launching and execution to
registered subscribers
+ @Getter
protected final EventBus eventBus = new
EventBus(AbstractJobLauncher.class.getSimpleName());
// A list of JobListeners that will be injected into the user provided
JobListener
@@ -183,6 +189,9 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
protected final GobblinJobMetricReporter gobblinJobMetricsReporter;
+ protected Boolean canCleanUpStagingData = false;
+ protected DestinationDatasetHandlerService destDatasetHandlerService;
+
public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>>
metadataTags)
throws Exception {
this(jobProps, metadataTags, null);
@@ -221,6 +230,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
this.jobContext = new JobContext(this.jobProps, LOG, instanceBroker,
troubleshooter.getIssueRepository());
this.eventBus.register(this.jobContext);
+ this.workUnitPreparator = new
WorkUnitPreparator(this.jobContext.getJobId());
this.cancellationExecutor = Executors.newSingleThreadExecutor(
ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOG),
Optional.of("CancellationExecutor")));
@@ -507,9 +517,10 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
}
// Perform work needed before writing is done
- Boolean canCleanUp =
this.canCleanStagingData(this.jobContext.getJobState());
- workUnitStream = closer.register(new
DestinationDatasetHandlerService(jobState, canCleanUp, this.eventSubmitter))
- .executeHandlers(workUnitStream);
+ this.canCleanUpStagingData =
this.canCleanStagingData(this.jobContext.getJobState());
+ this.destDatasetHandlerService = new
DestinationDatasetHandlerService(jobState, canCleanUpStagingData,
this.eventSubmitter);
+ closer.register(this.destDatasetHandlerService);
+ workUnitStream = this.executeHandlers(workUnitStream,
this.destDatasetHandlerService);
//Initialize writer and converter(s)
closer.register(WriterInitializerFactory.newInstace(jobState,
workUnitStream)).initialize();
@@ -541,19 +552,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
TimingEvent workUnitsPreparationTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
- // Add task ids
- workUnitStream = prepareWorkUnits(workUnitStream, jobState);
- // Remove skipped workUnits from the list of work units to execute.
- workUnitStream = workUnitStream.filter(new
SkippedWorkUnitsFilter(jobState));
- // Add surviving tasks to jobState
- workUnitStream = workUnitStream.transform(new MultiWorkUnitForEach()
{
- @Override
- public void forWorkUnit(WorkUnit workUnit) {
- jobState.incrementTaskCount();
- jobState.addTaskState(new TaskState(new WorkUnitState(workUnit,
jobState)));
- }
- });
-
+ workUnitStream = processWorkUnitStream(workUnitStream, jobState);
// If it is a streaming source, workunits cannot be counted
this.jobContext.getJobState().setProp(NUM_WORKUNITS,
workUnitStream.isSafeToMaterialize() ?
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
@@ -711,6 +710,26 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
}
}
+ protected WorkUnitStream executeHandlers (WorkUnitStream workUnitStream,
DestinationDatasetHandlerService datasetHandlerService){
+ return datasetHandlerService.executeHandlers(workUnitStream);
+ }
+
+ protected WorkUnitStream processWorkUnitStream(WorkUnitStream
workUnitStream, JobState jobState) {
+ // Add task ids
+ workUnitStream = prepareWorkUnits(workUnitStream);
+ // Remove skipped workUnits from the list of work units to execute.
+ workUnitStream = workUnitStream.filter(new
SkippedWorkUnitsFilter(jobState));
+ // Add surviving tasks to jobState
+ workUnitStream = workUnitStream.transform(new MultiWorkUnitForEach() {
+ @Override
+ public void forWorkUnit(WorkUnit workUnit) {
+ jobState.incrementTaskCount();
+ jobState.addTaskState(new TaskState(new WorkUnitState(workUnit,
jobState)));
+ }
+ });
+ return workUnitStream;
+ }
+
/**
* Subclasses can override this method to do whatever processing on the
{@link TaskState}s,
* e.g., aggregate task-level metrics into job-level metrics.
@@ -817,7 +836,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
/**
* Materialize a {@link WorkUnitStream} into an in-memory list. Note that
infinite work unit streams cannot be materialized.
*/
- private List<WorkUnit> materializeWorkUnitList(WorkUnitStream
workUnitStream) {
+ protected List<WorkUnit> materializeWorkUnitList(WorkUnitStream
workUnitStream) {
if (!workUnitStream.isFiniteStream()) {
throw new UnsupportedOperationException("Cannot materialize an infinite
work unit stream.");
}
@@ -888,8 +907,8 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
/**
* Prepare the flattened {@link WorkUnit}s for execution by populating the
job and task IDs.
*/
- private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits, JobState
jobState) {
- return workUnits.transform(new
WorkUnitPreparator(this.jobContext.getJobId()));
+ private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits) {
+ return workUnits.transform(workUnitPreparator);
}
private static abstract class MultiWorkUnitForEach implements
Function<WorkUnit, WorkUnit> {
@@ -912,13 +931,13 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
@RequiredArgsConstructor
private static class WorkUnitPreparator extends MultiWorkUnitForEach {
- private int taskIdSequence = 0;
+ private final AtomicInteger taskIdSequence = new AtomicInteger(0);
private final String jobId;
@Override
protected void forWorkUnit(WorkUnit workUnit) {
workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, this.jobId);
- String taskId = JobLauncherUtils.newTaskId(this.jobId,
this.taskIdSequence++);
+ String taskId = JobLauncherUtils.newTaskId(this.jobId,
taskIdSequence.getAndIncrement());
workUnit.setId(taskId);
workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, taskId);
workUnit.setProp(ConfigurationKeys.TASK_KEY_KEY,
Long.toString(Id.Task.parse(taskId).getSequence()));
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index c2bdc592d..a7b7fd0f5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -302,7 +302,7 @@ public class JobContext implements Closeable {
*
* @return an instance of the {@link Source} class specified in the job
configuration
*/
- Source<?, ?> getSource() {
+ public Source<?, ?> getSource() {
return this.source;
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
index f1e8928be..2dc167c0a 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import lombok.Getter;
import org.apache.gobblin.source.InfiniteSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ import org.apache.gobblin.source.workunit.WorkUnitStream;
public class SourceDecorator<S, D> implements WorkUnitStreamSource<S, D>,
Decorator {
private static final Logger LOG =
LoggerFactory.getLogger(SourceDecorator.class);
+ @Getter
private final Source<S, D> source;
private final String jobId;
private final Logger logger;
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index c447af99e..21fc47c4a 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.yarn;
import java.util.ArrayDeque;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -31,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.compress.utils.Sets;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -71,7 +73,10 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final String AUTO_SCALING_PREFIX =
GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling.";
private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
- private static final int THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING = 20;
+ private final String TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = AUTO_SCALING_PREFIX
+ "taskAttemptsThreshold";
+ private final int DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = 20;
+ private final String SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD =
AUTO_SCALING_PREFIX + "splitWorkUnitReachThreshold";
+ private final boolean DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD =
false;
private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60;
// Only one container will be requested for each N partitions of work
private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER =
AUTO_SCALING_PREFIX + "partitionsPerContainer";
@@ -85,6 +90,8 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX +
"initialDelay";
private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
+ private int taskAttemptsThreshold;
+ private final boolean splitWorkUnitReachThreshold;
private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX +
"windowSize";
@@ -125,6 +132,10 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY,
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
this.defaultContainerMemoryMbs =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
this.defaultContainerCores =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
+ this.taskAttemptsThreshold = ConfigUtils.getInt(this.config,
TASK_NUMBER_OF_ATTEMPTS_THRESHOLD,
+ DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD);
+ this.splitWorkUnitReachThreshold = ConfigUtils.getBoolean(this.config,
SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD,
+ DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD);
}
@Override
@@ -139,7 +150,7 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
this.autoScalingExecutor.scheduleAtFixedRate(new
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
this.yarnService, this.partitionsPerContainer,
this.overProvisionFactor,
this.slidingFixedSizeWindow,
this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
- this.defaultContainerMemoryMbs, this.defaultContainerCores),
+ this.defaultContainerMemoryMbs, this.defaultContainerCores,
this.taskAttemptsThreshold, this.splitWorkUnitReachThreshold),
initialDelay, scheduleInterval, TimeUnit.SECONDS);
}
@@ -166,6 +177,8 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final String defaultHelixInstanceTags;
private final int defaultContainerMemoryMbs;
private final int defaultContainerCores;
+ private final int taskAttemptsThreshold;
+ private final boolean splitWorkUnitReachThreshold;
/**
* A static map that keep track of an idle instance and its latest
beginning idle time.
@@ -185,10 +198,17 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
}
private String getInuseParticipantForHelixPartition(JobContext jobContext,
int partition) {
- if (jobContext.getPartitionNumAttempts(partition) >
THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
+ if (jobContext.getPartitionNumAttempts(partition) >
taskAttemptsThreshold) {
log.warn("Helix task {} has been retried for {} times, please check
the config to see how we can handle this task better",
jobContext.getTaskIdForPartition(partition),
jobContext.getPartitionNumAttempts(partition));
+ if(splitWorkUnitReachThreshold) {
+ String helixTaskID = jobContext.getTaskIdForPartition(partition);
+ log.info("Sending WorkUnitChangeEvent to split helix task:{}",
helixTaskID);
+ this.yarnService.getEventBus().post(new WorkUnitChangeEvent(
+ Collections.singletonList(helixTaskID), null));
+ }
}
+
if
(!UNUSUAL_HELIX_TASK_STATES.contains(jobContext.getPartitionState(partition))) {
return jobContext.getAssignedParticipant(partition);
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index e1da50d94..370b8ba87 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -134,6 +134,7 @@ public class YarnService extends AbstractIdleService {
private final Config config;
+ @Getter
private final EventBus eventBus;
private final Configuration yarnConfiguration;
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 10563c2bb..d21d530d7 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -78,7 +78,8 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
runnable.run();
ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
@@ -108,7 +109,8 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
runnable.run();
@@ -145,7 +147,8 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
runnable.run();
@@ -184,7 +187,8 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
runnable.run();
@@ -211,7 +215,8 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 2,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
runnable.run();
@@ -234,7 +239,8 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.2, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 1.2, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
runnable1.run();
@@ -246,7 +252,8 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 0.1, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 0.1, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
runnable2.run();
@@ -258,7 +265,8 @@ public class YarnAutoScalingManagerTest {
Mockito.reset(mockYarnService);
YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 6.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 6.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
runnable3.run();
@@ -384,7 +392,8 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
runnable.run();
@@ -469,7 +478,8 @@ public class YarnAutoScalingManagerTest {
public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService
yarnService, int partitionsPerContainer,
HelixDataAccessor helixDataAccessor) {
super(taskDriver, yarnService, partitionsPerContainer, 1.0,
- noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
+ defaultContainerCores, 20, false);
}
@Override