phet commented on code in PR #3837:
URL: https://github.com/apache/gobblin/pull/3837#discussion_r1408442638


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -335,22 +343,25 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
+  public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
     Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId);
+        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
+            Optional.of(flowAction.getFlowExecutionId()));
     if (optionalJobExecutionPlanDag.isPresent()) {
-      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get(), flowAction);
     } else {
       _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
       Instrumented.markMeter(this.flowOrchestrationFailedMeter);
     }
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag)
+  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag,
+      DagActionStore.DagAction launchAction)
       throws IOException {
     try {
-      //Send the dag to the DagManager.
+      // Send the dag to the DagManager and delete the action after persisting it to avoid redundant execution on start up
       this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+      this.dagActionStore.get().deleteDagAction(launchAction);

Review Comment:
   What if the DM hasn't successfully handled it (e.g., the service fails after line 364 but before the DM actions complete)? Would the action/event be lost?
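One way to reason about this window is with a small simulation. It is a minimal sketch under one assumption: `addDag(dag, true, true)` durably persists the dag before it returns. `LaunchRecoverySketch`, `pendingActions`, `persistedDags`, `submit`, and `recover` are hypothetical stand-ins for the DagActionStore, the DagManager's dag store, and startup recovery. They are not Gobblin APIs.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical in-memory stand-ins: pendingActions plays the role of the DagActionStore and
 * persistedDags plays the DagManager's persisted dag store. Both are assumed to survive a restart.
 */
final class LaunchRecoverySketch {
  // Launch actions not yet deleted, keyed by flowExecutionId.
  final Set<String> pendingActions = new LinkedHashSet<>();
  // Dags the DagManager has durably persisted, keyed by flowExecutionId.
  final Map<String, String> persistedDags = new HashMap<>();

  /**
   * Persist first, delete second. crashAfterPersist simulates the service dying between
   * addDag(...) and deleteDagAction(...), which is the window the review asks about.
   */
  void submit(String flowExecutionId, String dag, boolean crashAfterPersist) {
    // putIfAbsent turns a replayed launch into a no-op for an already-persisted dag.
    persistedDags.putIfAbsent(flowExecutionId, dag);
    if (crashAfterPersist) {
      throw new IllegalStateException("simulated crash before the action is deleted");
    }
    pendingActions.remove(flowExecutionId);
  }

  /** On startup, replay every launch action that was never deleted. */
  void recover() {
    // Iterate over a copy because submit() removes entries from pendingActions.
    for (String flowExecutionId : new LinkedHashSet<>(pendingActions)) {
      submit(flowExecutionId, "recompiled-dag-" + flowExecutionId, false);
    }
  }
}
```

Under that assumption, a crash between the two calls leaves the action in place, so it is replayed on restart. The replay is harmless only because the persist step is idempotent. If `addDag` could return before the dag was durable, or if the delete ran first, the flow could be lost in exactly the window described above.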



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -195,15 +194,15 @@ protected void processMessage(DecodeableKafkaRecord message) {
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
 
-  protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, String flowExecutionId) {
+  protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) {

Review Comment:
   Excellent, much clearer method signature!



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java:
##########
@@ -47,6 +47,13 @@ static Map<String, String> getFlowMetadata(Config flowConfig) {
     return metadata;
   }
 
+  /**
+   * Retrieves a flowExecutionId from flowMetadata map and returns dummy value if one is not set
+   */
+  public static String getFlowExecutionIdFromFlowMetadata(Map<String, String> flowMetadata) {
+    return flowMetadata.getOrDefault(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "<<no flowExecutionId>>");
+  }

Review Comment:
   I really like the abstraction! As for where this should live, don't we already have another utils class with static methods for extracting the flow group and flow name?
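The sketch below shows what co-locating the extractors might look like. The class name `FlowMetadataUtils`, the `getFlowGroup`/`getFlowName` methods, and the literal key strings are all assumptions. The keys are meant to mirror `TimingEvent.FlowEventConstants`, but their values here are illustrative only. The placeholder string is the one used in this PR.

```java
import java.util.Map;

/**
 * Hypothetical utility class (name assumed) that keeps all flow-metadata extractors together,
 * as the review suggests.
 */
final class FlowMetadataUtils {
  // Assumed to mirror TimingEvent.FlowEventConstants; the literal values are illustrative.
  static final String FLOW_GROUP_FIELD = "flowGroup";
  static final String FLOW_NAME_FIELD = "flowName";
  static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
  // Placeholder returned by the PR's helper when no execution id is present.
  static final String NO_FLOW_EXECUTION_ID = "<<no flowExecutionId>>";

  private FlowMetadataUtils() { }

  /** Returns the flow group, or null if absent. */
  static String getFlowGroup(Map<String, String> flowMetadata) {
    return flowMetadata.get(FLOW_GROUP_FIELD);
  }

  /** Returns the flow name, or null if absent. */
  static String getFlowName(Map<String, String> flowMetadata) {
    return flowMetadata.get(FLOW_NAME_FIELD);
  }

  /** Returns the flowExecutionId, or the placeholder if the metadata does not contain one. */
  static String getFlowExecutionId(Map<String, String> flowMetadata) {
    return flowMetadata.getOrDefault(FLOW_EXECUTION_ID_FIELD, NO_FLOW_EXECUTION_ID);
  }
}
```

The sketch keeps the PR's sentinel string. Returning `Optional<String>` instead would make the missing case explicit to callers, at the cost of changing the call site that builds the `DagAction`.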



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -292,7 +296,11 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
 
         // Depending on if DagManager is present, handle execution
         if (this.dagManager.isPresent()) {
-          submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
+          DagActionStore.DagAction launchAction =
+              new DagActionStore.DagAction(flowGroup, flowName,
+                  TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata),
+                  DagActionStore.FlowActionType.LAUNCH);

Review Comment:
   Could we raise the `DagAction` initialization on line 274 above the `if` block so it can be shared by the `else`?
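Here is a minimal sketch of the suggested hoist. `DagActionSketch` is a hypothetical stand-in for `DagActionStore.DagAction`, with field order taken from the constructor call in this hunk. `OrchestrateSketch` and both branch bodies are placeholders, because the real `else` branch is not shown here.

```java
/** Hypothetical stand-in for DagActionStore.DagAction; field order follows the constructor call in this hunk. */
final class DagActionSketch {
  enum FlowActionType { LAUNCH }

  final String flowGroup;
  final String flowName;
  final String flowExecutionId;
  final FlowActionType flowActionType;

  DagActionSketch(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.flowActionType = flowActionType;
  }
}

final class OrchestrateSketch {
  /**
   * Builds the launch action once, above the if/else, so both branches share the same instance.
   * The branch bodies are placeholders; the real else branch is not shown in this hunk.
   */
  static String orchestrate(boolean dagManagerPresent, String flowGroup, String flowName, String flowExecutionId) {
    DagActionSketch launchAction =
        new DagActionSketch(flowGroup, flowName, flowExecutionId, DagActionSketch.FlowActionType.LAUNCH);
    if (dagManagerPresent) {
      // Stand-in for submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag, launchAction).
      return "dagManager:" + launchAction.flowExecutionId;
    } else {
      // Hypothetical use of the same action on the non-DagManager path.
      return "direct:" + launchAction.flowExecutionId;
    }
  }
}
```

Building the action once also means the flowExecutionId lookup runs a single time, so both paths are guaranteed to agree on the same value.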



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
