phet commented on code in PR #3854:
URL: https://github.com/apache/gobblin/pull/3854#discussion_r1446815650


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -463,7 +461,7 @@ private void deleteFromExecutor(Spec spec, Properties headers) {
    Deletes spec from flowCatalog if it is an adhoc flow (not containing a job schedule)
  */
   private void deleteSpecFromCatalogIfAdhoc(FlowSpec flowSpec) {
-    if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+    if (!flowSpec.isScheduled()) {

Review Comment:
   big improvement! :)
   
   did we have this method all along, but just weren't using it here?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -53,10 +53,14 @@ public interface MultiActiveLeaseArbiter {
    * @param flowAction uniquely identifies the flow and the present action upon it
    * @param eventTimeMillis is the time this flow action was triggered
    * @param isReminderEvent true if the flow action event we're checking on is a reminder event
+   * @param skipFlowExecutionIdReplacement if true then does not replace the flowExecutionId in the flowAction returned

Review Comment:
   the `skip...Replacement` naming seems indirect.  it might be better to reverse the 
sense and call it `replaceFlowExecId`.  even better might be 
`adoptConsensusFlowExecutionId`
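
To illustrate the naming point, here is a minimal sketch with the sense of the flag reversed. `TriggerHandlerSketch`, `resolveFlowExecutionId`, and `forFlow` are hypothetical stand-ins invented for this example, not the actual Gobblin API; only the parameter name `adoptConsensusFlowExecutionId` and the notion of a scheduled flow come from this thread.

```java
// Hypothetical stand-in for illustration only; not the real FlowTriggerHandler.
final class TriggerHandlerSketch {
  private TriggerHandlerSketch() {}

  /**
   * Picks which flowExecutionId to use.
   *
   * @param adoptConsensusFlowExecutionId if true, return the id agreed on by
   *        all participants; if false, keep the caller's own id
   */
  static String resolveFlowExecutionId(String localId, String consensusId,
      boolean adoptConsensusFlowExecutionId) {
    return adoptConsensusFlowExecutionId ? consensusId : localId;
  }

  /** Call site: the positive flag lets isScheduled pass through without negation. */
  static String forFlow(boolean isScheduled, String localId, String consensusId) {
    return resolveFlowExecutionId(localId, consensusId, isScheduled);
  }
}
```

With the positive name, a scheduled flow passes `true` directly, whereas the `skip...` form forces the caller to pass `!isScheduled` and the reader to untangle a double negative.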



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -281,10 +267,22 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
           return;
         }
 
-        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis, isReminderEvent);
+        // Skip flowExecutionId replacement for adhoc flows
+        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis, isReminderEvent,
+            !flowSpec.isScheduled());
         _log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
             flowAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis);
       } else {
+        Optional<TimingEvent> flowCompilationTimer =
+          this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+        Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =

Review Comment:
   nit: I'd clarify the sense of optional by calling it `compiledDag` or 
`validatedDag`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.