This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new cb5a21568 [GOBBLIN-2045] Integrate `AutomaticTroubleshooter` with gobblin-on-temporal `GenerateWorkUnitsImpl` (#3924)
cb5a21568 is described below
commit cb5a2156809197b27ce9976c8e8e18cc7907d597
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Apr 17 12:25:06 2024 -0700
[GOBBLIN-2045] Integrate `AutomaticTroubleshooter` with gobblin-on-temporal `GenerateWorkUnitsImpl` (#3924)
Integrate `AutomaticTroubleshooter` with gobblin-on-temporal `GenerateWorkUnitsImpl`
---
.../troubleshooter/AutomaticTroubleshooter.java | 2 +-
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 10 ++++++-
.../ddm/activity/impl/ProcessWorkUnitImpl.java | 12 +-------
.../gobblin/temporal/ddm/work/assistance/Help.java | 32 ++++++++++++++++++++++
4 files changed, 43 insertions(+), 13 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
index f6e989201..e48ace129 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
@@ -44,7 +44,7 @@ public interface AutomaticTroubleshooter {
* Those events can be consumed by upstream and analytical systems.
*
* Can be disabled with
- * {@link org.apache.gobblin.configuration.ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING}.
+ * {@link org.apache.gobblin.configuration.ConfigurationKeys#TROUBLESHOOTER_DISABLE_EVENT_REPORTING}.
* */
void reportJobIssuesAsEvents(EventSubmitter eventSubmitter)
throws TroubleshooterException;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index c3de4146b..ace8bedc1 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -34,16 +34,21 @@ import org.apache.hadoop.fs.Path;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -65,6 +70,8 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
// jobState.setBroker(jobBroker);
// jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
+ AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobProps));
+ troubleshooter.start();
try (Closer closer = Closer.create()) {
// before embarking on (potentially expensive) WU creation, first pre-check that the FS is available
FileSystem fs = JobStateUtils.openFileSystem(jobState);
@@ -85,7 +92,8 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
log.error(errMsg, ioe);
throw ApplicationFailure.newFailureWithCause(errMsg, "Failure: generating/writing workunits", ioe);
} finally {
- // TODO: implement Troubleshooter integration!
+ EventSubmitter eventSubmitter = eventSubmitterContext.create();
+ Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, jobState.getJobId());
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index cebe88094..c093e83f4 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -82,17 +82,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
- try {
- if (troubleshooter == null) {
- log.warn("{} - No troubleshooter to report issues from automatic troubleshooter", correlator);
- } else {
- troubleshooter.refineIssues();
- troubleshooter.logIssueSummary();
- troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
- }
- } catch (Exception e) {
- log.error(String.format("%s - Failed to report issues from automatic troubleshooter", correlator), e);
- }
+ Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, correlator);
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 23121bb0f..a46bf0d82 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -32,6 +32,7 @@ import com.google.common.cache.CacheBuilder;
import com.typesafe.config.Config;
+import org.slf4j.Logger;
import org.slf4j.MDC;
import org.apache.hadoop.conf.Configuration;
@@ -41,8 +42,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
@@ -247,4 +251,32 @@ public class Help {
MDC.put(ConfigurationKeys.FLOW_NAME_KEY, String.format("%s:%s",ConfigurationKeys.FLOW_NAME_KEY, flowName));
MDC.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, String.format("%s:%s",ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecId));
}
+
+ /**
+ * refine {@link AutomaticTroubleshooter} issues then report them to the {@link EventSubmitter} and log an issues summary via `logger`;
+ * gracefully handle `null` `troubleshooter`
+ */
+ public static void finalizeTroubleshooting(AutomaticTroubleshooter troubleshooter, EventSubmitter eventSubmitter, Logger logger, String correlator) {
+ try {
+ if (troubleshooter == null) {
+ logger.warn("{} - No troubleshooter to report issues from automatic troubleshooter", correlator);
+ } else {
+ Help.reportTroubleshooterIssues(troubleshooter, eventSubmitter);
+ }
+ } catch (TroubleshooterException e) {
+ logger.error(String.format("%s - Failed to report issues from automatic troubleshooter", correlator), e);
+ }
+ }
+
+ /**
+ * refine and report {@link AutomaticTroubleshooter} issues to the {@link EventSubmitter}; additionally {@link AutomaticTroubleshooter#logIssueSummary()}
+ *
+ * ATTENTION: `troubleshooter` MUST NOT be `null`
+ */
+ public static void reportTroubleshooterIssues(AutomaticTroubleshooter troubleshooter, EventSubmitter eventSubmitter)
+ throws TroubleshooterException {
+ troubleshooter.refineIssues();
+ troubleshooter.logIssueSummary();
+ troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
+ }
}