This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 22248e4f2 [GOBBLIN-1982] Show a consistent flowExecutionId btwn 
Compilation & Execution  (#3854)
22248e4f2 is described below

commit 22248e4f2b01859c7e833b99f6770a2d4bd15d6e
Author: umustafi <[email protected]>
AuthorDate: Wed Jan 10 11:16:59 2024 -0800

    [GOBBLIN-1982] Show a consistent flowExecutionId btwn Compilation & 
Execution  (#3854)
    
    * Use existing FlowExecutionId for non-scheduled flows to match id given to 
the user.
    
            * removes flow compilation (and event emission) done before lease 
arbitration for multi-active scheduler
    
    * Rename fields to make more readable
    
    * Update javadoc
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../runtime/api/MultiActiveLeaseArbiter.java       |  6 ++-
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  | 27 +++++++---
 .../api/MysqlMultiActiveLeaseArbiterTest.java      | 61 +++++++++++++++++-----
 .../modules/orchestration/FlowTriggerHandler.java  |  5 +-
 .../modules/orchestration/Orchestrator.java        | 50 +++++++++---------
 5 files changed, 98 insertions(+), 51 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
index 253db49ba..21befc26f 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
@@ -53,10 +53,14 @@ public interface MultiActiveLeaseArbiter {
    * @param flowAction uniquely identifies the flow and the present action 
upon it
    * @param eventTimeMillis is the time this flow action was triggered
    * @param isReminderEvent true if the flow action event we're checking on is 
a reminder event
+   * @param adoptConsensusFlowExecutionId if true then replaces the flowAction 
flowExecutionId returned in
+   *                                      LeaseAttemptStatuses with the 
consensus eventTime
+   *
    * @return LeaseAttemptStatus
    * @throws IOException
    */
-  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long 
eventTimeMillis, boolean isReminderEvent)
+  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long 
eventTimeMillis, boolean isReminderEvent,
+      boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
   /**
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index f514e4c01..4f86c85eb 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -238,7 +238,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
 
   @Override
   public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis,
-      boolean isReminderEvent) throws IOException {
+      boolean isReminderEvent, boolean adoptConsensusFlowExecutionId) throws 
IOException {
     log.info("Multi-active scheduler about to handle trigger event: [{}, is: 
{}, triggerEventTimestamp: {}]",
         flowAction, isReminderEvent ? "reminder" : "original", 
eventTimeMillis);
     // Query lease arbiter table about this flow action
@@ -249,7 +249,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 
1: no existing row for this flow action,"
             + " then go ahead and insert", flowAction, isReminderEvent ? 
"reminder" : "original", eventTimeMillis);
         int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
-       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.empty(), isReminderEvent);
+       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.empty(),
+           isReminderEvent, adoptConsensusFlowExecutionId);
       }
 
       // Extract values from result set
@@ -288,17 +289,23 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           + "with database eventTimestamp {} (in epoch-millis)", flowAction, 
isReminderEvent ? "reminder" : "original",
           eventTimeMillis, dbCurrentTimestamp.getTime());
 
+      /* Note that we use `adoptConsensusFlowExecutionId` parameter's value to 
determine whether we should use the db
+      laundered event timestamp as the flowExecutionId or maintain the 
original one
+       */
+
       // Lease is valid
       if (leaseValidityStatus == 1) {
         if (isWithinEpsilon) {
-          DagActionStore.DagAction updatedFlowAction = 
flowAction.updateFlowExecutionId(dbEventTimestamp.getTime());
+          DagActionStore.DagAction updatedFlowAction =
+              adoptConsensusFlowExecutionId ? 
flowAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : flowAction;
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 2: Same event, lease is valid",
               updatedFlowAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
           return new LeasedToAnotherStatus(updatedFlowAction,
               dbLeaseAcquisitionTimestamp.getTime() + dbLinger - 
dbCurrentTimestamp.getTime());
         }
-        DagActionStore.DagAction updatedFlowAction = 
flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime());
+        DagActionStore.DagAction updatedFlowAction =
+            adoptConsensusFlowExecutionId ? 
flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime()) : flowAction;
         log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
3: Distinct event, lease is valid",
             updatedFlowAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
         // Utilize db lease acquisition timestamp for wait time
@@ -317,7 +324,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement, 
flowAction,
             true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp), isReminderEvent);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp),
+            isReminderEvent, adoptConsensusFlowExecutionId);
       } // No longer leasing this event
         if (isWithinEpsilon) {
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 5: Same event, no longer leasing event"
@@ -329,7 +337,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
             true, false, dbEventTimestamp, null);
-        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp), isReminderEvent);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp),
+            isReminderEvent, adoptConsensusFlowExecutionId);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -480,7 +489,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * @throws IOException
    */
   protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int 
numRowsUpdated,
-      DagActionStore.DagAction flowAction, Optional<Timestamp> 
dbCurrentTimestamp, boolean isReminderEvent)
+      DagActionStore.DagAction flowAction, Optional<Timestamp> 
dbCurrentTimestamp, boolean isReminderEvent,
+      boolean adoptConsensusFlowExecutionId)
       throws SQLException, IOException {
     // Fetch values in row after attempted insert
     SelectInfoResult selectInfoResult = getRowInfo(flowAction);
@@ -488,7 +498,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
       return new NoLongerLeasingStatus();
     }
