This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new cb5a21568 [GOBBLIN-2045] Integrate `AutomaticTroubleshooter` with gobblin-on-temporal `GenerateWorkUnitsImpl` (#3924)
cb5a21568 is described below

commit cb5a2156809197b27ce9976c8e8e18cc7907d597
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Apr 17 12:25:06 2024 -0700

    [GOBBLIN-2045] Integrate `AutomaticTroubleshooter` with gobblin-on-temporal `GenerateWorkUnitsImpl` (#3924)
    
    Integrate `AutomaticTroubleshooter` with gobblin-on-temporal `GenerateWorkUnitsImpl`
---
 .../troubleshooter/AutomaticTroubleshooter.java    |  2 +-
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   | 10 ++++++-
 .../ddm/activity/impl/ProcessWorkUnitImpl.java     | 12 +-------
 .../gobblin/temporal/ddm/work/assistance/Help.java | 32 ++++++++++++++++++++++
 4 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
index f6e989201..e48ace129 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
@@ -44,7 +44,7 @@ public interface AutomaticTroubleshooter {
    * Those events can be consumed by upstream and analytical systems.
    *
    * Can be disabled with
-   * {@link org.apache.gobblin.configuration.ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING}.
+   * {@link org.apache.gobblin.configuration.ConfigurationKeys#TROUBLESHOOTER_DISABLE_EVENT_REPORTING}.
    * */
   void reportJobIssuesAsEvents(EventSubmitter eventSubmitter)
       throws TroubleshooterException;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index c3de4146b..ace8bedc1 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -34,16 +34,21 @@ import org.apache.hadoop.fs.Path;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
 import org.apache.gobblin.source.Source;
 import org.apache.gobblin.source.WorkUnitStreamSource;
 import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
 
@@ -65,6 +70,8 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
     // jobState.setBroker(jobBroker);
     // jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
 
+    AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobProps));
+    troubleshooter.start();
     try (Closer closer = Closer.create()) {
       // before embarking on (potentially expensive) WU creation, first pre-check that the FS is available
       FileSystem fs = JobStateUtils.openFileSystem(jobState);
@@ -85,7 +92,8 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
       log.error(errMsg, ioe);
       throw ApplicationFailure.newFailureWithCause(errMsg, "Failure: generating/writing workunits", ioe);
     } finally {
-      // TODO: implement Troubleshooter integration!
+      EventSubmitter eventSubmitter = eventSubmitterContext.create();
+      Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, jobState.getJobId());
     }
   }
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index cebe88094..c093e83f4 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -82,17 +82,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
     } catch (IOException | InterruptedException e) {
       throw new RuntimeException(e);
     } finally {
-      try {
-        if (troubleshooter == null) {
-          log.warn("{} - No troubleshooter to report issues from automatic troubleshooter", correlator);
-        } else {
-          troubleshooter.refineIssues();
-          troubleshooter.logIssueSummary();
-          troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
-        }
-      } catch (Exception e) {
-        log.error(String.format("%s - Failed to report issues from automatic troubleshooter", correlator), e);
-      }
+      Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, correlator);
     }
   }
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 23121bb0f..a46bf0d82 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -32,6 +32,7 @@ import com.google.common.cache.CacheBuilder;
 
 import com.typesafe.config.Config;
 
+import org.slf4j.Logger;
 import org.slf4j.MDC;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,8 +42,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
@@ -247,4 +251,32 @@ public class Help {
     MDC.put(ConfigurationKeys.FLOW_NAME_KEY, String.format("%s:%s",ConfigurationKeys.FLOW_NAME_KEY, flowName));
     MDC.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, String.format("%s:%s",ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecId));
   }
+
+  /**
+   * refine {@link AutomaticTroubleshooter} issues then report them to the {@link EventSubmitter} and log an issues summary via `logger`;
+   * gracefully handle `null` `troubleshooter`
+   */
+  public static void finalizeTroubleshooting(AutomaticTroubleshooter troubleshooter, EventSubmitter eventSubmitter, Logger logger, String correlator) {
+    try {
+      if (troubleshooter == null) {
+        logger.warn("{} - No troubleshooter to report issues from automatic troubleshooter", correlator);
+      } else {
+        Help.reportTroubleshooterIssues(troubleshooter, eventSubmitter);
+      }
+    } catch (TroubleshooterException e) {
+      logger.error(String.format("%s - Failed to report issues from automatic troubleshooter", correlator), e);
+    }
+  }
+
+  /**
+   * refine and report {@link AutomaticTroubleshooter} issues to the {@link EventSubmitter}; additionally {@link AutomaticTroubleshooter#logIssueSummary()}
+   *
+   * ATTENTION: `troubleshooter` MUST NOT be `null`
+   */
+  public static void reportTroubleshooterIssues(AutomaticTroubleshooter troubleshooter, EventSubmitter eventSubmitter)
+      throws TroubleshooterException {
+    troubleshooter.refineIssues();
+    troubleshooter.logIssueSummary();
+    troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
+  }
 }

Reply via email to