This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 42f1be58d [GOBBLIN-1987] Allow run-immediately flows to execute in multi-active scheduler mode (#3861)
42f1be58d is described below

commit 42f1be58d52ea307300a0d4798ead2fbd6b13c54
Author: umustafi <[email protected]>
AuthorDate: Wed Jan 24 17:01:24 2024 -0800

    [GOBBLIN-1987] Allow run-immediately flows to execute in multi-active scheduler mode (#3861)
    
    * Set trigger event time for run-immediately flows
    * Always lease arbitrate a flow using default trigger event time
    * Disabling flaky tests & attempted fix
    * Disable flaky GobblinServiceManagerTests
    * Revert "Disable flaky GobblinServiceManagerTests"
    This reverts commit 26572b79926f70f2828c25c300f1600c5eb3932b.
    * Revert "Disabling flaky tests & attempted fix"
    This reverts commit 11255c7a169ab9e36349939182e4d1833d390ba0.
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 2 +-
 .../gobblin/service/modules/orchestration/Orchestrator.java      | 9 ---------
 .../service/modules/scheduler/GobblinServiceJobScheduler.java    | 6 ++----
 3 files changed, 3 insertions(+), 14 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index e64dcbb93..aa9bbd50e 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -114,7 +114,7 @@ public class ConfigurationKeys {
   public static final String SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY = 
"expectedReminderTimeMillis";
   // Event time of flow action to orchestrate using the multi-active lease 
arbiter
   public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY = 
"orchestratorTriggerEventTimeMillis";
-  public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = 
"-1";
+  public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL = "0";
   public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
   public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
   public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 2000;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ad9956458..2018eaadc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -237,15 +237,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       // If multi-active scheduler is enabled do not pass onto DagManager, 
otherwise scheduler forwards it directly
       // Skip flow compilation as well, since we recompile after receiving 
event from DagActionStoreChangeMonitor later
       if (flowTriggerHandler.isPresent()) {
-        // If triggerTimestampMillis was not set by the job trigger handler, 
then we do not handle this event
-        if (triggerTimestampMillis == 
Long.parseLong(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL))
 {
-          _log.warn("Skipping execution of spec: {} because missing trigger 
timestamp in job properties",
-              jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-          flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration 
skipped because no trigger timestamp "
-              + "associated with flow action.");
-          new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
-          return;
-        }
 
         // Adopt consensus flowExecutionId for scheduled flows
         flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis, isReminderEvent,
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 1253858ea..ba614867f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -503,10 +503,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
       Spec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-      // We always expect the trigger event time to be set so the flow will be 
skipped by the orchestrator if it is not
+      // The trigger event time will be missing for adhoc and run-immediately 
flows, so we set the default here
       String triggerTimestampMillis = jobProps.getProperty(
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
-          ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL);
+          ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL);
       boolean isReminderEvent =
           
Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
 "false"));
       this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis), isReminderEvent);
@@ -619,8 +619,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       }
     } else {
       _log.info("No FlowSpec schedule found, so running FlowSpec: " + 
addedSpec);
-      // Use 0 for trigger event time of an adhoc flow
-      
jobConfig.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
 "0");
       this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, 
jobConfig, null));
     }
 

Reply via email to