umustafi commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1279990300


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
+  /**
+   * Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator.
+   * @param spec
+   * @param flowMetadata
+   */
+  public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String> flowMetadata) {
+    // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+    // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+    // In this case, the current time is used as the flow executionId.
+    flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        Long.toString(System.currentTimeMillis()));
+
+    String message = "Flow was not compiled successfully.";
+    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
+    }
+    _log.warn(message);
+    flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+    Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter ->
+        new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+    if (flowCompileFailedTimer.isPresent()) {
+      flowCompileFailedTimer.get().stop(flowMetadata);
+    }
+  }
+
   public void submitFlowToDagManager(FlowSpec flowSpec)
       throws IOException {
-    submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      emitFlowCompilationFailedEvent(flowSpec, TimingEventUtils.getFlowMetadata(flowSpec));
+      return;
+    }
+    submitFlowToDagManager(flowSpec, jobExecutionPlanDag);

Review Comment:
   We don't call the `orchestrate` function directly from the
   `DagActionStoreMonitor` because the chain of events is `Scheduler ->
   Orchestrator -> FlowTriggerHandler (Multi-Active algorithm) -> MySQL ->
   Brooklin change stream -> DagActionStoreMonitor -> DagManager`, and calling
   `orchestrate` again at that point would trigger the Multi-Active (MA)
   algorithm a second time. However, I did end up abstracting all the checks
   that compile the flow and ensure an execution is allowed, so they can be
   reused when submitting the flow to the `DagManager` from the
   `DagActionStoreMonitor`.
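
   To illustrate the shape of that reuse, here is a minimal sketch. It is not
   the actual Gobblin code: `SharedCompileGuardSketch`, `CompiledDag`,
   `validateCompiledDag`, `submitFromMonitor`, and the event strings are all
   hypothetical stand-ins, and only the compile check is modeled (the
   "execution is allowed" check is omitted). The idea is that both entry
   points share one guard, while only the scheduler path goes through the MA
   step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; these are NOT Gobblin's real classes.
class SharedCompileGuardSketch {

  /** Stand-in for a compiled Dag<JobExecutionPlan>: just a list of job names. */
  static final class CompiledDag {
    final List<String> jobs;

    CompiledDag(List<String> jobs) {
      this.jobs = jobs;
    }

    boolean isEmpty() {
      return jobs.isEmpty();
    }
  }

  // Records events that would be emitted (compile failures, lease requests).
  final List<String> events = new ArrayList<>();
  // Records DAGs handed to the (hypothetical) DagManager.
  final List<CompiledDag> submitted = new ArrayList<>();

  /** Shared guard: returns the DAG only if compilation produced something runnable. */
  Optional<CompiledDag> validateCompiledDag(String flowName, CompiledDag dag) {
    if (dag == null || dag.isEmpty()) {
      events.add("FlowCompileFailed:" + flowName);
      return Optional.empty();
    }
    return Optional.of(dag);
  }

  /** Scheduler path: run the guard, then go through the multi-active step. */
  void orchestrate(String flowName, CompiledDag dag) {
    validateCompiledDag(flowName, dag)
        .ifPresent(d -> events.add("LeaseRequested:" + flowName));
  }

  /** Monitor path: run the same guard, but submit directly and skip the MA step. */
  void submitFromMonitor(String flowName, CompiledDag dag) {
    validateCompiledDag(flowName, dag).ifPresent(submitted::add);
  }
}
```

   With this split, a failed compilation on the monitor path records a
   failure event and submits nothing, and a successful one is submitted
   without requesting a lease again.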



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.