This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 436cf26cc [GOBBLIN-1919] Rework a few more elements of MR-related job exec for reuse in Temporal-based execution (#3879)
436cf26cc is described below

commit 436cf26cc23ae189936495d7c63a06c77395ca62
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Feb 21 13:37:11 2024 -0800

    [GOBBLIN-1919] Rework a few more elements of MR-related job exec for reuse in Temporal-based execution (#3879)
    
    * Rework a few more elements of MR-related job exec for reuse in Temporal-based execution
    
    * minor adjustments
    
    * changed method name
---
 .../gobblin/runtime/AbstractJobLauncher.java       | 57 ++++++++++++++--------
 .../java/org/apache/gobblin/runtime/JobState.java  | 18 +++++++
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 20 ++------
 .../org/apache/gobblin/util/JobLauncherUtils.java  | 22 +++++++++
 .../org/apache/gobblin/util/ParallelRunner.java    |  5 ++
 5 files changed, 85 insertions(+), 37 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 690dc113b..578696f62 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -147,9 +147,6 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   // This contains all job context information
   protected final JobContext jobContext;
 
-  // Helper to prepare WorkUnit with necessary information. This final object 
can make sure the uniqueness of task IDs
-  protected final WorkUnitPreparator workUnitPreparator;
-
   // This (optional) JobLock is used to prevent the next scheduled run
   // of the job from starting if the current run has not finished yet
   protected Optional<JobLock> jobLockOptional = Optional.absent();
@@ -230,7 +227,6 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
 
       this.jobContext = new JobContext(this.jobProps, LOG, instanceBroker, 
troubleshooter.getIssueRepository());
       this.eventBus.register(this.jobContext);
-      this.workUnitPreparator = new 
WorkUnitPreparator(this.jobContext.getJobId());
 
       this.cancellationExecutor = Executors.newSingleThreadExecutor(
           ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOG), 
Optional.of("CancellationExecutor")));
@@ -553,21 +549,13 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           TimingEvent workUnitsPreparationTimer =
               
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
           workUnitStream = processWorkUnitStream(workUnitStream, jobState);
+
           // If it is a streaming source, workunits cannot be counted
           this.jobContext.getJobState().setProp(NUM_WORKUNITS,
               workUnitStream.isSafeToMaterialize() ? 
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
           
this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS),
 jobState);
-          // dump the work unit if tracking logs are enabled
-          if 
(jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
-            workUnitStream = workUnitStream.transform(new Function<WorkUnit, 
WorkUnit>() {
-              @Nullable
-              @Override
-              public WorkUnit apply(@Nullable WorkUnit input) {
-                LOG.info("Work unit tracking log: {}", input);
-                return input;
-              }
-            });
-          }
+          // dump the work unit if tracking logs are enabled (*AFTER* any 
materialization done for counting)
+          workUnitStream = addWorkUnitTrackingPerConfig(workUnitStream, 
jobState, LOG);
 
           
workUnitsPreparationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
               EventName.WORK_UNITS_PREPARATION));
@@ -710,13 +698,13 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
     }
   }
 
