This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a5a893585a attempt partial commit only in case commit policy is
partial commit (#4157)
a5a893585a is described below
commit a5a893585a88dbe3a12f654f1dd111ddb09d82d0
Author: Vivek Rai <[email protected]>
AuthorDate: Tue Dec 2 08:59:12 2025 +0530
attempt partial commit only in case commit policy is partial commit (#4157)
---
.../apache/gobblin/source/extractor/JobCommitPolicy.java | 14 ++++++++++++++
.../ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java | 11 ++++++++++-
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
index 24a5323d27..0506d86825 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.source.extractor;
import java.util.Properties;
+import com.typesafe.config.Config;
import lombok.Getter;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -107,4 +109,16 @@ public enum JobCommitPolicy {
public static JobCommitPolicy getCommitPolicy(State state) {
return forName(state.getProp(ConfigurationKeys.JOB_COMMIT_POLICY_KEY,
ConfigurationKeys.DEFAULT_JOB_COMMIT_POLICY));
}
+
+ /**
+ * Get a {@link JobCommitPolicy} through its name specified in configuration
property
+ * {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY}.
+ *
+ * @param config a {@link Config} instance carrying job configuration
properties
+ * @return a {@link JobCommitPolicy} with the given name specified in {@link
ConfigurationKeys#JOB_COMMIT_POLICY_KEY}
+ */
+ public static JobCommitPolicy getCommitPolicy(Config config) {
+ return forName(ConfigUtils.getString(config,
ConfigurationKeys.JOB_COMMIT_POLICY_KEY,
+ ConfigurationKeys.DEFAULT_JOB_COMMIT_POLICY));
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 60aceee0e6..30db5a79b2 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -30,6 +30,7 @@ import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
@@ -79,7 +80,10 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
log.error("ProcessWorkUnits failure - attempting partial commit before
re-throwing exception", e);
try {
- performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed, props);// Attempt partial commit before surfacing the
failure
+ if (shouldAttemptPartialCommit()) {
+ performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed,
+ props);// Attempt partial commit before surfacing the failure
+ }
} catch (Exception commitException) {
// Combine current and commit exception messages for a more complete
context
String combinedMessage = String.format(
@@ -98,6 +102,11 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed, props);
}
+ private boolean shouldAttemptPartialCommit() {
+ return
JobCommitPolicy.getCommitPolicy(WorkerConfig.of(this).orElse(ConfigFactory.load()))
+ .isAllowPartialCommit();
+ }
+
private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec
workSpec,
Map<String, Object> searchAttributes, Optional<Integer>
workunitsProcessed, Properties props) {
// we are only inhibiting commit when workunitsProcessed is actually
known to be zero