[ 
https://issues.apache.org/jira/browse/GOBBLIN-2013?focusedWorklogId=909290&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-909290
 ]

ASF GitHub Bot logged work on GOBBLIN-2013:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Mar/24 22:09
            Start Date: 11/Mar/24 22:09
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3892:
URL: https://github.com/apache/gobblin/pull/3892#discussion_r1520481347


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -208,18 +214,20 @@ public 
GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
       metricContext.register(this.totalAddSpecTimeNanos);
       metricContext.register(this.numJobsScheduledDuringStartup);
     }
+    this.dagProcessingEngineEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);

Review Comment:
   You can also add this as an injected param; it's up to you (see line 183).



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -502,14 +510,24 @@ public static long utcDateAsUTCEpochMillis(Date date) {
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
-      Spec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+      FlowSpec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
       // The trigger event time will be missing for adhoc and run-immediately 
flows, so we set the default here
       String triggerTimestampMillis = jobProps.getProperty(
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL);
       boolean isReminderEvent =
           
Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
 "false"));
-      this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis), isReminderEvent);
+      if (this.dagProcessingEngineEnabled) {
+        Config flowConfig = flowSpec.getConfig();
+        String flowGroup = 
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+        String flowName = 
flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+        String flowExecutionId = 
String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
+        DagActionStore.DagAction dagAction =
+            new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
+        this.dagManagement.addDagAction(dagAction);
+      } else {
+        this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis), isReminderEvent);
+      }

Review Comment:
   Also, wherever you enable this, let's add a test.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -502,14 +510,24 @@ public static long utcDateAsUTCEpochMillis(Date date) {
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
-      Spec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+      FlowSpec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
       // The trigger event time will be missing for adhoc and run-immediately 
flows, so we set the default here
       String triggerTimestampMillis = jobProps.getProperty(
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL);
       boolean isReminderEvent =
           
Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
 "false"));
-      this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis), isReminderEvent);
+      if (this.dagProcessingEngineEnabled) {
+        Config flowConfig = flowSpec.getConfig();
+        String flowGroup = 
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+        String flowName = 
flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+        String flowExecutionId = 
String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
+        DagActionStore.DagAction dagAction =
+            new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
+        this.dagManagement.addDagAction(dagAction);
+      } else {
+        this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis), isReminderEvent);
+      }

Review Comment:
   This is not the correct place to add the dag action to the stream. Orchestrate 
is called in response to the scheduler and is where _scheduler lease arbitration_ 
will happen. As a result of it, the `launch dagAction` will be committed to 
`dagActionStore` -> read by `changeMonitor` -> then forwarded to the 
`dagManager`. It's at the last step that we should add the launch task to the 
stream (via `dagManagement`) instead of passing it to the old `dagManager`. This 
change should be in `DagProcEnabledDagActionStoreChangeMonitor`.





Issue Time Tracking
-------------------

            Worklog Id:     (was: 909290)
    Remaining Estimate: 0h
            Time Spent: 10m

> create instances of new new class created in PR #3858
> -----------------------------------------------------
>
>                 Key: GOBBLIN-2013
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2013
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to