-  protected WorkUnitStream executeHandlers (WorkUnitStream workUnitStream, 
DestinationDatasetHandlerService datasetHandlerService){
+  protected WorkUnitStream executeHandlers(WorkUnitStream workUnitStream, 
DestinationDatasetHandlerService datasetHandlerService) {
     return datasetHandlerService.executeHandlers(workUnitStream);
   }
 
   protected WorkUnitStream processWorkUnitStream(WorkUnitStream 
workUnitStream, JobState jobState) {
     // Add task ids
-    workUnitStream = prepareWorkUnits(workUnitStream);
+    workUnitStream = assignIdsToWorkUnits(workUnitStream, jobState); // IDs 
only: `prepareWorkUnits(ws, jobState)` would ALSO filter and register with 
`jobState`, duplicating the two steps below (double-counting every task)
     // Remove skipped workUnits from the list of work units to execute.
     workUnitStream = workUnitStream.filter(new 
SkippedWorkUnitsFilter(jobState));
     // Add surviving tasks to jobState
@@ -836,7 +824,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   /**
    * Materialize a {@link WorkUnitStream} into an in-memory list. Note that 
infinite work unit streams cannot be materialized.
    */
-  protected List<WorkUnit> materializeWorkUnitList(WorkUnitStream 
workUnitStream) {
+  public static List<WorkUnit> materializeWorkUnitList(WorkUnitStream 
workUnitStream) {
     if (!workUnitStream.isFiniteStream()) {
       throw new UnsupportedOperationException("Cannot materialize an infinite 
work unit stream.");
     }
@@ -904,11 +892,38 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
     });
   }
 
+  /** @return transformed `workUnits` after assigning task IDs, removing 
skipped ones, and registering those remaining with `jobState` (callers must NOT 
repeat the skipped-WU filtering or `jobState` registration, lest every task be 
counted twice) */
+  public static WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits, 
JobState jobState) {
+    return assignIdsToWorkUnits(workUnits, jobState) // assign task ids
+        .filter(new SkippedWorkUnitsFilter(jobState)) // remove skipped 
workUnits
+        .transform(new MultiWorkUnitForEach() { // add remaining to jobState
+          @Override
+          public void forWorkUnit(WorkUnit workUnit) {
+            jobState.incrementTaskCount();
+            jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, 
jobState)));
+          }
+        });
+  }
+
+  /** @return `workUnitStream` transformed to log each work unit at INFO level 
(the so-called "tracking" logs), when 
`ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS` is true in `jobState`; 
otherwise `workUnitStream` itself, unchanged */
+  public static WorkUnitStream addWorkUnitTrackingPerConfig(WorkUnitStream 
workUnitStream, JobState jobState, Logger log) {
+    return 
!jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)
+        ? workUnitStream // no-op, when not enabled
+        : workUnitStream.transform(new Function<WorkUnit, WorkUnit>() {
+          @Nullable
+          @Override
+          public WorkUnit apply(@Nullable WorkUnit input) {
+            log.info("Work unit tracking log: {}", input);
+            return input;
+          }
+        });
+  }
+
   /**
-   * Prepare the flattened {@link WorkUnit}s for execution by populating the 
job and task IDs.
+   * Prepare the flattened {@link WorkUnit}s for execution by populating the 
job and unique task IDs. NOTE(review): a fresh {@link WorkUnitPreparator} is now 
created per call (formerly one shared, final instance credited with task-ID 
uniqueness) - presumably IDs are unique only within a single call; confirm no 
launcher invokes this more than once per job.
    */
