[ 
https://issues.apache.org/jira/browse/GOBBLIN-1863?focusedWorklogId=873886&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-873886
 ]

ASF GitHub Bot logged work on GOBBLIN-1863:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/23 19:15
            Start Date: 31/Jul/23 19:15
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1279760164


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
+  /**
+   * Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator.
+   * @param spec
+   * @param flowMetadata
+   */
+  public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String> flowMetadata) {
+    // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+    // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+    // In this case, the current time is used as the flow executionId.
+    flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        Long.toString(System.currentTimeMillis()));
+
+    String message = "Flow was not compiled successfully.";
+    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
+    }
+    _log.warn(message);
+    flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+    Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter ->
+        new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+    if (flowCompileFailedTimer.isPresent()) {
+      flowCompileFailedTimer.get().stop(flowMetadata);
+    }
+  }
+
   public void submitFlowToDagManager(FlowSpec flowSpec)
       throws IOException {
-    submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      emitFlowCompilationFailedEvent(flowSpec, TimingEventUtils.getFlowMetadata(flowSpec));
+      return;
+    }
+    submitFlowToDagManager(flowSpec, jobExecutionPlanDag);

Review Comment:
   I'm wondering why we don't call the `orchestrate()` function directly 
from the SpecMonitor instead of `submitFlowToDagManager`, because 
`orchestrate()` performs a few more checks, namely:
   1. Checking for another execution
   2. Ensuring that the specCompiler is ready before compilation. This happens 
less often these days with a file-based compiler, but a race condition is still 
possible: a flow may be compiled at service startup before the flowgraph has 
finished loading into memory.
   
   If this is not possible, we need to abstract these checks into another 
function and ensure that they run before submitting to the DagManager in 
any configuration.
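
To make the second option concrete, here is a minimal sketch of what a shared pre-submission check could look like. Everything in it is hypothetical: `PreSubmissionChecks`, `rejectionReason`, and the string flow identifier are illustrative names rather than Gobblin APIs, and the order of the two checks is an assumption. The sketch uses `java.util.Optional` to stay self-contained, whereas the diff above uses an `Optional` with `transform`.

```java
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

// Hypothetical helper (not part of Gobblin): bundles the two checks named in the
// review comment so that every submission path can run them before compiling.
final class PreSubmissionChecks {
  // Assumed stand-in for "is another execution of this flow already running?"
  private final Predicate<String> anotherExecutionRunning;
  // Assumed stand-in for "has the spec compiler finished loading the flowgraph?"
  private final BooleanSupplier compilerReady;

  PreSubmissionChecks(Predicate<String> anotherExecutionRunning, BooleanSupplier compilerReady) {
    this.anotherExecutionRunning = anotherExecutionRunning;
    this.compilerReady = compilerReady;
  }

  /** Returns a rejection reason, or an empty Optional if the flow may be compiled and submitted. */
  Optional<String> rejectionReason(String flowId) {
    if (anotherExecutionRunning.test(flowId)) {
      return Optional.of("another execution of " + flowId + " is in progress");
    }
    if (!compilerReady.getAsBoolean()) {
      return Optional.of("spec compiler is not ready");
    }
    return Optional.empty();
  }
}
```

Under these assumptions, both `orchestrate()` and `submitFlowToDagManager` would consult `rejectionReason` first and proceed to compilation only when it is empty, so the checks hold regardless of which entry point the SpecMonitor uses.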





Issue Time Tracking
-------------------

    Worklog Id:     (was: 873886)
    Time Spent: 40m  (was: 0.5h)

> Multi-Active Launch Job Related Issues
> --------------------------------------
>
>                 Key: GOBBLIN-1863
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1863
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Urmi Mustafi
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> * DagManager should check leader status before addDag, because calling this 
> method from non-leader hosts throws an NPE, which may cause a failed dag event 
> to be emitted
>  * also handle LAUNCH type events upon leader change and when setting a new 
> participant DagManager to be active. Failing to handle these events may be 
> causing missed flow launches on any leader change or restart. 
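
The first point of the description can be sketched as a simple guard. This is an illustration under stated assumptions, not the DagManager implementation: `LeaderAwareDagQueue`, its `active` flag, and the string dag identifiers are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a leader-aware component (not Gobblin's DagManager).
final class LeaderAwareDagQueue {
  private boolean active = false;                 // assumed: true only on the leader host
  private final List<String> accepted = new ArrayList<>();

  // Called on leadership change; a real service would also start or stop its worker threads here.
  synchronized void setActive(boolean active) {
    this.active = active;
  }

  /** Adds the dag only when this host is active; returns false instead of failing otherwise. */
  synchronized boolean addDag(String dagId) {
    if (!active) {
      // Non-leader hosts skip the call rather than touching state that was never initialized.
      return false;
    }
    accepted.add(dagId);
    return true;
  }

  // Returns a copy so callers cannot mutate internal state.
  synchronized List<String> accepted() {
    return new ArrayList<>(accepted);
  }
}
```

The sketch covers only the leader check; how LAUNCH events are handled after a leader change is not shown here.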



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
