This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b8444fa2f Add `MRJobLauncher` option to persist WorkUnits, then cancel before running the job (#3764)
b8444fa2f is described below

commit b8444fa2f41c2c13e79bd993d0fd7da471b3fa7c
Author: Kip Kohn <[email protected]>
AuthorDate: Fri Sep 8 10:46:38 2023 -0700

    Add `MRJobLauncher` option to persist WorkUnits, then cancel before running the job (#3764)
    
    * Add `MRJobLauncher` option to persist WorkUnits, then cancel before running the job
    
    * Lower-case "workunits" in log messaging, for consistency
---
 .../gobblin/configuration/ConfigurationKeys.java   |  2 ++
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 33 +++++++++++++++++-----
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 00af31751..198c8de35 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -715,11 +715,13 @@ public class ConfigurationKeys {
   public static final String MR_JARS_BASE_DIR = "mr.jars.base.dir";
   public static final String MR_JOB_MAX_MAPPERS_KEY = "mr.job.max.mappers";
   public static final String MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY = 
"mr.job.map.failure.is.fatal";
+  public static final String MR_PERSIST_WORK_UNITS_THEN_CANCEL_KEY = 
"mr.persist.workunits.then.cancel";
   public static final String MR_TARGET_MAPPER_SIZE = "mr.target.mapper.size";
   public static final String MR_REPORT_METRICS_AS_COUNTERS_KEY = 
"mr.report.metrics.as.counters";
   public static final boolean DEFAULT_MR_REPORT_METRICS_AS_COUNTERS = false;
   public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
   public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false;
+  public static final boolean DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL = 
false;
   public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false";
 
   /**
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index bb89f9c6d..1302cdab2 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -173,6 +173,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
   private final Path unsharedJarsDir;
   private final Path jobInputPath;
   private final Path jobOutputPath;
+  private final boolean shouldPersistWorkUnitsThenCancel;
 
   private final int parallelRunnerThreads;
 
@@ -248,6 +249,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     this.jobInputPath = new Path(this.mrJobDir, INPUT_DIR_NAME);
     this.jobOutputPath = new Path(this.mrJobDir, OUTPUT_DIR_NAME);
     Path outputTaskStateDir = new Path(this.jobOutputPath, 
this.jobContext.getJobId());
+    this.shouldPersistWorkUnitsThenCancel = 
isPersistWorkUnitsThenCancelEnabled(this.jobProps);
 
     // Finally create the Hadoop job after all updates to conf are already 
made (including
     // adding dependent jars/files to the DistributedCache that also updates 
the conf)
@@ -313,6 +315,11 @@ public class MRJobLauncher extends AbstractJobLauncher {
       LOG.info("Emitting WorkUnitsCreated Count: " + 
countEventBuilder.getCount());
 
       prepareHadoopJob(workUnits);
+      if (this.shouldPersistWorkUnitsThenCancel) {
+        LOG.info("Cancelling job after persisting workunits beneath: " + 
this.jobInputPath);
+        jobState.setState(JobState.RunningState.CANCELLED);
+        return;
+      }
 
       // Start the output TaskState collector service
       this.taskStateCollectorService.startAsync().awaitRunning();
@@ -525,11 +532,19 @@ public class MRJobLauncher extends AbstractJobLauncher {
         && 
Boolean.parseBoolean(properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS));
   }
 
-  static boolean isMapperFailureFatalEnabled(Properties properties) {
-    return (
-        
properties.containsKey(ConfigurationKeys.MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY)
-        && 
Boolean.parseBoolean(properties.getProperty(ConfigurationKeys.MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY)))
-        || ConfigurationKeys.DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL;
+  static boolean isBooleanPropEnabled(Properties props, String propKey, 
Optional<Boolean> optDefault) {
+    return (props.containsKey(propKey) && 
Boolean.parseBoolean(props.getProperty(propKey)))
+        || (optDefault.isPresent() && optDefault.get());
+  }
+
+  static boolean isMapperFailureFatalEnabled(Properties props) {
+    return isBooleanPropEnabled(props, 
ConfigurationKeys.MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY,
+        Optional.of(ConfigurationKeys.DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL));
+  }
+
+  static boolean isPersistWorkUnitsThenCancelEnabled(Properties props) {
+    return isBooleanPropEnabled(props, 
ConfigurationKeys.MR_PERSIST_WORK_UNITS_THEN_CANCEL_KEY,
+        
Optional.of(ConfigurationKeys.DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL));
   }
 
   @VisibleForTesting
@@ -698,8 +713,12 @@ public class MRJobLauncher extends AbstractJobLauncher {
   private void cleanUpWorkingDirectory() {
     try {
       if (this.fs.exists(this.mrJobDir)) {
-        this.fs.delete(this.mrJobDir, true);
-        LOG.info("Deleted working directory " + this.mrJobDir);
+        if (this.shouldPersistWorkUnitsThenCancel) {
+          LOG.info("Preserving persisted workunits beneath: " + 
this.jobInputPath);
+        } else {
+          this.fs.delete(this.mrJobDir, true);
+          LOG.info("Deleted working directory " + this.mrJobDir);
+        }
       }
     } catch (IOException ioe) {
       LOG.error("Failed to delete working directory " + this.mrJobDir);

Reply via email to