umustafi commented on code in PR #3892:
URL: https://github.com/apache/gobblin/pull/3892#discussion_r1520481347


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -208,18 +214,20 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
       metricContext.register(this.totalAddSpecTimeNanos);
       metricContext.register(this.numJobsScheduledDuringStartup);
     }
+    this.dagProcessingEngineEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);

Review Comment:
   You can also add this as an injected param; it's up to you (look at line 183).



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -502,14 +510,24 @@ public static long utcDateAsUTCEpochMillis(Date date) {
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
     try {
-      Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+      FlowSpec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
       // The trigger event time will be missing for adhoc and run-immediately flows, so we set the default here
       String triggerTimestampMillis = jobProps.getProperty(
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL);
       boolean isReminderEvent =
           Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "false"));
-      this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
+      if (this.dagProcessingEngineEnabled) {
+        Config flowConfig = flowSpec.getConfig();
+        String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+        String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+        String flowExecutionId = String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
+        DagActionStore.DagAction dagAction =
+            new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH);
+        this.dagManagement.addDagAction(dagAction);
+      } else {
+        this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
+      }

Review Comment:
   Also, wherever you enable this, let's add a test.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -502,14 +510,24 @@ public static long utcDateAsUTCEpochMillis(Date date) {
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
     try {
-      Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+      FlowSpec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
       // The trigger event time will be missing for adhoc and run-immediately flows, so we set the default here
       String triggerTimestampMillis = jobProps.getProperty(
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL);
       boolean isReminderEvent =
           Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "false"));
-      this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
+      if (this.dagProcessingEngineEnabled) {
+        Config flowConfig = flowSpec.getConfig();
+        String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+        String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+        String flowExecutionId = String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
+        DagActionStore.DagAction dagAction =
+            new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH);
+        this.dagManagement.addDagAction(dagAction);
+      } else {
+        this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
+      }

Review Comment:
   This is not the correct place to add the dag action to the stream.
`orchestrate` is called in response to the scheduler and is where _scheduler
lease arbitration_ happens. As a result of that arbitration, the
`launch dagAction` is committed to `dagActionStore` -> read by
`changeMonitor` -> then forwarded to the `dagManager`. It's at that last step
that we should add the launch task to the stream (via `dagManagement`) instead
of passing it to the old `dagManager`. This change should be in
`DagProcEnabledDagActionStoreChangeMonitor`.
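
To make the ordering in the comment above concrete, here is a minimal, self-contained sketch of that flow. It is not Gobblin code: `LaunchFlowSketch` and every nested class are hypothetical stand-ins, lease arbitration is reduced to a boolean passed in by the caller, and the "stream" is a plain list. The only point it illustrates is that the scheduler path always goes through `orchestrate`, and the launch action reaches the stream from the change-monitor callback, never directly from `runJob`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LaunchFlowSketch {

  /** Hypothetical stand-in for DagActionStore.DagAction (not the real class). */
  static final class DagAction {
    final String flowGroup, flowName, flowExecutionId, actionType;

    DagAction(String flowGroup, String flowName, String flowExecutionId, String actionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.actionType = actionType;
    }

    @Override
    public String toString() {
      return actionType + ":" + flowGroup + "/" + flowName + "/" + flowExecutionId;
    }
  }

  /** Simplified store: each commit is observed by one change-monitor callback. */
  static final class DagActionStore {
    private final Consumer<DagAction> changeMonitor;

    DagActionStore(Consumer<DagAction> changeMonitor) {
      this.changeMonitor = changeMonitor;
    }

    void commit(DagAction action) {
      changeMonitor.accept(action);
    }
  }

  /** Simplified orchestrator; lease arbitration is stubbed as a boolean (an assumption). */
  static final class Orchestrator {
    private final DagActionStore store;

    Orchestrator(DagActionStore store) {
      this.store = store;
    }

    void orchestrate(String flowGroup, String flowName, String flowExecutionId, boolean leaseAcquired) {
      // Only the host that wins the lease commits the launch action.
      if (leaseAcquired) {
        store.commit(new DagAction(flowGroup, flowName, flowExecutionId, "LAUNCH"));
      }
    }
  }

  /** Simulates two hosts firing the same trigger; returns what reached the stream. */
  static List<String> run() {
    List<String> stream = new ArrayList<>();
    // The change monitor is the component that adds the launch task to the stream.
    DagActionStore store = new DagActionStore(action -> stream.add(action.toString()));
    Orchestrator orchestrator = new Orchestrator(store);
    orchestrator.orchestrate("myGroup", "myFlow", "1710000000000", true);   // wins the lease
    orchestrator.orchestrate("myGroup", "myFlow", "1710000000000", false);  // loses the lease
    return stream;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

In this sketch only the host that acquires the lease produces a launch action, so the stream receives a single entry even though the trigger fired twice. Adding the action directly in `runJob` would bypass that arbitration step.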



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.