-  private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits) {
-    return workUnits.transform(workUnitPreparator);
+  private static WorkUnitStream assignIdsToWorkUnits(WorkUnitStream workUnits, 
JobState jobState) {
+    return workUnits.transform(new WorkUnitPreparator(jobState.getJobId()));
   }
 
   private static abstract class MultiWorkUnitForEach implements 
Function<WorkUnit, WorkUnit> {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 5f8783fcc..c58641e89 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -163,6 +163,13 @@ public class JobState extends SourceState implements 
JobProgress {
     this.setId(jobId);
   }
 
+  public JobState(Properties properties) { // resolve the job ID only once, so 
the `JOB_ID_KEY` prop and `getJobId()` cannot disagree (a fresh ID may be minted 
per `getJobIdFromProps` call when the key is absent)
+    this(properties, JobState.getJobIdFromProps(properties));
+  }
+
+  private JobState(Properties properties, String jobId) {
+    this(JobState.getStateFromProps(properties, jobId), 
JobState.getJobNameFromProps(properties), jobId);
+  }
   public JobState(State properties, String jobName, String jobId) {
     super(properties);
     this.jobName = jobName;
@@ -191,6 +198,17 @@ public class JobState extends SourceState implements 
JobProgress {
         : JobLauncherUtils.newJobId(JobState.getJobNameFromProps(props));
   }
 
+  public static State getStateFromProps(Properties props) {
+    return JobState.getStateFromProps(props, 
JobState.getJobIdFromProps(props));
+  }
+
+  public static State getStateFromProps(Properties props, String 
jobIdPropValue) {
+    State state = new State();
+    state.addAll(props);
+    state.setProp(ConfigurationKeys.JOB_ID_KEY, jobIdPropValue); // in case 
not yet directly defined as such
+    return state;
+  }
+
   public static String getJobGroupFromState(State state) {
     return state.getProp(ConfigurationKeys.JOB_GROUP_KEY);
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 1893cebd7..f1c1fa319 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -255,8 +255,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     // adding dependent jars/files to the DistributedCache that also updates 
the conf)
     this.job = Job.getInstance(this.conf, JOB_NAME_PREFIX + 
this.jobContext.getJobName());
 
-    this.parallelRunnerThreads = 
Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
-        Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
+    this.parallelRunnerThreads = ParallelRunner.getNumThreadsConfig(jobProps);
 
     // StateStore interface uses the following key (rootDir, storeName, 
tableName)
     // The state store base is the root directory and the last two elements of 
the path are used as the storeName and
@@ -682,23 +681,12 @@ public class MRJobLauncher extends AbstractJobLauncher {
     try {
       ParallelRunner parallelRunner = closer.register(new 
ParallelRunner(this.parallelRunnerThreads, this.fs));
 
-      int multiTaskIdSequence = 0;
+      JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new 
JobLauncherUtils.WorkUnitPathCalculator();
       // Serialize each work unit into a file named after the task ID
       for (WorkUnit workUnit : workUnits) {
-
-        String workUnitFileName;
-        if (workUnit.isMultiWorkUnit()) {
-          workUnitFileName = 
JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), 
multiTaskIdSequence++)
-              + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
-        } else {
-          workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + 
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
-        }
-        Path workUnitFile = new Path(this.jobInputPath, workUnitFileName);
-        LOG.debug("Writing work unit file " + workUnitFileName);
-
+        Path workUnitFile = pathCalculator.calcNextPath(workUnit, 
this.jobContext.getJobId(), this.jobInputPath);
+        LOG.debug("Writing work unit file {}", workUnitFile.getName());
         parallelRunner.serializeToFile(workUnit, workUnitFile);
-
-        // Append the work unit file path to the job input file
       }
     } catch (Throwable t) {
       throw closer.rethrow(t);
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 89bf4d4bd..6439f41b2 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -27,6 +27,8 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +41,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -61,6 +64,25 @@ public class JobLauncherUtils {
   // A cache for proxied FileSystems by owners
   private static Cache<String, FileSystem> fileSystemCacheByOwners = 
CacheBuilder.newBuilder().build();
 
+  /** Calculate per-WU file paths; multi-WU files get monotonically-increasing 
sequence numbers from a mutable counter (hence not thread-safe) */
+  @AllArgsConstructor
+  @NotThreadSafe
+  public static class WorkUnitPathCalculator {
+    private int nextMultiWorkUnitTaskId;
+
+    public WorkUnitPathCalculator() {
+      this(0);
+    }
+
+    // Name the file after the WU's own task ID or, for a multi-WU, after the 
next sequential multi-task ID (only computes the path; writes nothing)
+    public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) {
+      String workUnitFileName = workUnit.isMultiWorkUnit()
+          ? JobLauncherUtils.newMultiTaskId(jobId, nextMultiWorkUnitTaskId++) 
+ JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
+          : workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + 
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
+      return new Path(basePath, workUnitFileName);
+    }
+  }
+
   /**
    * Create a new job ID.
    *
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
index ac3193791..941f66246 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.util;
 
+import java.util.Properties;
 import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
@@ -86,6 +87,10 @@ public class ParallelRunner implements Closeable {
   public static final String PARALLEL_RUNNER_THREADS_KEY = 
"parallel.runner.threads";
   public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10;
 
+  public static int getNumThreadsConfig(Properties props) { // value of 
PARALLEL_RUNNER_THREADS_KEY, else DEFAULT_PARALLEL_RUNNER_THREADS; a non-integer 
value throws NumberFormatException
+    return Integer.parseInt(props.getProperty(PARALLEL_RUNNER_THREADS_KEY, 
Integer.toString(DEFAULT_PARALLEL_RUNNER_THREADS)));
+  }
+
   private final ExecutorService executor;
 
   /**

Reply via email to