-    DagActionStore.DagAction updatedFlowAction = 
flowAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis);
+    DagActionStore.DagAction updatedFlowAction =
+        adoptConsensusFlowExecutionId ? 
flowAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis) : flowAction;
     if (numRowsUpdated == 1) {
       log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] 
successfully!", updatedFlowAction,
           isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis);
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 08630ab36..a77949541 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -42,13 +42,16 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final String PASSWORD = "testPassword";
   private static final String TABLE = "mysql_multi_active_lease_arbiter_store";
   private static final String flowGroup = "testFlowGroup";
+  private static final String flowGroup2 = "testFlowGroup2";
   private static final String flowName = "testFlowName";
   private static final String flowExecutionId = "12345677";
-  // The following are considered unique because they correspond to different 
flow action types
+  // Dag actions with the same flow info but different flow action types are 
considered unique
   private static DagActionStore.DagAction launchDagAction =
       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
   private static DagActionStore.DagAction resumeDagAction =
       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.RESUME);
+  private static DagActionStore.DagAction launchDagAction2 =
+      new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
   private static final long eventTimeMillis = System.currentTimeMillis();
   private static final Timestamp dummyTimestamp = new Timestamp(99999);
   private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
@@ -77,14 +80,15 @@ public class MysqlMultiActiveLeaseArbiterTest {
 
   /*
      Tests all cases of trying to acquire a lease (CASES 1-6 detailed below) 
for a flow action event with one
-     participant involved.
+     participant involved. All of the cases allow the flowExecutionId to be 
updated by lease arbiter by setting
+     `adoptConsensusFlowExecutionId` to true.
   */
   // TODO: refactor this to break it into separate test cases as much is 
possible
   @Test
   public void testAcquireLeaseSingleParticipant() throws Exception {
     // Tests CASE 1 of acquire lease for a flow action event not present in DB
     MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
     Assert.assertTrue(firstLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
@@ -98,7 +102,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     DagActionStore.DagAction killDagAction = new
         DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.KILL);
     MultiActiveLeaseArbiter.LeaseAttemptStatus killStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction, 
eventTimeMillis, false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction, 
eventTimeMillis, false, true);
     Assert.assertTrue(killStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
@@ -109,7 +113,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Very little time should have passed if this test directly follows the 
one above so this call will be considered
     // the same as the previous event
     MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
     Assert.assertTrue(secondLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
     MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
         (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
@@ -121,7 +125,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Allow enough time to pass for this trigger to be considered distinct, 
but not enough time so the lease expires
     Thread.sleep(MORE_THAN_EPSILON);
     MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
     Assert.assertTrue(thirdLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
     MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
         (MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
@@ -131,7 +135,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Tests CASE 4 of lease out of date
     Thread.sleep(LINGER);
     MultiActiveLeaseArbiter.LeaseAttemptStatus fourthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
     Assert.assertTrue(fourthLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
@@ -144,14 +148,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
     
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
     Assert.assertTrue(System.currentTimeMillis() - 
fourthObtainedStatus.getEventTimeMillis() < EPSILON);
     MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
     Assert.assertTrue(fifthLaunchStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
 
     // Tests CASE 6 of no longer leasing a distinct event in DB
     // Wait so this event is considered distinct and a new lease will be 
acquired
     Thread.sleep(MORE_THAN_EPSILON);
     MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
     Assert.assertTrue(sixthLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
@@ -224,7 +228,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(markedSuccess);
     // Ensure no NPE results from calling this after a lease has been 
completed and acquisition timestamp val is NULL
     mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, 
resumeDagAction,
-        Optional.empty(), false);
+        Optional.empty(), false, true);
 
     // The following insert will fail since eventTimestamp does not match the 
expected
     int numRowsUpdated = 
mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
@@ -249,7 +253,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
     long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
     LeaseAttemptStatus attemptStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
olderEventTimestamp, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
olderEventTimestamp, true, true);
     Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
   }
 
@@ -264,7 +268,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
     LeaseAttemptStatus attemptStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true, true);
     Assert.assertTrue(attemptStatus instanceof LeasedToAnotherStatus);
     LeasedToAnotherStatus leasedToAnotherStatus = (LeasedToAnotherStatus) 
attemptStatus;
     Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(), 
selectInfoResult.getEventTimeMillis());
@@ -281,7 +285,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
     Thread.sleep(LINGER);
     LeaseAttemptStatus attemptStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true, true);
     Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
     LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
     Assert.assertTrue(obtainedStatus.getEventTimeMillis() > 
selectInfoResult.getEventTimeMillis());
@@ -309,7 +313,36 @@ public class MysqlMultiActiveLeaseArbiterTest {
      Thread.sleep(MORE_THAN_EPSILON);
      // Now have a reminder event check-in on the completed lease
      LeaseAttemptStatus attemptStatus =
-         mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true);
+         mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true, true);
      Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
    }
