This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 42f1be58d [GOBBLIN-1987] Allow run-immediately flows to execute in
multi-active scheduler mode (#3861)
42f1be58d is described below
commit 42f1be58d52ea307300a0d4798ead2fbd6b13c54
Author: umustafi <[email protected]>
AuthorDate: Wed Jan 24 17:01:24 2024 -0800
[GOBBLIN-1987] Allow run-immediately flows to execute in multi-active
scheduler mode (#3861)
* Set trigger event time for run-immediately flows
* Always lease arbitrate a flow using default trigger event time
* Disabling flaky tests & attempted fix
* Disable flaky GobblinServiceManagerTests
* Revert "Disable flaky GobblinServiceManagerTests"
This reverts commit 26572b79926f70f2828c25c300f1600c5eb3932b.
* Revert "Disabling flaky tests & attempted fix"
This reverts commit 11255c7a169ab9e36349939182e4d1833d390ba0.
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 2 +-
.../gobblin/service/modules/orchestration/Orchestrator.java | 9 ---------
.../service/modules/scheduler/GobblinServiceJobScheduler.java | 6 ++----
3 files changed, 3 insertions(+), 14 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index e64dcbb93..aa9bbd50e 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -114,7 +114,7 @@ public class ConfigurationKeys {
public static final String SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY =
"expectedReminderTimeMillis";
// Event time of flow action to orchestrate using the multi-active lease
arbiter
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY =
"orchestratorTriggerEventTimeMillis";
- public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = "-1";
+ public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL = "0";
public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 2000;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ad9956458..2018eaadc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -237,15 +237,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
// If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
// Skip flow compilation as well, since we recompile after receiving
event from DagActionStoreChangeMonitor later
if (flowTriggerHandler.isPresent()) {
- // If triggerTimestampMillis was not set by the job trigger handler, then we do not handle this event
- if (triggerTimestampMillis == Long.parseLong(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL)) {
- _log.warn("Skipping execution of spec: {} because missing trigger timestamp in job properties",
- jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
- flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration skipped because no trigger timestamp "
- + "associated with flow action.");
- new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
- return;
- }
// Adopt consensus flowExecutionId for scheduled flows
flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis, isReminderEvent,
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 1253858ea..ba614867f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -503,10 +503,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
public void runJob(Properties jobProps, JobListener jobListener) throws
JobException {
try {
Spec flowSpec =
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
- // We always expect the trigger event time to be set so the flow will be skipped by the orchestrator if it is not
+ // The trigger event time will be missing for adhoc and run-immediately flows, so we set the default here
String triggerTimestampMillis = jobProps.getProperty(
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
- ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL);
+ ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL);
boolean isReminderEvent =
Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
"false"));
this.orchestrator.orchestrate(flowSpec, jobProps,
Long.parseLong(triggerTimestampMillis), isReminderEvent);
@@ -619,8 +619,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
} else {
_log.info("No FlowSpec schedule found, so running FlowSpec: " +
addedSpec);
- // Use 0 for trigger event time of an adhoc flow
- jobConfig.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY, "0");
this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true,
jobConfig, null));
}