This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a5a893585a attempt partial commit only in case commit policy is 
partial commit (#4157)
a5a893585a is described below

commit a5a893585a88dbe3a12f654f1dd111ddb09d82d0
Author: Vivek Rai <[email protected]>
AuthorDate: Tue Dec 2 08:59:12 2025 +0530

    attempt partial commit only in case commit policy is partial commit (#4157)
---
 .../apache/gobblin/source/extractor/JobCommitPolicy.java   | 14 ++++++++++++++
 .../ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java    | 11 ++++++++++-
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
index 24a5323d27..0506d86825 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.source.extractor;
 
 import java.util.Properties;
 
+import com.typesafe.config.Config;
 import lombok.Getter;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -107,4 +109,16 @@ public enum JobCommitPolicy {
   public static JobCommitPolicy getCommitPolicy(State state) {
     return forName(state.getProp(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, 
ConfigurationKeys.DEFAULT_JOB_COMMIT_POLICY));
   }
+
+  /**
+   * Get a {@link JobCommitPolicy} through its name specified in configuration 
property
+   * {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY}.
+   *
+   * @param config a {@link Config} instance carrying job configuration 
properties
+   * @return a {@link JobCommitPolicy} with the given name specified in {@link 
ConfigurationKeys#JOB_COMMIT_POLICY_KEY}
+   */
+  public static JobCommitPolicy getCommitPolicy(Config config) {
+    return forName(ConfigUtils.getString(config, 
ConfigurationKeys.JOB_COMMIT_POLICY_KEY,
+        ConfigurationKeys.DEFAULT_JOB_COMMIT_POLICY));
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 60aceee0e6..30db5a79b2 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -30,6 +30,7 @@ import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
@@ -79,7 +80,10 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
       log.error("ProcessWorkUnits failure - attempting partial commit before 
re-throwing exception", e);
 
       try {
-        performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed, props);// Attempt partial commit before surfacing the 
failure
+        if (shouldAttemptPartialCommit()) {
+          performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed,
+              props);// Attempt partial commit before surfacing the failure
+        }
       } catch (Exception commitException) {
         // Combine current and commit exception messages for a more complete 
context
         String combinedMessage = String.format(
@@ -98,6 +102,11 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
     return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed, props);
   }
 
+  private boolean shouldAttemptPartialCommit() {
+    return 
JobCommitPolicy.getCommitPolicy(WorkerConfig.of(this).orElse(ConfigFactory.load()))
+        .isAllowPartialCommit();
+  }
+
   private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec 
workSpec,
       Map<String, Object> searchAttributes, Optional<Integer> 
workunitsProcessed, Properties props) {
     //  we are only inhibiting commit when workunitsProcessed is actually 
known to be zero

Reply via email to