This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b8444fa2f Add `MRJobLauncher` option to persist WorkUnits, then cancel
before running the job (#3764)
b8444fa2f is described below
commit b8444fa2f41c2c13e79bd993d0fd7da471b3fa7c
Author: Kip Kohn <[email protected]>
AuthorDate: Fri Sep 8 10:46:38 2023 -0700
Add `MRJobLauncher` option to persist WorkUnits, then cancel before running
the job (#3764)
* Add `MRJobLauncher` option to persist WorkUnits, then cancel before
running the job
* Lower-case "workunits" in log messaging, for consistency
---
.../gobblin/configuration/ConfigurationKeys.java | 2 ++
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 33 +++++++++++++++++-----
2 files changed, 28 insertions(+), 7 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 00af31751..198c8de35 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -715,11 +715,13 @@ public class ConfigurationKeys {
public static final String MR_JARS_BASE_DIR = "mr.jars.base.dir";
public static final String MR_JOB_MAX_MAPPERS_KEY = "mr.job.max.mappers";
public static final String MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY =
"mr.job.map.failure.is.fatal";
+ public static final String MR_PERSIST_WORK_UNITS_THEN_CANCEL_KEY =
"mr.persist.workunits.then.cancel";
public static final String MR_TARGET_MAPPER_SIZE = "mr.target.mapper.size";
public static final String MR_REPORT_METRICS_AS_COUNTERS_KEY =
"mr.report.metrics.as.counters";
public static final boolean DEFAULT_MR_REPORT_METRICS_AS_COUNTERS = false;
public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false;
+ public static final boolean DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL =
false;
public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false";
/**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index bb89f9c6d..1302cdab2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -173,6 +173,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
private final Path unsharedJarsDir;
private final Path jobInputPath;
private final Path jobOutputPath;
+ private final boolean shouldPersistWorkUnitsThenCancel;
private final int parallelRunnerThreads;
@@ -248,6 +249,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
this.jobInputPath = new Path(this.mrJobDir, INPUT_DIR_NAME);
this.jobOutputPath = new Path(this.mrJobDir, OUTPUT_DIR_NAME);
Path outputTaskStateDir = new Path(this.jobOutputPath,
this.jobContext.getJobId());
+ this.shouldPersistWorkUnitsThenCancel =
isPersistWorkUnitsThenCancelEnabled(this.jobProps);
// Finally create the Hadoop job after all updates to conf are already
made (including
// adding dependent jars/files to the DistributedCache that also updates
the conf)
@@ -313,6 +315,11 @@ public class MRJobLauncher extends AbstractJobLauncher {
LOG.info("Emitting WorkUnitsCreated Count: " +
countEventBuilder.getCount());
prepareHadoopJob(workUnits);
+ if (this.shouldPersistWorkUnitsThenCancel) {
+ LOG.info("Cancelling job after persisting workunits beneath: " +
this.jobInputPath);
+ jobState.setState(JobState.RunningState.CANCELLED);
+ return;
+ }
// Start the output TaskState collector service
this.taskStateCollectorService.startAsync().awaitRunning();
@@ -525,11 +532,19 @@ public class MRJobLauncher extends AbstractJobLauncher {
&&
Boolean.parseBoolean(properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS));
}
- static boolean isMapperFailureFatalEnabled(Properties properties) {
- return (
-
properties.containsKey(ConfigurationKeys.MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY)
- &&
Boolean.parseBoolean(properties.getProperty(ConfigurationKeys.MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY)))
- || ConfigurationKeys.DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL;
+ static boolean isBooleanPropEnabled(Properties props, String propKey,
Optional<Boolean> optDefault) {
+ return (props.containsKey(propKey) &&
Boolean.parseBoolean(props.getProperty(propKey)))
+ || (optDefault.isPresent() && optDefault.get());
+ }
+
+ static boolean isMapperFailureFatalEnabled(Properties props) {
+ return isBooleanPropEnabled(props,
ConfigurationKeys.MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY,
+ Optional.of(ConfigurationKeys.DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL));
+ }
+
+ static boolean isPersistWorkUnitsThenCancelEnabled(Properties props) {
+ return isBooleanPropEnabled(props,
ConfigurationKeys.MR_PERSIST_WORK_UNITS_THEN_CANCEL_KEY,
+
Optional.of(ConfigurationKeys.DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL));
}
@VisibleForTesting
@@ -698,8 +713,12 @@ public class MRJobLauncher extends AbstractJobLauncher {
private void cleanUpWorkingDirectory() {
try {
if (this.fs.exists(this.mrJobDir)) {
- this.fs.delete(this.mrJobDir, true);
- LOG.info("Deleted working directory " + this.mrJobDir);
+ if (this.shouldPersistWorkUnitsThenCancel) {
+ LOG.info("Preserving persisted workunits beneath: " +
this.jobInputPath);
+ } else {
+ this.fs.delete(this.mrJobDir, true);
+ LOG.info("Deleted working directory " + this.mrJobDir);
+ }
}
} catch (IOException ioe) {
LOG.error("Failed to delete working directory " + this.mrJobDir);