umustafi commented on code in PR #3892:
URL: https://github.com/apache/gobblin/pull/3892#discussion_r1520481347
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -208,18 +214,20 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
metricContext.register(this.totalAddSpecTimeNanos);
metricContext.register(this.numJobsScheduledDuringStartup);
}
+ this.dagProcessingEngineEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
Review Comment:
You can also add this as an injected param; up to you (see line 183).
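A minimal sketch of the two options, assuming a simplified class: `FlagAwareScheduler` and `HYPOTHETICAL_FLAG_KEY` are illustrative names, not Gobblin code, and a `Map<String, String>` stands in for the Typesafe `Config`. The injected form takes the flag as a constructor argument (in the real scheduler the injector would presumably supply it through a `@Named` binding, similar to the existing `serviceName` parameter); the config form mirrors the `ConfigUtils.getBoolean(..., false)` call in the diff.

```java
import java.util.Map;

// Hypothetical, trimmed-down stand-in for GobblinServiceJobScheduler that models
// only how the DAG-processing-engine flag reaches the object.
class FlagAwareScheduler {
  // Placeholder key; the real constant is ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED.
  static final String HYPOTHETICAL_FLAG_KEY = "hypothetical.dag.processing.engine.enabled";

  private final boolean dagProcessingEngineEnabled;

  // Injected form: the value arrives as a constructor argument. In the real class
  // an injector would supply it (e.g. via a @Named binding); plain constructor
  // injection keeps this sketch free of Guice.
  FlagAwareScheduler(boolean dagProcessingEngineEnabled) {
    this.dagProcessingEngineEnabled = dagProcessingEngineEnabled;
  }

  // Config-derived form, mirroring the diff: read the key and default to false.
  static FlagAwareScheduler fromConfig(Map<String, String> config) {
    return new FlagAwareScheduler(
        Boolean.parseBoolean(config.getOrDefault(HYPOTHETICAL_FLAG_KEY, "false")));
  }

  boolean isDagProcessingEngineEnabled() {
    return dagProcessingEngineEnabled;
  }
}
```

The injected form mainly lets a test construct the scheduler with the engine on or off directly, without building a config; the config form keeps the `false` default local to the class.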
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -502,14 +510,24 @@ public static long utcDateAsUTCEpochMillis(Date date) {
@Override
public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
try {
- Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ FlowSpec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
// The trigger event time will be missing for adhoc and run-immediately flows, so we set the default here
String triggerTimestampMillis = jobProps.getProperty(
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL);
boolean isReminderEvent =
Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "false"));
- this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
+ if (this.dagProcessingEngineEnabled) {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ String flowExecutionId = String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
+ DagActionStore.DagAction dagAction =
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH);
+ this.dagManagement.addDagAction(dagAction);
+ } else {
+ this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
+ }
Review Comment:
Also, wherever you enable this, let's add a test.
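A sketch of the kind of test meant here, assuming the flag-gated decision can be isolated into one small method. `LaunchRouter` and `LaunchRouterCheck` are hypothetical, the flow id is a plain string, and recording lambdas stand in for mocks of `dagManagement` and the legacy orchestrator/`dagManager` path. The check is that exactly one path is taken for each flag value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical router that models only the flag-gated branch under review.
class LaunchRouter {
  private final boolean dagProcessingEngineEnabled;
  private final Consumer<String> dagManagement; // stands in for dagManagement.addDagAction(...)
  private final Consumer<String> legacyPath;    // stands in for the old orchestrate/dagManager path

  LaunchRouter(boolean dagProcessingEngineEnabled,
      Consumer<String> dagManagement, Consumer<String> legacyPath) {
    this.dagProcessingEngineEnabled = dagProcessingEngineEnabled;
    this.dagManagement = dagManagement;
    this.legacyPath = legacyPath;
  }

  // Sends the launch down exactly one path, chosen by the flag.
  void launch(String flowId) {
    if (dagProcessingEngineEnabled) {
      dagManagement.accept(flowId);
    } else {
      legacyPath.accept(flowId);
    }
  }
}

// Test-style helper: recording lambdas replace a mocking library.
class LaunchRouterCheck {
  // Performs one launch with the given flag value and returns the recorded calls.
  static List<String> recordCalls(boolean enabled) {
    List<String> calls = new ArrayList<>();
    LaunchRouter router = new LaunchRouter(enabled,
        flowId -> calls.add("dagManagement:" + flowId),
        flowId -> calls.add("legacy:" + flowId));
    router.launch("group.name.123");
    return calls;
  }
}
```

Running it once per flag value covers both branches; with a mocking library the same test would verify one collaborator was called and the other never was.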
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -502,14 +510,24 @@ public static long utcDateAsUTCEpochMillis(Date date) {
Review Comment:
This is not the correct place to add the dag action to the stream. `orchestrate`
is called in response to the scheduler and is where _scheduler lease arbitration_
happens. As a result of arbitration, the `launch dagAction` is committed to the
`dagActionStore` -> read by the `changeMonitor` -> then forwarded to the
`dagManager`. It's at this last step that we should add the launch task to the
stream (via `dagManagement`) instead of passing it to the old `dagManager`. This
change should be in `DagProcEnabledDagActionStoreChangeMonitor`.
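A simplified model of the pipeline described in the comment, showing where the `dagManagement` hand-off would sit. Everything here is hypothetical: a `Queue` stands in for `dagActionStore`, an in-process `Set` stands in for scheduler lease arbitration (which really happens across hosts), and the DagProc-enabled monitor is modeled as a subclass that overrides only the final forwarding step; the real `DagProcEnabledDagActionStoreChangeMonitor` may be structured differently.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for DagActionStore.DagAction: just the identifying fields.
record LaunchAction(String flowGroup, String flowName, String flowExecutionId) {}

// Step 1: orchestrate() runs when the scheduler fires. Only the caller that wins
// the (trivially simulated) lease commits the launch action to the store.
class OrchestratorSketch {
  private final Set<String> leasedFlows = new HashSet<>(); // stands in for lease arbitration
  private final Queue<LaunchAction> dagActionStore;         // stands in for DagActionStore

  OrchestratorSketch(Queue<LaunchAction> dagActionStore) {
    this.dagActionStore = dagActionStore;
  }

  void orchestrate(LaunchAction action) {
    String key = action.flowGroup() + "." + action.flowName() + "." + action.flowExecutionId();
    if (leasedFlows.add(key)) {   // first caller for this execution "wins" the lease
      dagActionStore.add(action); // commit the launch action; nothing is forwarded here
    }
  }
}

// Steps 2-3: the change monitor reads committed actions and forwards them.
// The base class models the legacy behavior (hand off to the old DagManager).
class ChangeMonitorSketch {
  protected final List<String> forwarded = new ArrayList<>();

  void drain(Queue<LaunchAction> dagActionStore) {
    LaunchAction action;
    while ((action = dagActionStore.poll()) != null) {
      handleLaunch(action);
    }
  }

  protected void handleLaunch(LaunchAction action) {
    forwarded.add("dagManager:" + action.flowName());
  }

  List<String> getForwarded() {
    return forwarded;
  }
}

// The DagProc-enabled monitor changes only the final hand-off: the launch is
// added to the DagManagement stream instead of going to the old DagManager.
class DagProcEnabledChangeMonitorSketch extends ChangeMonitorSketch {
  @Override
  protected void handleLaunch(LaunchAction action) {
    forwarded.add("dagManagement:" + action.flowName());
  }
}
```

In this model `orchestrate` only commits the action, so the scheduler path needs no flag check at all; which monitor is running determines whether the old or new engine handles the launch.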
--
This is an automated message from the Apache Git Service.