This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0bf770c2cd [GOBBLIN-2175] Support partial commit with
Gobblin-on-Temporal execution (#4078)
0bf770c2cd is described below
commit 0bf770c2cde24c98c30ee6c9af18f58a9589f585
Author: pratapaditya04 <[email protected]>
AuthorDate: Fri Dec 13 12:13:14 2024 +0530
[GOBBLIN-2175] Support partial commit with Gobblin-on-Temporal execution
(#4078)
---------
Co-authored-by: Aditya Pratap Singh <[email protected]>
---
.../gobblin/source/extractor/JobCommitPolicy.java | 12 +++--
.../ddm/activity/impl/CommitActivityImpl.java | 22 ++++++--
.../gobblin/temporal/ddm/work/CommitStats.java | 6 ++-
.../ddm/workflow/impl/CommitStepWorkflowImpl.java | 19 +++++--
.../impl/ProcessWorkUnitsWorkflowImpl.java | 49 ++++++++++++++----
.../exception/FailedDatasetUrnsException.java | 60 ++++++++++++++++++++++
6 files changed, 143 insertions(+), 25 deletions(-)
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
index f60acb6030..24a5323d27 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.source.extractor;
import java.util.Properties;
+import lombok.Getter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -33,7 +35,7 @@ public enum JobCommitPolicy {
/**
* Commit output data of a job if and only if all of its tasks successfully
complete.
*/
- COMMIT_ON_FULL_SUCCESS("full"),
+ COMMIT_ON_FULL_SUCCESS("full", false),
/**
* Commit a job even if some of its tasks fail. It's up to the {@link
org.apache.gobblin.publisher.DataPublisher} to
@@ -43,7 +45,7 @@ public enum JobCommitPolicy {
* and should cover most use cases when {@link
#COMMIT_ON_FULL_SUCCESS} is not appropriate.
*/
@Deprecated
- COMMIT_ON_PARTIAL_SUCCESS("partial"),
+ COMMIT_ON_PARTIAL_SUCCESS("partial", true),
/**
* Commit output data of tasks that successfully complete.
@@ -51,12 +53,14 @@ public enum JobCommitPolicy {
* It is recommended to use this commit policy in conjunction with
task-level data publishing (i.e., when
* {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to {@code
false}).
*/
- COMMIT_SUCCESSFUL_TASKS("successful");
+ COMMIT_SUCCESSFUL_TASKS("successful", true);
private final String name;
+ @Getter private final boolean allowPartialCommit;// Indicates if the commit
policy allows partial task commits
- JobCommitPolicy(String name) {
+ JobCommitPolicy(String name, boolean allowPartialCommit) {
this.name = name;
+ this.allowPartialCommit = allowPartialCommit;
}
/**
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 97699f2ce2..ae85a6a083 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -63,11 +63,13 @@ import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
+
@Slf4j
public class CommitActivityImpl implements CommitActivity {
@@ -97,12 +99,22 @@ public class CommitActivityImpl implements CommitActivity {
Map<String, JobState.DatasetState> datasetStatesByUrns =
jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates),
Lists.newArrayList());
TaskState firstTaskState = taskStates.get(0);
log.info("TaskState (commit) [{}] (**first of {}**): {}",
firstTaskState.getTaskId(), taskStates.size(),
firstTaskState.toJsonString(true));
- commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+ Optional<FailedDatasetUrnsException> optFailure = Optional.empty();
+ try {
+ commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+ } catch (FailedDatasetUrnsException exception) {
+ log.warn("Some datasets failed to be committed, proceeding with
publishing commit step", exception);
+ optFailure = Optional.of(exception);
+ }
boolean shouldIncludeFailedTasks =
PropertiesUtils.getPropAsBoolean(jobState.getProperties(),
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
Map<String, DatasetStats> datasetTaskSummaries =
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(),
shouldIncludeFailedTasks);
- return new CommitStats(datasetTaskSummaries,
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
+ return new CommitStats(
+ datasetTaskSummaries,
+
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
+ optFailure
+ );
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
@@ -164,8 +176,8 @@ public class CommitActivityImpl implements CommitActivity {
if (!failedDatasetUrns.isEmpty()) {
String allFailedDatasets = String.join(", ", failedDatasetUrns);
- log.error("Failed to commit dataset state for dataset(s) {}" +
allFailedDatasets);
- throw new IOException("Failed to commit dataset state for " +
allFailedDatasets);
+ log.error("Failed to commit dataset state for dataset(s) {}",
allFailedDatasets);
+ throw new FailedDatasetUrnsException(failedDatasetUrns);
}
if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure and determine whether or not this
is retryable to throw a non-retryable failure exception
@@ -205,7 +217,7 @@ public class CommitActivityImpl implements CommitActivity {
// Only process successful datasets unless configuration to process failed
datasets is set
for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
if (datasetState.getState() == JobState.RunningState.COMMITTED ||
(datasetState.getState() == JobState.RunningState.FAILED
- && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+ && commitPolicy.isAllowPartialCommit())) {
long totalBytesWritten = 0;
long totalRecordsWritten = 0;
int totalCommittedTasks = 0;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
index f929831133..6d1416f514 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -19,12 +19,15 @@ package org.apache.gobblin.temporal.ddm.work;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
+
/**
* Data structure representing the stats for a committed dataset, and the
total number of committed workunits in the Gobblin Temporal job
@@ -37,8 +40,9 @@ import lombok.RequiredArgsConstructor;
public class CommitStats {
@NonNull private Map<String, DatasetStats> datasetStats;
@NonNull private int numCommittedWorkUnits;
+ @NonNull private Optional<FailedDatasetUrnsException> optFailure;
public static CommitStats createEmpty() {
- return new CommitStats(new HashMap<>(), 0);
+ return new CommitStats(new HashMap<>(), 0, Optional.empty());
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index d568ea4e5b..c2c21d2825 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
+import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;
import java.time.Duration;
@@ -59,13 +60,21 @@ public class CommitStepWorkflowImpl implements
CommitStepWorkflow {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
- timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
- .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
- .submit();
+
+ if (!commitGobblinStats.getOptFailure().isPresent() ||
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+ .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+ .submit();// emit job summary info on both full and partial commit
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
+ }
+ if (commitGobblinStats.getOptFailure().isPresent()) {
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Failed to commit dataset state for some dataset(s)"),
commitGobblinStats.getOptFailure().get().getClass().toString(),
+ commitGobblinStats.getOptFailure().get());
+ }
return commitGobblinStats;
}
-
private List<DatasetTaskSummary>
convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 6402e473bf..97c1c0a767 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -22,6 +22,7 @@ import java.util.Optional;
import com.typesafe.config.ConfigFactory;
import io.temporal.api.enums.v1.ParentClosePolicy;
+import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
@@ -72,20 +73,48 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
searchAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec, searchAttributes);
- int workunitsProcessed =
- processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(),
- workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
- if (workunitsProcessed > 0) {
- CommitStepWorkflow commitWorkflow =
createCommitStepWorkflow(searchAttributes);
- CommitStats result = commitWorkflow.commit(workSpec);
- if (result.getNumCommittedWorkUnits() == 0) {
- log.warn("No work units committed at the job level. They could have
been committed at the task level.");
+
+ Optional<Integer> workunitsProcessed = Optional.empty();
+ try {
+ workunitsProcessed =
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(),
+ Optional.empty()));
+ } catch (Exception e) {
+ log.error("ProcessWorkUnits failure - attempting partial commit before
re-throwing exception", e);
+
+ try {
+ performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed);// Attempt partial commit before surfacing the failure
+ } catch (Exception commitException) {
+ // Combine the processing and commit exception messages for more complete context
+ String combinedMessage = String.format(
+ "Processing failure: %s. Commit workflow failure: %s",
+ e.getMessage(),
+ commitException.getMessage()
+ );
+ log.error(combinedMessage);
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Processing failure: %s. Partial commit failure:
%s", combinedMessage, commitException),
+ Exception.class.toString(),
+ new Exception(e)); // Wrap the original exception for stack trace
preservation
}
- return result;
- } else {
+ throw e;// Re-throw after any partial commit, to fail the parent workflow in case commitWorkflow didn't throw (unlikely)
+ }
+ return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed);
+ }
+
+ private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec
workSpec,
+ Map<String, Object> searchAttributes, Optional<Integer>
workunitsProcessed) {
+ // Only inhibit the commit when workunitsProcessed is actually known to be zero; an empty (unknown) count still commits
+ if (workunitsProcessed.filter(n -> n == 0).isPresent()) {
log.error("No work units processed, so no commit attempted.");
return CommitStats.createEmpty();
}
+ CommitStepWorkflow commitWorkflow =
createCommitStepWorkflow(searchAttributes);
+ CommitStats result = commitWorkflow.commit(workSpec);
+ if (result.getNumCommittedWorkUnits() == 0) {
+ log.warn("No work units committed at the job level. They could have been
committed at the task level.");
+ }
+ return result;
}
private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec
workSpec) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java
new file mode 100644
index 0000000000..ea8e1def41
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.exception;
+
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Getter;
+
+
+/**
+ * An {@link IOException} reporting the dataset URNs that failed to be processed; the URNs stay readable via the getter.
+ */
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FailedDatasetUrnsException extends IOException {
+
+ @Getter
+ private final Set<String> failedDatasetUrns;
+
+ /**
+ * Creates an exception whose message lists the given URNs, joined with commas.
+ * The set is stored by reference (not copied); a {@code null} set fails inside {@link String#join}.
+ * @param failedDatasetUrns a set containing the URNs of the datasets that failed to process
+ */
+ public FailedDatasetUrnsException(Set<String> failedDatasetUrns) {
+ super("Failed to process the following dataset URNs: " + String.join(",",
failedDatasetUrns));
+ this.failedDatasetUrns = failedDatasetUrns;
+ }
+
+ /**
+ * Default constructor for {@code FailedDatasetUrnsException}.
+ * <p>
+ * Initializes {@code failedDatasetUrns} to an empty {@link HashSet} and sets no detail message.
+ * It is provided to support frameworks like Jackson that require a no-argument constructor
+ * for deserialization purposes.
+ * </p>
+ */
+ public FailedDatasetUrnsException() {
+ super();
+ this.failedDatasetUrns = new HashSet<>();
+ }
+}