This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a70f53913 Add semantics for failure on partial success (#3831)
a70f53913 is described below
commit a70f53913bb587da3471f946b1518c5c34b7f099
Author: William Lo <[email protected]>
AuthorDate: Thu Nov 16 19:06:25 2023 -0500
Add semantics for failure on partial success (#3831)
---
.../gobblin/configuration/ConfigurationKeys.java | 2 ++
.../org/apache/gobblin/runtime/JobContext.java | 6 ++++
.../apache/gobblin/runtime/SafeDatasetCommit.java | 2 +-
.../gobblin/runtime/JobLauncherTestHelper.java | 34 ++++++++++++++++++++++
.../gobblin/runtime/LocalJobLauncherTest.java | 13 +++++++++
5 files changed, 56 insertions(+), 1 deletion(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index f838fc606..21f8bc532 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -205,6 +205,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_FORK_OPERATOR_CLASS =
"org.apache.gobblin.fork.IdentityForkOperator";
public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
public static final String DEFAULT_JOB_COMMIT_POLICY = "full";
+
+ public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT =
"job.commit.partial.fail.task.fails.job.commit";
// If true, commit of different datasets will be performed in parallel
// only turn on if publisher is thread-safe
public static final String PARALLELIZE_DATASET_COMMIT =
"job.commit.parallelize";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 658b308b7..89d1dc41b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -95,6 +95,11 @@ public class JobContext implements Closeable {
private final JobState jobState;
@Getter(AccessLevel.PACKAGE)
private final JobCommitPolicy jobCommitPolicy;
+ // Job commit semantic in which tasks that succeeded still commit their data, but the job itself is failed
+ // when only part of its tasks succeeded.
+ // WARNING: intended for Gobblin jobs that do not record their watermark, so that failing the job after a
+ // partial commit does not lead to duplicate work.
+ @Getter(AccessLevel.PACKAGE)
+ private final boolean partialFailTaskFailsJobCommit;
+
private final Optional<JobMetrics> jobMetricsOptional;
private final Source<?, ?> source;
@@ -146,6 +151,7 @@ public class JobContext implements Closeable {
this.jobBroker = instanceBroker.newSubscopedBuilder(new
JobScopeInstance(this.jobName, this.jobId))
.withOverridingConfig(ConfigUtils.propertiesToConfig(jobProps)).build();
this.jobCommitPolicy = JobCommitPolicy.getCommitPolicy(jobProps);
+ this.partialFailTaskFailsJobCommit =
Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT,
"false"));
this.datasetStateStore =
createStateStore(ConfigUtils.propertiesToConfig(jobProps));
this.jobHistoryStoreOptional = createJobHistoryStore(jobProps);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 55c9ebd76..a2885c168 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -389,7 +389,7 @@ final class SafeDatasetCommit implements Callable<Void> {
// Backoff the actual high watermark to the low watermark for each task
that has not been committed
if (taskState.getWorkingState() != WorkUnitState.WorkingState.COMMITTED)
{
taskState.backoffActualHighWatermark();
- if (this.jobContext.getJobCommitPolicy() ==
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+ if (this.jobContext.getJobCommitPolicy() ==
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS ||
this.jobContext.isPartialFailTaskFailsJobCommit()) {
// Determine the final dataset state based on the task states (post
commit) and the job commit policy.
// 1. If COMMIT_ON_FULL_SUCCESS is used, the processing of the
dataset is considered failed if any
// task for the dataset failed to be committed.
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 00f16edb8..d63f98710 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -363,6 +363,40 @@ public class JobLauncherTestHelper {
}
}
+ /**
+  * Runs a job against {@link TestSourceWithFaultyExtractor} using the "successful" commit policy with
+  * {@link ConfigurationKeys#PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT} enabled, and verifies that the launch
+  * fails with a {@link JobException} while the persisted state shows the job as FAILED, the task whose id
+  * ends in "0" as FAILED, and every other task as COMMITTED with all records written.
+  *
+  * @param jobProps job configuration; modified in place with the settings this scenario requires
+  * @throws Exception if the launcher cannot be created or closed, or the state store cannot be read
+  */
+ public void runTestWithCommitSuccessfulTasksPolicyAndFailJob(Properties jobProps) throws Exception {
+   String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+   String jobId = JobLauncherUtils.newJobId(jobName).toString();
+   jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+   jobProps.setProperty(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, Boolean.FALSE.toString());
+   jobProps.setProperty(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, "successful");
+   jobProps.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, TestSourceWithFaultyExtractor.class.getName());
+   jobProps.setProperty(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0");
+   jobProps.setProperty(ConfigurationKeys.PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT, "true");
+
+   Closer closer = Closer.create();
+   try {
+     JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps));
+     jobLauncher.launchJob(null);
+     // All verification lives in the catch block below; without this, a launch that unexpectedly
+     // succeeds would skip every assertion and the test would pass vacuously.
+     Assert.fail("Expected a JobException because "
+         + ConfigurationKeys.PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT + " is enabled and one task fails");
+   } catch (JobException e) {
+     List<JobState.DatasetState> datasetStateList =
+         this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
+     // Fail with a readable message instead of an IndexOutOfBoundsException if nothing was persisted.
+     Assert.assertFalse(datasetStateList.isEmpty(), "No dataset state was persisted for job " + jobId);
+     JobState jobState = datasetStateList.get(0);
+
+     Assert.assertEquals(jobState.getState(), JobState.RunningState.FAILED);
+     Assert.assertEquals(jobState.getCompletedTasks(), 4);
+     for (TaskState taskState : jobState.getTaskStates()) {
+       if (taskState.getTaskId().endsWith("0")) {
+         // The task whose id ends in "0" is the one expected to fail in this scenario.
+         Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.FAILED);
+       } else {
+         Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
+         Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
+             TestExtractor.TOTAL_RECORDS);
+       }
+     }
+   } finally {
+     closer.close();
+   }
+ }
+
public void runTestWithMultipleDatasetsAndFaultyExtractor(Properties
jobProps, boolean usePartialCommitPolicy)
throws Exception {
String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
index a8916045a..91663e375 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
@@ -270,6 +270,19 @@ public class LocalJobLauncherTest {
}
}
+ /**
+  * Launches a job under a test-specific job name through
+  * {@code runTestWithCommitSuccessfulTasksPolicyAndFailJob} and always deletes that job's state store
+  * afterwards.
+  */
+ @Test
+ public void testLaunchJobWithCommitSuccessfulTasksPolicyAndFailJob() throws Exception {
+   Properties jobProps = loadJobProps();
+   // Suffix the configured job name so this test gets its own state store entry.
+   String baseJobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+   String testJobName = baseJobName + "-testLaunchJobWithCommitSuccessfulTasksPolicyAndFailJob";
+   jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, testJobName);
+   try {
+     this.jobLauncherTestHelper.runTestWithCommitSuccessfulTasksPolicyAndFailJob(jobProps);
+   } finally {
+     // Read the name back from the properties, as the helper receives the same mutable instance.
+     String storeName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+     this.jobLauncherTestHelper.deleteStateStore(storeName);
+   }
+ }
+
+
@Test
public void testLaunchJobWithMultipleDatasetsAndFaultyExtractor() throws
Exception {
Properties jobProps = loadJobProps();