This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 436cf26cc [GOBBLIN-1919] Rework a few more elements of MR-related job
exec for reuse in Temporal-based execution (#3879)
436cf26cc is described below
commit 436cf26cc23ae189936495d7c63a06c77395ca62
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Feb 21 13:37:11 2024 -0800
[GOBBLIN-1919] Rework a few more elements of MR-related job exec for reuse
in Temporal-based execution (#3879)
* Rework a few more elements of MR-related job exec for reuse in
Temporal-based execution
* minor adjustments
* changed method name
---
.../gobblin/runtime/AbstractJobLauncher.java | 57 ++++++++++++++--------
.../java/org/apache/gobblin/runtime/JobState.java | 18 +++++++
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 20 ++------
.../org/apache/gobblin/util/JobLauncherUtils.java | 22 +++++++++
.../org/apache/gobblin/util/ParallelRunner.java | 5 ++
5 files changed, 85 insertions(+), 37 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 690dc113b..578696f62 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -147,9 +147,6 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
// This contains all job context information
protected final JobContext jobContext;
- // Helper to prepare WorkUnit with necessary information. This final object
can make sure the uniqueness of task IDs
- protected final WorkUnitPreparator workUnitPreparator;
-
// This (optional) JobLock is used to prevent the next scheduled run
// of the job from starting if the current run has not finished yet
protected Optional<JobLock> jobLockOptional = Optional.absent();
@@ -230,7 +227,6 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
this.jobContext = new JobContext(this.jobProps, LOG, instanceBroker,
troubleshooter.getIssueRepository());
this.eventBus.register(this.jobContext);
- this.workUnitPreparator = new
WorkUnitPreparator(this.jobContext.getJobId());
this.cancellationExecutor = Executors.newSingleThreadExecutor(
ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOG),
Optional.of("CancellationExecutor")));
@@ -553,21 +549,13 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
TimingEvent workUnitsPreparationTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
workUnitStream = processWorkUnitStream(workUnitStream, jobState);
+
// If it is a streaming source, workunits cannot be counted
this.jobContext.getJobState().setProp(NUM_WORKUNITS,
workUnitStream.isSafeToMaterialize() ?
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS),
jobState);
- // dump the work unit if tracking logs are enabled
- if
(jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
- workUnitStream = workUnitStream.transform(new Function<WorkUnit,
WorkUnit>() {
- @Nullable
- @Override
- public WorkUnit apply(@Nullable WorkUnit input) {
- LOG.info("Work unit tracking log: {}", input);
- return input;
- }
- });
- }
+ // dump the work unit if tracking logs are enabled (*AFTER* any
materialization done for counting)
+ workUnitStream = addWorkUnitTrackingPerConfig(workUnitStream,
jobState, LOG);
workUnitsPreparationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.WORK_UNITS_PREPARATION));
@@ -710,13 +698,13 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
}
}
- protected WorkUnitStream executeHandlers (WorkUnitStream workUnitStream,
DestinationDatasetHandlerService datasetHandlerService){
+ protected WorkUnitStream executeHandlers(WorkUnitStream workUnitStream,
DestinationDatasetHandlerService datasetHandlerService) {
return datasetHandlerService.executeHandlers(workUnitStream);
}
protected WorkUnitStream processWorkUnitStream(WorkUnitStream
workUnitStream, JobState jobState) {
// Add task ids
- workUnitStream = prepareWorkUnits(workUnitStream);
+ workUnitStream = prepareWorkUnits(workUnitStream, jobState);
// Remove skipped workUnits from the list of work units to execute.
workUnitStream = workUnitStream.filter(new
SkippedWorkUnitsFilter(jobState));
// Add surviving tasks to jobState
@@ -836,7 +824,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
/**
* Materialize a {@link WorkUnitStream} into an in-memory list. Note that
infinite work unit streams cannot be materialized.
*/
- protected List<WorkUnit> materializeWorkUnitList(WorkUnitStream
workUnitStream) {
+ public static List<WorkUnit> materializeWorkUnitList(WorkUnitStream
workUnitStream) {
if (!workUnitStream.isFiniteStream()) {
throw new UnsupportedOperationException("Cannot materialize an infinite
work unit stream.");
}
@@ -904,11 +892,38 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
});
}
+ /** @return transformed `workUnits` after assigning task IDs, removing
skipped ones, and registering those remaining with `jobState` */
+ public static WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits,
JobState jobState) {
+ return assignIdsToWorkUnits(workUnits, jobState) // assign task ids
+ .filter(new SkippedWorkUnitsFilter(jobState)) // remove skipped
workUnits
+ .transform(new MultiWorkUnitForEach() { // add remaining to jobState
+ @Override
+ public void forWorkUnit(WorkUnit workUnit) {
+ jobState.incrementTaskCount();
+ jobState.addTaskState(new TaskState(new WorkUnitState(workUnit,
jobState)));
+ }
+ });
+ }
+
+ /** @return `workUnitStream` transformed to add "tracking" (sic.: actually
*trace* logging), if indicated by `jobState` config */
+ public static WorkUnitStream addWorkUnitTrackingPerConfig(WorkUnitStream
workUnitStream, JobState jobState, Logger log) {
+ return
!jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)
+ ? workUnitStream // no-op, when not enabled
+ : workUnitStream.transform(new Function<WorkUnit, WorkUnit>() {
+ @Nullable
+ @Override
+ public WorkUnit apply(@Nullable WorkUnit input) {
+ log.info("Work unit tracking log: {}", input);
+ return input;
+ }
+ });
+ }
+
/**
- * Prepare the flattened {@link WorkUnit}s for execution by populating the
job and task IDs.
+ * Prepare the flattened {@link WorkUnit}s for execution by populating the
job and unique task IDs.
*/
- private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits) {
- return workUnits.transform(workUnitPreparator);
+ private static WorkUnitStream assignIdsToWorkUnits(WorkUnitStream workUnits,
JobState jobState) {
+ return workUnits.transform(new WorkUnitPreparator(jobState.getJobId()));
}
private static abstract class MultiWorkUnitForEach implements
Function<WorkUnit, WorkUnit> {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 5f8783fcc..c58641e89 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -163,6 +163,13 @@ public class JobState extends SourceState implements
JobProgress {
this.setId(jobId);
}
+ public JobState(Properties properties) {
+ this(
+ JobState.getStateFromProps(properties),
+ JobState.getJobNameFromProps(properties),
+ JobState.getJobIdFromProps(properties));
+ }
+
public JobState(State properties, String jobName, String jobId) {
super(properties);
this.jobName = jobName;
@@ -191,6 +198,17 @@ public class JobState extends SourceState implements
JobProgress {
: JobLauncherUtils.newJobId(JobState.getJobNameFromProps(props));
}
+ public static State getStateFromProps(Properties props) {
+ return JobState.getStateFromProps(props,
JobState.getJobIdFromProps(props));
+ }
+
+ public static State getStateFromProps(Properties props, String
jobIdPropValue) {
+ State state = new State();
+ state.addAll(props);
+ state.setProp(ConfigurationKeys.JOB_ID_KEY, jobIdPropValue); // in case
not yet directly defined as such
+ return state;
+ }
+
public static String getJobGroupFromState(State state) {
return state.getProp(ConfigurationKeys.JOB_GROUP_KEY);
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 1893cebd7..f1c1fa319 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -255,8 +255,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
// adding dependent jars/files to the DistributedCache that also updates
the conf)
this.job = Job.getInstance(this.conf, JOB_NAME_PREFIX +
this.jobContext.getJobName());
- this.parallelRunnerThreads =
Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
- Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
+ this.parallelRunnerThreads = ParallelRunner.getNumThreadsConfig(jobProps);
// StateStore interface uses the following key (rootDir, storeName,
tableName)
// The state store base is the root directory and the last two elements of
the path are used as the storeName and
@@ -682,23 +681,12 @@ public class MRJobLauncher extends AbstractJobLauncher {
try {
ParallelRunner parallelRunner = closer.register(new
ParallelRunner(this.parallelRunnerThreads, this.fs));
- int multiTaskIdSequence = 0;
+ JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new
JobLauncherUtils.WorkUnitPathCalculator();
// Serialize each work unit into a file named after the task ID
for (WorkUnit workUnit : workUnits) {
-
- String workUnitFileName;
- if (workUnit.isMultiWorkUnit()) {
- workUnitFileName =
JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(),
multiTaskIdSequence++)
- + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
- } else {
- workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) +
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
- }
- Path workUnitFile = new Path(this.jobInputPath, workUnitFileName);
- LOG.debug("Writing work unit file " + workUnitFileName);
-
+ Path workUnitFile = pathCalculator.calcNextPath(workUnit,
this.jobContext.getJobId(), this.jobInputPath);
+ LOG.debug("Writing work unit file {}", workUnitFile.getName());
parallelRunner.serializeToFile(workUnit, workUnitFile);
-
- // Append the work unit file path to the job input file
}
} catch (Throwable t) {
throw closer.rethrow(t);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 89bf4d4bd..6439f41b2 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -27,6 +27,8 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import javax.annotation.concurrent.NotThreadSafe;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,6 +41,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
+import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -61,6 +64,25 @@ public class JobLauncherUtils {
// A cache for proxied FileSystems by owners
private static Cache<String, FileSystem> fileSystemCacheByOwners =
CacheBuilder.newBuilder().build();
+ /** Calculate monotonically-increasing paths for multi-WU files */
+ @AllArgsConstructor
+ @NotThreadSafe
+ public static class WorkUnitPathCalculator {
+ private int nextMultiWorkUnitTaskId;
+
+ public WorkUnitPathCalculator() {
+ this(0);
+ }
+
+ // Serialize each work unit into a file named after the task ID
+ public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) {
+ String workUnitFileName = workUnit.isMultiWorkUnit()
+ ? JobLauncherUtils.newMultiTaskId(jobId, nextMultiWorkUnitTaskId++) +
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
+ : workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) +
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
+ return new Path(basePath, workUnitFileName);
+ }
+ }
+
/**
* Create a new job ID.
*
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
index ac3193791..941f66246 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.util;
+import java.util.Properties;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
@@ -86,6 +87,10 @@ public class ParallelRunner implements Closeable {
public static final String PARALLEL_RUNNER_THREADS_KEY =
"parallel.runner.threads";
public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10;
+ public static int getNumThreadsConfig(Properties props) {
+ return Integer.parseInt(props.getProperty(PARALLEL_RUNNER_THREADS_KEY,
Integer.toString(DEFAULT_PARALLEL_RUNNER_THREADS)));
+ }
+
private final ExecutorService executor;
/**