This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fb85f07e5 [GOBBLIN-2030] Integrate AutoTroubleshooter with Temporal (#3908)
fb85f07e5 is described below

commit fb85f07e544dae8cd51edff1387136a55df28aa5
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Apr 1 15:19:57 2024 -0700

    [GOBBLIN-2030] Integrate AutoTroubleshooter with Temporal (#3908)
    
    * [GOBBLIN-2030] Integrate AutoTroubleshooter with Temporal
---
 .../ddm/activity/impl/ProcessWorkUnitImpl.java     | 42 +++++++++++++++-------
 .../ddm/launcher/ExecuteGobblinJobLauncher.java    |  5 ++-
 ...EagerFsDirBackedWorkUnitClaimCheckWorkload.java | 12 +++++--
 .../temporal/ddm/work/WorkUnitClaimCheck.java      | 11 ++++--
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |  5 ++-
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  6 +++-
 6 files changed, 56 insertions(+), 25 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index 1aebf4318..cebe88094 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -24,12 +24,12 @@ import java.util.Properties;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Lists;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
@@ -40,6 +40,7 @@ import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopySource;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.AbstractTaskStateTracker;
 import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
 import org.apache.gobblin.runtime.JobState;
@@ -48,12 +49,14 @@ import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.TaskStateTracker;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
-import org.apache.gobblin.runtime.troubleshooter.NoopAutomaticTroubleshooter;
+import 
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
+import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
-import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
 
 
@@ -66,13 +69,30 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit 
{
 
   @Override
   public int processWorkUnit(WorkUnitClaimCheck wu) {
+    AutomaticTroubleshooter troubleshooter = null;
+    EventSubmitter eventSubmitter = wu.getEventSubmitterContext().create();
+    String correlator = String.format("(M)WU [%s]", wu.getCorrelator());
     try (FileSystem fs = Help.loadFileSystemForce(wu)) {
       List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
-      log.info("(M)WU [{}] - loaded; found {} workUnits", wu.getCorrelator(), 
workUnits.size());
+      log.info("{} - loaded; found {} workUnits", correlator, 
workUnits.size());
       JobState jobState = Help.loadJobState(wu, fs);
-      return execute(workUnits, wu, jobState, fs);
+      troubleshooter = 
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
+      troubleshooter.start();
+      return execute(workUnits, wu, jobState, fs, 
troubleshooter.getIssueRepository());
     } catch (IOException | InterruptedException e) {
       throw new RuntimeException(e);
+    } finally {
+      try {
+        if (troubleshooter == null) {
+          log.warn("{} - No troubleshooter to report issues from automatic 
troubleshooter", correlator);
+        } else {
+          troubleshooter.refineIssues();
+          troubleshooter.logIssueSummary();
+          troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
+        }
+      } catch (Exception e) {
+        log.error(String.format("%s - Failed to report issues from automatic 
troubleshooter", correlator), e);
+      }
     }
   }
 
@@ -87,7 +107,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
    * NOTE: adapted from {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
    * @return count of how many tasks executed (0 if execution ultimately 
failed, but we *believe* TaskState should already have been recorded beforehand)
    */
-  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, 
JobState jobState, FileSystem fs) throws IOException, InterruptedException {
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, 
JobState jobState, FileSystem fs, IssueRepository issueRepository) throws 
IOException, InterruptedException {
     String containerId = "container-id-for-wu-" + wu.getCorrelator();
     StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
 
@@ -96,10 +116,6 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit 
{
     GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy = 
GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec
 
     SharedResourcesBroker<GobblinScopeTypes> resourcesBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
-    AutomaticTroubleshooter troubleshooter = new NoopAutomaticTroubleshooter();
-    // 
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(wu.getStateConfig().getProperties()));
-    troubleshooter.start();
-
     List<String> fileSourcePaths = workUnits.stream()
         .map(workUnit -> getCopyableFileSourcePathDesc(workUnit, 
wu.getWorkUnitPath()))
         .collect(Collectors.toList());
@@ -114,7 +130,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit 
{
     GobblinMultiTaskAttempt taskAttempt = GobblinMultiTaskAttempt.runWorkUnits(
         jobState.getJobId(), containerId, jobState, workUnits,
         taskStateTracker, taskExecutor, taskStateStore, 
multiTaskAttemptCommitPolicy,
-        resourcesBroker, troubleshooter.getIssueRepository(), 
createInterruptionPredicate(fs, jobState));
+        resourcesBroker, issueRepository, createInterruptionPredicate(fs, 
jobState));
     return taskAttempt.getNumTasksCreated();
   }
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 2480accab..33c6d5f33 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -21,16 +21,15 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.client.WorkflowOptions;
-
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.hadoop.fs.Path;
-
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
index d2c193cb1..ac75971d2 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
@@ -19,9 +19,13 @@ package org.apache.gobblin.temporal.ddm.work;
 
 import java.net.URI;
 import java.util.Comparator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import org.apache.hadoop.fs.FileStatus;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
 
 /**
  * {@link AbstractEagerFsDirBackedWorkload} for {@link WorkUnitClaimCheck} 
`WORK_ITEM`s, which uses {@link WorkUnitClaimCheck#getWorkUnitPath()}
@@ -30,15 +34,17 @@ import org.apache.hadoop.fs.FileStatus;
 @lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @lombok.ToString(callSuper = true)
 public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends 
AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
+  private EventSubmitterContext eventSubmitterContext;
 
-  public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String 
hdfsDir) {
+  public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String 
hdfsDir, EventSubmitterContext eventSubmitterContext) {
     super(fileSystemUri, hdfsDir);
+    this.eventSubmitterContext = eventSubmitterContext;
   }
 
   @Override
   protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
     // begin by setting all correlators to empty
-    return new WorkUnitClaimCheck("", this.getFileSystemUri(), 
fileStatus.getPath().toString());
+    return new WorkUnitClaimCheck("", this.getFileSystemUri(), 
fileStatus.getPath().toString(), this.eventSubmitterContext);
   }
 
   @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
index f03215664..5cb2064b1 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -18,16 +18,22 @@
 package org.apache.gobblin.temporal.ddm.work;
 
 import java.net.URI;
+
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
 
 /**
  * Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by 
claim-check, where the `workUnitPath` is resolved
@@ -41,6 +47,7 @@ public class WorkUnitClaimCheck implements FileSystemApt, 
FileSystemJobStateful
   @NonNull private String correlator;
   @NonNull private URI fileSystemUri;
   @NonNull private String workUnitPath;
+  @NonNull private EventSubmitterContext eventSubmitterContext;
 
   @JsonIgnore // (because no-arg method resembles 'java bean property')
   @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index c66ff47c1..f855b19a9 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -21,6 +21,8 @@ import java.net.URI;
 import java.time.Duration;
 import java.util.Properties;
 
+import org.apache.hadoop.fs.Path;
+
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.activity.ActivityOptions;
@@ -29,11 +31,8 @@ import io.temporal.common.RetryOptions;
 import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
-
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.hadoop.fs.Path;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.JobState;
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 411a4b6a0..844e55731 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -86,7 +86,11 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   }
 
   protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec 
workSpec) {
-    return new 
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(), 
workSpec.getWorkUnitsDir());
+    return new EagerFsDirBackedWorkUnitClaimCheckWorkload(
+        workSpec.getFileSystemUri(),
+        workSpec.getWorkUnitsDir(),
+        workSpec.getEventSubmitterContext()
+    );
   }
 
   protected NestingExecWorkflow<WorkUnitClaimCheck> 
createProcessingWorkflow(FileSystemJobStateful f) {

Reply via email to