[https://issues.apache.org/jira/browse/GOBBLIN-2013?focusedWorklogId=909290&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-909290]
ASF GitHub Bot logged work on GOBBLIN-2013:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 11/Mar/24 22:09
Start Date: 11/Mar/24 22:09
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3892:
URL: https://github.com/apache/gobblin/pull/3892#discussion_r1520481347
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -208,18 +214,20 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
metricContext.register(this.totalAddSpecTimeNanos);
metricContext.register(this.numJobsScheduledDuringStartup);
}
+ this.dagProcessingEngineEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
Review Comment:
You can also add this as an injected param; up to you (look at line 183).
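A minimal sketch of what the suggestion amounts to, assuming plain constructor injection: the flag arrives as a constructor argument instead of being read from `config` inside the constructor. `SchedulerFlagSketch` and its key string are hypothetical; in Gobblin the parameter would presumably be bound by Guice (`@Inject` plus a `@Named(...)` binding, like the existing `InjectionNames.SERVICE_NAME` parameter), which this stdlib-only sketch does not model.

```java
import java.util.Map;

/** Hypothetical stand-in for the scheduler; only the flag handling is modeled. */
class SchedulerFlagSketch {
  // Hypothetical key; the real code uses ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED.
  static final String DAG_PROCESSING_ENGINE_ENABLED_KEY = "hypothetical.dag.processing.engine.enabled";

  private final boolean dagProcessingEngineEnabled;

  /** Injected-parameter style: the caller (or a DI container) supplies the value. */
  SchedulerFlagSketch(boolean dagProcessingEngineEnabled) {
    this.dagProcessingEngineEnabled = dagProcessingEngineEnabled;
  }

  /** Config-lookup style from the diff, kept as a factory: a missing key defaults to false. */
  static SchedulerFlagSketch fromConfig(Map<String, String> config) {
    String raw = config.getOrDefault(DAG_PROCESSING_ENGINE_ENABLED_KEY, "false");
    return new SchedulerFlagSketch(Boolean.parseBoolean(raw));
  }

  boolean isDagProcessingEngineEnabled() {
    return dagProcessingEngineEnabled;
  }
}
```

One practical upside of the injected form is that a test can construct the scheduler with either flag value directly, without building a config object.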
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -502,14 +510,24 @@ public static long utcDateAsUTCEpochMillis(Date date) {
@Override
public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
try {
- Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ FlowSpec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
// The trigger event time will be missing for adhoc and run-immediately flows, so we set the default here
String triggerTimestampMillis = jobProps.getProperty(
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL);
boolean isReminderEvent =
Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "false"));
- this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
+ if (this.dagProcessingEngineEnabled) {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ String flowExecutionId = String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
+ DagActionStore.DagAction dagAction =
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH);
+ this.dagManagement.addDagAction(dagAction);
+ } else {
+ this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
+ }
Review Comment:
Also, wherever you enable this, let's add a test.
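A sketch of the kind of test being asked for, assuming the two collaborators can be replaced by recording stubs: run the flag-gated code once with the flag on and once with it off, and assert which path was taken. `FlowOrchestrator`, `LaunchActionSink`, `RoutingScheduler`, and `RoutingSchedulerTest` are hypothetical simplifications of `Orchestrator`, `DagManagement`, and `runJob` from the diff, not Gobblin's actual signatures; the same shape should apply wherever the flag check ends up living.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Orchestrator.orchestrate(...) as called in the diff. */
interface FlowOrchestrator {
  void orchestrate(String flowGroup, String flowName, long triggerTimestampMillis, boolean isReminderEvent);
}

/** Hypothetical stand-in for DagManagement.addDagAction(...) with a LAUNCH action. */
interface LaunchActionSink {
  void addLaunchAction(String flowGroup, String flowName, String flowExecutionId);
}

/** Reduced version of the branch added to runJob: exactly one collaborator is called. */
class RoutingScheduler {
  private final boolean dagProcessingEngineEnabled;
  private final FlowOrchestrator orchestrator;
  private final LaunchActionSink dagManagement;

  RoutingScheduler(boolean dagProcessingEngineEnabled, FlowOrchestrator orchestrator,
      LaunchActionSink dagManagement) {
    this.dagProcessingEngineEnabled = dagProcessingEngineEnabled;
    this.orchestrator = orchestrator;
    this.dagManagement = dagManagement;
  }

  void runJob(String flowGroup, String flowName, String flowExecutionId,
      long triggerTimestampMillis, boolean isReminderEvent) {
    if (dagProcessingEngineEnabled) {
      dagManagement.addLaunchAction(flowGroup, flowName, flowExecutionId);
    } else {
      orchestrator.orchestrate(flowGroup, flowName, triggerTimestampMillis, isReminderEvent);
    }
  }
}

/** Test helper: runs one job with recording stubs and returns the calls that were made. */
class RoutingSchedulerTest {
  static List<String> recordCalls(boolean dagProcessingEngineEnabled) {
    List<String> calls = new ArrayList<>();
    RoutingScheduler scheduler = new RoutingScheduler(dagProcessingEngineEnabled,
        (group, name, trigger, reminder) -> calls.add("orchestrate:" + group + "/" + name),
        (group, name, executionId) -> calls.add("launch:" + group + "/" + name + "/" + executionId));
    scheduler.runJob("testGroup", "testFlow", "42", 0L, false);
    return calls;
  }
}
```

In a real suite the flag-on and flag-off cases would likely be two separate test methods, each asserting that the other collaborator was never called.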
Review Comment:
This is not the correct place to add the dag action to the stream. `orchestrate`
is called in response to the scheduler and is where _scheduler lease arbitration_
will happen. As a result of it, the `launch dagAction` will be committed to
`dagActionStore` -> read by `changeMonitor` -> then forwarded to the
`dagManager`. It's at that last step that we should add the launch task to the
stream (via `dagManagement`) instead of passing it to the old `dagManager`. This
change should be in `DagProcEnabledDagActionStoreChangeMonitor`.
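The flow described above can be sketched as a toy in-memory model, under heavy simplifying assumptions: the `dagActionStore` is an in-memory queue, scheduler lease arbitration is collapsed to a boolean, and the choice between the new `dagManagement` stream and the old `dagManager` is a flag check inside the change monitor (the review places the real change in `DagProcEnabledDagActionStoreChangeMonitor`; how that monitor gets selected is not modeled). All class names below are hypothetical. What it illustrates is that the scheduler side stays unchanged and only the consumer end of the pipeline depends on the flag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical launch action: just the identifiers the diff passes to DagActionStore.DagAction. */
final class LaunchActionSketch {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;

  LaunchActionSketch(String flowGroup, String flowName, String flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }
}

/** Stand-in for dagActionStore: an in-memory queue of committed actions. */
class ActionStoreSketch {
  private final Queue<LaunchActionSketch> committed = new ArrayDeque<>();

  void commit(LaunchActionSketch action) { committed.add(action); }

  /** Returns the next committed action, or null if none are pending. */
  LaunchActionSketch poll() { return committed.poll(); }
}

/** Scheduler side: unchanged by the flag. Lease arbitration is reduced to a boolean here. */
class OrchestratorSketch {
  static void orchestrate(ActionStoreSketch store, boolean wonSchedulerLease, LaunchActionSketch action) {
    // Only the host that wins arbitration commits the launch action; others do nothing.
    if (wonSchedulerLease) {
      store.commit(action);
    }
  }
}

/** Change-monitor side: the one place that picks the new DagProc path or the legacy DagManager. */
class ChangeMonitorSketch {
  private final boolean dagProcessingEngineEnabled;
  private final ActionStoreSketch store;
  final List<LaunchActionSketch> sentToDagManagement = new ArrayList<>();
  final List<LaunchActionSketch> sentToLegacyDagManager = new ArrayList<>();

  ChangeMonitorSketch(boolean dagProcessingEngineEnabled, ActionStoreSketch store) {
    this.dagProcessingEngineEnabled = dagProcessingEngineEnabled;
    this.store = store;
  }

  /** Drains committed actions, forwarding each one to exactly one consumer. */
  void processCommitted() {
    LaunchActionSketch action;
    while ((action = store.poll()) != null) {
      if (dagProcessingEngineEnabled) {
        sentToDagManagement.add(action);
      } else {
        sentToLegacyDagManager.add(action);
      }
    }
  }
}
```

Keeping the flag check on the consumer side means that, with either engine, a scheduled flow still passes through `orchestrate` and its lease arbitration before any launch action is committed; branching inside `runJob` would skip that step.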
Issue Time Tracking
-------------------
Worklog Id: (was: 909290)
Remaining Estimate: 0h
Time Spent: 10m
> create instances of new class created in PR #3858
> -----------------------------------------------------
>
> Key: GOBBLIN-2013
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2013
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)