[ https://issues.apache.org/jira/browse/GOBBLIN-2020?focusedWorklogId=910956&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-910956 ]

ASF GitHub Bot logged work on GOBBLIN-2020:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Mar/24 19:37
            Start Date: 21/Mar/24 19:37
    Worklog Time Spent: 10m 
      Work Description: homatthew commented on code in PR #3900:
URL: https://github.com/apache/gobblin/pull/3900#discussion_r1534554929


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
-      if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE) &&
-          jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
-        int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  (retryable or non-retryable error??)
+    int numWUsGenerated = 0;
+    try {
+      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+      if (numWUsGenerated > 0) {
+        JobState jobState = new JobState(jobProps);
+        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        ProcessWorkUnitsWorkflow processWUsWorkflow = createProcessWorkUnitsWorkflow(jobProps);
+        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
+        // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
+        if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+            && jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
+          int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+          int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
+        }
+
+        int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+        if (numWUsProcessed != numWUsGenerated) {
+          log.warn("Not all work units generated were processed: {} != {}", numWUsGenerated, numWUsProcessed);
+          // TODO provide more robust indication that things went wrong!  (retryable or non-retryable error??)
+        }
       }
+      timer.stop();
+    } catch (Exception e) {
+      // Emit a failed GobblinTrackingEvent to record job failures
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();

Review Comment:
   I recall Kip intentionally made it so that no events were emitted when an exception occurs. Check with him on that.
   
   The original intention was to emulate MapReduce (MR) behavior.
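A minimal sketch of the alternative weighed in this comment, assuming the goal is both to record the failure and to keep the job failing: emit a failure event inside the `catch`, then rethrow. The class, the `emit` helper and the `JOB_SUCCEEDED`/`JOB_FAILED` strings are hypothetical stand-ins, not Gobblin's `TemporalEventTimer` or `TimingEvent` API. The diff excerpt stops inside the `catch` block, so whether the PR itself rethrows is not visible here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch: emit a "job failed" event when the job body throws, then rethrow. */
public class FailureEventSketch {
  // Hypothetical stand-in for a tracking-event sink; a real job would submit tracking events.
  static final List<String> emittedEvents = new ArrayList<>();

  static void emit(String eventName) {
    emittedEvents.add(eventName);
  }

  /** Runs the job body, emitting one success or one failure event. */
  static int execute(Supplier<Integer> jobBody) {
    try {
      int numWorkUnits = jobBody.get();
      emit("JOB_SUCCEEDED"); // reached only if every step (generate, process, commit) returned normally
      return numWorkUnits;
    } catch (RuntimeException e) {
      emit("JOB_FAILED"); // record the failure for tracking...
      throw e;            // ...then rethrow so the caller still sees the job as failed
    }
  }

  public static void main(String[] args) {
    System.out.println("processed: " + execute(() -> 3));
    try {
      execute(() -> { throw new IllegalStateException("commit failed"); });
    } catch (IllegalStateException expected) {
      System.out.println("failed as expected: " + expected.getMessage());
    }
    System.out.println("events: " + emittedEvents);
  }
}
```

Because the exception is rethrown, the caller still observes the error, so emitting the event does not by itself mask the failure; whether to emit anything at all on failure is the open question to confirm with Kip.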





Issue Time Tracking
-------------------

    Worklog Id:     (was: 910956)
    Time Spent: 50m  (was: 40m)

> Fixes failed workflow paths in Temporal to properly emit GTE and fail job when commit fails
> -------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2020
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2020
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> There are a few bugs in the Gobblin-Temporal execution mode:
> 1. If the publishing step fails, the activity does not report a failure, because it is missing a post-commit step that checks the dataset states.
> 2. No GobblinTrackingEvents (GTEs) are emitted upon job failure, which makes failures difficult to track.
> 3. Some metadata propagation to workflows, such as the flow execution ID, is incorrect due to a bug that reads worker configs instead of job props.
> 4. The GenerateWus activity does not return the right number of work units created, because it counts top-level multi-work-units instead of the work units they contain.
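Item 4 amounts to counting the work units inside each multi-work-unit rather than the top-level entries. A minimal sketch under that reading; `WorkUnit`, `MultiWorkUnit` and `countLeafWorkUnits` here are simplified hypothetical stand-ins, not Gobblin's real classes or API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: count leaf work units inside (possibly nested) multi-work-units. */
public class WorkUnitCountSketch {
  /** Hypothetical stand-in for a single work unit. */
  static class WorkUnit {}

  /** Hypothetical stand-in for a container that bundles several work units. */
  static class MultiWorkUnit extends WorkUnit {
    final List<WorkUnit> children;

    MultiWorkUnit(WorkUnit... children) {
      this.children = new ArrayList<>(Arrays.asList(children));
    }
  }

  /** Recursively counts leaf work units; a multi-work-unit contributes its children, not itself. */
  static int countLeafWorkUnits(List<? extends WorkUnit> topLevel) {
    int count = 0;
    for (WorkUnit wu : topLevel) {
      if (wu instanceof MultiWorkUnit) {
        count += countLeafWorkUnits(((MultiWorkUnit) wu).children);
      } else {
        count += 1;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    List<WorkUnit> generated = Arrays.asList(
        new MultiWorkUnit(new WorkUnit(), new WorkUnit()),
        new WorkUnit());
    System.out.println("top-level entries: " + generated.size());
    System.out.println("leaf work units: " + countLeafWorkUnits(generated));
  }
}
```

With one multi-work-unit bundling two work units plus one standalone work unit, the top-level count is 2 while the leaf count is 3; a mismatch like this could trip the `numWUsProcessed != numWUsGenerated` warning in the diff above.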



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
