This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fbd4ee8e8 [GOBBLIN-2020] Fix number of failure scenarios and bugs in
temporal workflows (#3900)
fbd4ee8e8 is described below
commit fbd4ee8e8b7a57d5f44f6cf0e887d0eb029b94aa
Author: William Lo <[email protected]>
AuthorDate: Fri Mar 22 14:01:33 2024 -0400
[GOBBLIN-2020] Fix number of failure scenarios and bugs in temporal
workflows (#3900)
Fix a number of failure scenarios and bugs in Temporal workflows
---
.../ddm/activity/impl/CommitActivityImpl.java | 22 +++++++-
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 2 +-
.../gobblin/temporal/ddm/work/assistance/Help.java | 6 +--
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 63 +++++++++++++---------
4 files changed, 61 insertions(+), 32 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 8874bccd7..07c81227d 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -25,10 +25,12 @@ import com.google.common.collect.Maps;
import io.temporal.failure.ApplicationFailure;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
@@ -60,13 +62,17 @@ public class CommitActivityImpl implements CommitActivity {
static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
@Override
public int commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ String jobName = UNDEFINED_JOB_NAME;
try {
FileSystem fs = Help.loadFileSystem(workSpec);
JobState jobState = Help.loadJobState(workSpec, fs);
+ jobName = jobState.getJobName();
SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
// TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
@@ -86,7 +92,7 @@ public class CommitActivityImpl implements CommitActivity {
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
- "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ String.format("Failed to commit dataset state for some dataset(s) of
job %s", jobName),
IOException.class.toString(),
new IOException(e),
null
@@ -135,9 +141,21 @@ public class CommitActivityImpl implements CommitActivity {
IteratorExecutor.logFailures(result, null, 10);
+ Set<String> failedDatasetUrns = new HashSet<>();
+ for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+ // Set the overall job state to FAILED if the job failed to process
any dataset
+ if (datasetState.getState() == JobState.RunningState.FAILED) {
+ failedDatasetUrns.add(datasetState.getDatasetUrn());
+ }
+ }
+ if (!failedDatasetUrns.isEmpty()) {
+ String allFailedDatasets = String.join(", ", failedDatasetUrns);
+ log.error("Failed to commit dataset state for dataset(s) {}" +
allFailedDatasets);
+ throw new IOException("Failed to commit dataset state for " +
allFailedDatasets);
+ }
if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure and determine whether or not this
is retryable to throw a non-retryable failure exception
- String jobName =
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY,
"<job_name_stub>");
+ String jobName =
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY,
UNDEFINED_JOB_NAME);
throw new IOException("Failed to commit dataset state for some
dataset(s) of job " + jobName);
}
} catch (InterruptedException exc) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 2cb95a5d8..c3de4146b 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -75,7 +75,7 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs);
- return workUnits.size();
+ return jobState.getTaskCount();
} catch (ReflectiveOperationException roe) {
String errMsg = "Unable to construct a source for generating workunits
for job " + jobState.getJobId();
log.error(errMsg, roe);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 09da984a6..23121bb0f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -77,9 +77,9 @@ public class Help {
}
/** @return execution-specific name, incorporating any {@link
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
- public static String qualifyNamePerExecWithFlowExecId(String name, Config
workerConfig) {
- Optional<String> optFlowExecId =
Optional.ofNullable(ConfigUtils.getString(workerConfig,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
- return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId,
workerConfig);
+ public static String qualifyNamePerExecWithFlowExecId(String name, Config
jobProps) {
+ Optional<String> optFlowExecId =
Optional.ofNullable(ConfigUtils.getString(jobProps,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+ return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId,
jobProps);
}
public static String calcPerExecQualifierWithOptFlowExecId(Optional<String>
optFlowExecId, Config workerConfig) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 39f401a05..c66ff47c1 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -26,6 +26,7 @@ import com.typesafe.config.ConfigFactory;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.common.RetryOptions;
+import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
@@ -33,8 +34,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
public int execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
EventTimer timer = timerFactory.createJobTimer();
-
- int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
- if (numWUsGenerated > 0) {
- ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow();
-
- JobState jobState = new JobState(jobProps);
- URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
- Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
- WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
- // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
- if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
-
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
- int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
- int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
- wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
- }
-
- int numWUsProcessed = processWUsWorkflow.process(wuSpec);
- if (numWUsProcessed != numWUsGenerated) {
- log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
- // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
+ int numWUsGenerated = 0;
+ try {
+ numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
+ if (numWUsGenerated > 0) {
+ JobState jobState = new JobState(jobProps);
+ URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+ Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
+ // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
+ if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+ &&
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
+ int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+ int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+ wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
+ }
+
+ int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+ if (numWUsProcessed != numWUsGenerated) {
+ log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
+ // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
+ }
}
+ timer.stop();
+ } catch (Exception e) {
+ // Emit a failed GobblinTrackingEvent to record job failures
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Failed Gobblin job %s",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
+ e.getClass().toString(),
+ e,
+ null
+ );
}
- timer.stop();
return numWUsGenerated;
-
}
- protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow() {
+ protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties
jobProps) {
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(PROCESS_WORKFLOW_ID_BASE,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(PROCESS_WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(jobProps)))
.build();
return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class,
childOpts);
}