This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fb85f07e5 [GOBBLIN-2030] Integrate AutoTroubleshooter with Temporal (#3908)
fb85f07e5 is described below
commit fb85f07e544dae8cd51edff1387136a55df28aa5
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Apr 1 15:19:57 2024 -0700
[GOBBLIN-2030] Integrate AutoTroubleshooter with Temporal (#3908)
* [GOBBLIN-2030] Integrate AutoTroubleshooter with Temporal
---
.../ddm/activity/impl/ProcessWorkUnitImpl.java | 42 +++++++++++++++-------
.../ddm/launcher/ExecuteGobblinJobLauncher.java | 5 ++-
...EagerFsDirBackedWorkUnitClaimCheckWorkload.java | 12 +++++--
.../temporal/ddm/work/WorkUnitClaimCheck.java | 11 ++++--
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 5 ++-
.../impl/ProcessWorkUnitsWorkflowImpl.java | 6 +++-
6 files changed, 56 insertions(+), 25 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index 1aebf4318..cebe88094 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -24,12 +24,12 @@ import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
@@ -40,6 +40,7 @@ import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.AbstractTaskStateTracker;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
@@ -48,12 +49,14 @@ import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
-import org.apache.gobblin.runtime.troubleshooter.NoopAutomaticTroubleshooter;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
+import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
-import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
@@ -66,13 +69,30 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit
{
@Override
public int processWorkUnit(WorkUnitClaimCheck wu) {
+ AutomaticTroubleshooter troubleshooter = null;
+ EventSubmitter eventSubmitter = wu.getEventSubmitterContext().create();
+ String correlator = String.format("(M)WU [%s]", wu.getCorrelator());
try (FileSystem fs = Help.loadFileSystemForce(wu)) {
List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
- log.info("(M)WU [{}] - loaded; found {} workUnits", wu.getCorrelator(),
workUnits.size());
+ log.info("{} - loaded; found {} workUnits", correlator,
workUnits.size());
JobState jobState = Help.loadJobState(wu, fs);
- return execute(workUnits, wu, jobState, fs);
+ troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
+ troubleshooter.start();
+ return execute(workUnits, wu, jobState, fs,
troubleshooter.getIssueRepository());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
+ } finally {
+ try {
+ if (troubleshooter == null) {
+ log.warn("{} - No troubleshooter to report issues from automatic troubleshooter", correlator);
+ } else {
+ troubleshooter.refineIssues();
+ troubleshooter.logIssueSummary();
+ troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
+ }
+ } catch (Exception e) {
+ log.error(String.format("%s - Failed to report issues from automatic troubleshooter", correlator), e);
+ }
}
}
@@ -87,7 +107,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
* NOTE: adapted from {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
* @return count of how many tasks executed (0 if execution ultimately
failed, but we *believe* TaskState should already have been recorded beforehand)
*/
- protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu,
JobState jobState, FileSystem fs) throws IOException, InterruptedException {
+ protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu,
JobState jobState, FileSystem fs, IssueRepository issueRepository) throws
IOException, InterruptedException {
String containerId = "container-id-for-wu-" + wu.getCorrelator();
StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
@@ -96,10 +116,6 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit
{
GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy =
GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec
SharedResourcesBroker<GobblinScopeTypes> resourcesBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
- AutomaticTroubleshooter troubleshooter = new NoopAutomaticTroubleshooter();
- //
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(wu.getStateConfig().getProperties()));
- troubleshooter.start();
-
List<String> fileSourcePaths = workUnits.stream()
.map(workUnit -> getCopyableFileSourcePathDesc(workUnit,
wu.getWorkUnitPath()))
.collect(Collectors.toList());
@@ -114,7 +130,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit
{
GobblinMultiTaskAttempt taskAttempt = GobblinMultiTaskAttempt.runWorkUnits(
jobState.getJobId(), containerId, jobState, workUnits,
taskStateTracker, taskExecutor, taskStateStore,
multiTaskAttemptCommitPolicy,
- resourcesBroker, troubleshooter.getIssueRepository(),
createInterruptionPredicate(fs, jobState));
+ resourcesBroker, issueRepository, createInterruptionPredicate(fs,
jobState));
return taskAttempt.getNumTasksCreated();
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 2480accab..33c6d5f33 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -21,16 +21,15 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.fs.Path;
+
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.temporal.client.WorkflowOptions;
-
import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.Path;
-
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
index d2c193cb1..ac75971d2 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
@@ -19,9 +19,13 @@ package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
import java.util.Comparator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
+
import org.apache.hadoop.fs.FileStatus;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
/**
* {@link AbstractEagerFsDirBackedWorkload} for {@link WorkUnitClaimCheck}
`WORK_ITEM`s, which uses {@link WorkUnitClaimCheck#getWorkUnitPath()}
@@ -30,15 +34,17 @@ import org.apache.hadoop.fs.FileStatus;
@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@lombok.ToString(callSuper = true)
public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends
AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
+ private EventSubmitterContext eventSubmitterContext;
- public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String
hdfsDir) {
+ public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String
hdfsDir, EventSubmitterContext eventSubmitterContext) {
super(fileSystemUri, hdfsDir);
+ this.eventSubmitterContext = eventSubmitterContext;
}
@Override
protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
// begin by setting all correlators to empty
- return new WorkUnitClaimCheck("", this.getFileSystemUri(),
fileStatus.getPath().toString());
+ return new WorkUnitClaimCheck("", this.getFileSystemUri(),
fileStatus.getPath().toString(), this.eventSubmitterContext);
}
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
index f03215664..5cb2064b1 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -18,16 +18,22 @@
package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
+
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.hadoop.fs.Path;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
/**
* Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by
claim-check, where the `workUnitPath` is resolved
@@ -41,6 +47,7 @@ public class WorkUnitClaimCheck implements FileSystemApt,
FileSystemJobStateful
@NonNull private String correlator;
@NonNull private URI fileSystemUri;
@NonNull private String workUnitPath;
+ @NonNull private EventSubmitterContext eventSubmitterContext;
@JsonIgnore // (because no-arg method resembles 'java bean property')
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index c66ff47c1..f855b19a9 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -21,6 +21,8 @@ import java.net.URI;
import java.time.Duration;
import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+
import com.typesafe.config.ConfigFactory;
import io.temporal.activity.ActivityOptions;
@@ -29,11 +31,8 @@ import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
-
import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.Path;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobState;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 411a4b6a0..844e55731 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -86,7 +86,11 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
}
protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec
workSpec) {
- return new
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(),
workSpec.getWorkUnitsDir());
+ return new EagerFsDirBackedWorkUnitClaimCheckWorkload(
+ workSpec.getFileSystemUri(),
+ workSpec.getWorkUnitsDir(),
+ workSpec.getEventSubmitterContext()
+ );
}
protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileSystemJobStateful f) {