This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a70f53913 Add semantics for failure on partial success (#3831)
a70f53913 is described below

commit a70f53913bb587da3471f946b1518c5c34b7f099
Author: William Lo <[email protected]>
AuthorDate: Thu Nov 16 19:06:25 2023 -0500

    Add semantics for failure on partial success (#3831)
---
 .../gobblin/configuration/ConfigurationKeys.java   |  2 ++
 .../org/apache/gobblin/runtime/JobContext.java     |  6 ++++
 .../apache/gobblin/runtime/SafeDatasetCommit.java  |  2 +-
 .../gobblin/runtime/JobLauncherTestHelper.java     | 34 ++++++++++++++++++++++
 .../gobblin/runtime/LocalJobLauncherTest.java      | 13 +++++++++
 5 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index f838fc606..21f8bc532 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -205,6 +205,8 @@ public class ConfigurationKeys {
   public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator";
   public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
   public static final String DEFAULT_JOB_COMMIT_POLICY = "full";
+
+  public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT = "job.commit.partial.fail.task.fails.job.commit";
   // If true, commit of different datasets will be performed in parallel
   // only turn on if publisher is thread-safe
   public static final String PARALLELIZE_DATASET_COMMIT = "job.commit.parallelize";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 658b308b7..89d1dc41b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -95,6 +95,11 @@ public class JobContext implements Closeable {
   private final JobState jobState;
   @Getter(AccessLevel.PACKAGE)
   private final JobCommitPolicy jobCommitPolicy;
+  // A job commit semantic where we want partially successful tasks to commit their data, but still fail the job
+  // WARNING: this is for Gobblin jobs that do not record their watermark, hence this would not lead to duplicate work
+  @Getter(AccessLevel.PACKAGE)
+  private final boolean partialFailTaskFailsJobCommit;
+
   private final Optional<JobMetrics> jobMetricsOptional;
   private final Source<?, ?> source;
 
@@ -146,6 +151,7 @@ public class JobContext implements Closeable {
     this.jobBroker = instanceBroker.newSubscopedBuilder(new JobScopeInstance(this.jobName, this.jobId))
         .withOverridingConfig(ConfigUtils.propertiesToConfig(jobProps)).build();
     this.jobCommitPolicy = JobCommitPolicy.getCommitPolicy(jobProps);
+    this.partialFailTaskFailsJobCommit = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT, "false"));
 
     this.datasetStateStore = createStateStore(ConfigUtils.propertiesToConfig(jobProps));
     this.jobHistoryStoreOptional = createJobHistoryStore(jobProps);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 55c9ebd76..a2885c168 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -389,7 +389,7 @@ final class SafeDatasetCommit implements Callable<Void> {
       // Backoff the actual high watermark to the low watermark for each task that has not been committed
       if (taskState.getWorkingState() != WorkUnitState.WorkingState.COMMITTED) {
         taskState.backoffActualHighWatermark();
-        if (this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+        if (this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS || this.jobContext.isPartialFailTaskFailsJobCommit()) {
           // Determine the final dataset state based on the task states (post commit) and the job commit policy.
           // 1. If COMMIT_ON_FULL_SUCCESS is used, the processing of the dataset is considered failed if any
           //    task for the dataset failed to be committed.
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 00f16edb8..d63f98710 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -363,6 +363,40 @@ public class JobLauncherTestHelper {
     }
   }
 
+  public void runTestWithCommitSuccessfulTasksPolicyAndFailJob(Properties jobProps) throws Exception {
+    String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+    String jobId = JobLauncherUtils.newJobId(jobName).toString();
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+    jobProps.setProperty(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, Boolean.FALSE.toString());
+    jobProps.setProperty(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, "successful");
+    jobProps.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, TestSourceWithFaultyExtractor.class.getName());
+    jobProps.setProperty(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0");
+    jobProps.setProperty(ConfigurationKeys.PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT, "true");
+
+    Closer closer = Closer.create();
+    try {
+      JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps));
+      jobLauncher.launchJob(null);
+    } catch (JobException e) {
+      List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
+      JobState jobState = datasetStateList.get(0);
+
+      Assert.assertEquals(jobState.getState(), JobState.RunningState.FAILED);
+      Assert.assertEquals(jobState.getCompletedTasks(), 4);
+      for (TaskState taskState : jobState.getTaskStates()) {
+        if (taskState.getTaskId().endsWith("0")) {
+          Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.FAILED);
+        } else {
+          Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
+          Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
+              TestExtractor.TOTAL_RECORDS);
+        }
+      }
+    } finally {
+      closer.close();
+    }
+  }
+
   public void runTestWithMultipleDatasetsAndFaultyExtractor(Properties jobProps, boolean usePartialCommitPolicy)
       throws Exception {
     String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
index a8916045a..91663e375 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
@@ -270,6 +270,19 @@ public class LocalJobLauncherTest {
     }
   }
 
+  @Test
+  public void testLaunchJobWithCommitSuccessfulTasksPolicyAndFailJob() throws Exception {
+    Properties jobProps = loadJobProps();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithCommitSuccessfulTasksPolicyAndFailJob");
+    try {
+      this.jobLauncherTestHelper.runTestWithCommitSuccessfulTasksPolicyAndFailJob(jobProps);
+    } finally {
+      this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+  }
+
+
   @Test
   public void testLaunchJobWithMultipleDatasetsAndFaultyExtractor() throws Exception {
     Properties jobProps = loadJobProps();

Reply via email to