+
+   /*
+   Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set 
to True and verify that flowExecutionId
+   returned is the same as flowExecutionId provided to it for a 
LeaseObtainedStatus and LeasedToAnotherStatus object
+   (CASE 1 & 2).
+   */
+  @Test
+  public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
+    // Obtain a lease for a new action and verify its flowExecutionId is not 
updated
+    MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2, 
eventTimeMillis, false, false);
+    Assert.assertTrue(firstLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+    MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
+        (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
+    Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <= 
firstObtainedStatus.getLeaseAcquisitionTimestamp());
+    Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
+        new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH)));
+
+    // A second attempt to obtain a lease on the same action should return a 
LeasedToAnotherStatus which also contains
+    // the original flowExecutionId
+    MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2, 
eventTimeMillis, false, false);
+    Assert.assertTrue(secondLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
+    MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
+        (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
+    Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), 
secondLeasedToAnotherStatus.getEventTimeMillis());
+    Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
+        new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH)));
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index a1b141d95..f0be63366 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -111,13 +111,14 @@ public class FlowTriggerHandler {
    * @param flowAction
    * @param eventTimeMillis
    * @param isReminderEvent
+   * @param skipFlowExecutionIdReplacement
    * @throws IOException
    */
   public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis,
-      boolean isReminderEvent) throws IOException {
+      boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) throws 
IOException {
     if (multiActiveLeaseArbiter.isPresent()) {
       MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
multiActiveLeaseArbiter.get().tryAcquireLease(
-          flowAction, eventTimeMillis, isReminderEvent);
+          flowAction, eventTimeMillis, isReminderEvent, 
skipFlowExecutionIdReplacement);
       // The flow action contained in the`LeaseAttemptStatus` from the lease 
arbiter contains an updated flow execution
       // id. From this point onwards, always use the newer version of the flow 
action to easily track the action through
       // orchestration and execution.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f839fda2d..ce7931c82 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -58,6 +58,7 @@ import 
org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.FlowUtils;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -244,27 +245,12 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
       sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, 
flowName);
-      Optional<TimingEvent> flowCompilationTimer =
-          this.eventSubmitter.transform(submitter -> new 
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
-      Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
-          
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 spec, flowGroup,
-              flowName);
-      if (!jobExecutionPlanDagOptional.isPresent()) {
-        Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-        return;
-      }
+
       Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
-      FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDagOptional.get());
-      java.util.Optional<String> flowExecutionId = 
TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata);
-
-      // Unexpected result because flowExecutionId should be provided by above 
call too 'addFlowExecutionIdIfAbsent'
-      if (!flowExecutionId.isPresent()) {
-        _log.warn("FlowMetadata does not contain flowExecutionId when it 
should have been provided. Skipping execution "
-            + "of: {}", spec);
-        return;
-      }
+      String flowExecutionId = 
String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
+
       DagActionStore.DagAction flowAction =
-          new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId.get(), DagActionStore.FlowActionType.LAUNCH);
+          new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
 
       // If multi-active scheduler is enabled do not pass onto DagManager, 
otherwise scheduler forwards it directly
       // Skip flow compilation as well, since we recompile after receiving 
event from DagActionStoreChangeMonitor later
@@ -281,12 +267,24 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
           return;
         }
 
-        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis, isReminderEvent);
+        // Adopt consensus flowExecutionId for scheduled flows
+        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis, isReminderEvent,
+            flowSpec.isScheduled());
         _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
             flowAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
       } else {
-        Dag<JobExecutionPlan> jobExecutionPlanDag = 
jobExecutionPlanDagOptional.get();
-        if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+        Optional<TimingEvent> flowCompilationTimer =
+          this.eventSubmitter.transform(submitter -> new 
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+        Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+            
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 spec, flowGroup,
+                flowName);
+
+        if (!compiledDagOptional.isPresent()) {
+          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+          return;
+        }
+        Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+        if (compiledDag == null || compiledDag.isEmpty()) {
           
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 spec, flowMetadata);
           Instrumented.markMeter(this.flowOrchestrationFailedMeter);
           
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
@@ -297,17 +295,17 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
             SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
 
-        
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDag);
+        
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
compiledDag);
         if (flowCompilationTimer.isPresent()) {
           flowCompilationTimer.get().stop(flowMetadata);
         }
 
         // Depending on if DagManager is present, handle execution
         if (this.dagManager.isPresent()) {
-          submitFlowToDagManager(flowSpec, jobExecutionPlanDag);
+          submitFlowToDagManager(flowSpec, compiledDag);
         } else {
           // Schedule all compiled JobSpecs on their respective Executor
-          for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
+          for (Dag.DagNode<JobExecutionPlan> dagNode : compiledDag.getNodes()) 
{
             DagManagerUtils.incrementJobAttempt(dagNode);
             JobExecutionPlan jobExecutionPlan = dagNode.getValue();
 
@@ -463,7 +461,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
    Deletes spec from flowCatalog if it is an adhoc flow (not containing a job 
schedule)
  */
   private void deleteSpecFromCatalogIfAdhoc(FlowSpec flowSpec) {
-    if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+    if (!flowSpec.isScheduled()) {
       this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), 
false);
     }
   }

Reply via email to