This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 22248e4f2 [GOBBLIN-1982] Show a consistent flowExecutionId btwn
Compilation & Execution (#3854)
22248e4f2 is described below
commit 22248e4f2b01859c7e833b99f6770a2d4bd15d6e
Author: umustafi <[email protected]>
AuthorDate: Wed Jan 10 11:16:59 2024 -0800
[GOBBLIN-1982] Show a consistent flowExecutionId btwn Compilation &
Execution (#3854)
* Use existing FlowExecutionId for non-scheduled flows to match id given to
the user.
* removes flow compilation (and event emission) done before lease
arbitration for multi-active scheduler
* Rename fields to make more readable
* Update javadoc
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../runtime/api/MultiActiveLeaseArbiter.java | 6 ++-
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 27 +++++++---
.../api/MysqlMultiActiveLeaseArbiterTest.java | 61 +++++++++++++++++-----
.../modules/orchestration/FlowTriggerHandler.java | 5 +-
.../modules/orchestration/Orchestrator.java | 50 +++++++++---------
5 files changed, 98 insertions(+), 51 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
index 253db49ba..21befc26f 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
@@ -53,10 +53,14 @@ public interface MultiActiveLeaseArbiter {
* @param flowAction uniquely identifies the flow and the present action
upon it
* @param eventTimeMillis is the time this flow action was triggered
* @param isReminderEvent true if the flow action event we're checking on is
a reminder event
+ * @param adoptConsensusFlowExecutionId if true, replaces the flowExecutionId of the
flowAction returned in the
+ * LeaseAttemptStatus with the
consensus eventTime; if false, the original flowExecutionId is kept
+ *
* @return LeaseAttemptStatus
* @throws IOException
*/
- LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long
eventTimeMillis, boolean isReminderEvent)
+ LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long
eventTimeMillis, boolean isReminderEvent,
+ boolean adoptConsensusFlowExecutionId)
throws IOException;
/**
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index f514e4c01..4f86c85eb 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -238,7 +238,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis,
- boolean isReminderEvent) throws IOException {
+ boolean isReminderEvent, boolean adoptConsensusFlowExecutionId) throws
IOException {
log.info("Multi-active scheduler about to handle trigger event: [{}, is:
{}, triggerEventTimestamp: {}]",
flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis);
// Query lease arbiter table about this flow action
@@ -249,7 +249,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE
1: no existing row for this flow action,"
+ " then go ahead and insert", flowAction, isReminderEvent ?
"reminder" : "original", eventTimeMillis);
int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.empty(), isReminderEvent);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.empty(),
+ isReminderEvent, adoptConsensusFlowExecutionId);
}
// Extract values from result set
@@ -288,17 +289,23 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
+ "with database eventTimestamp {} (in epoch-millis)", flowAction,
isReminderEvent ? "reminder" : "original",
eventTimeMillis, dbCurrentTimestamp.getTime());
+ /* Note that we use `adoptConsensusFlowExecutionId` parameter's value to
determine whether we should use the db
+ laundered event timestamp as the flowExecutionId or maintain the
original one
+ */
+
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
- DagActionStore.DagAction updatedFlowAction =
flowAction.updateFlowExecutionId(dbEventTimestamp.getTime());
+ DagActionStore.DagAction updatedFlowAction =
+ adoptConsensusFlowExecutionId ?
flowAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : flowAction;
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
- DagActionStore.DagAction updatedFlowAction =
flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime());
+ DagActionStore.DagAction updatedFlowAction =
+ adoptConsensusFlowExecutionId ?
flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime()) : flowAction;
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
3: Distinct event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
@@ -317,7 +324,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
flowAction,
true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp), isReminderEvent);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp),
+ isReminderEvent, adoptConsensusFlowExecutionId);
} // No longer leasing this event
if (isWithinEpsilon) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 5: Same event, no longer leasing event"
@@ -329,7 +337,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
true, false, dbEventTimestamp, null);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp), isReminderEvent);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp),
+ isReminderEvent, adoptConsensusFlowExecutionId);
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -480,7 +489,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* @throws IOException
*/
protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int
numRowsUpdated,
- DagActionStore.DagAction flowAction, Optional<Timestamp>
dbCurrentTimestamp, boolean isReminderEvent)
+ DagActionStore.DagAction flowAction, Optional<Timestamp>
dbCurrentTimestamp, boolean isReminderEvent,
+ boolean adoptConsensusFlowExecutionId)
throws SQLException, IOException {
// Fetch values in row after attempted insert
SelectInfoResult selectInfoResult = getRowInfo(flowAction);
@@ -488,7 +498,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
- DagActionStore.DagAction updatedFlowAction =
flowAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis);
+ DagActionStore.DagAction updatedFlowAction =
+ adoptConsensusFlowExecutionId ?
flowAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis) : flowAction;
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 08630ab36..a77949541 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -42,13 +42,16 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "mysql_multi_active_lease_arbiter_store";
private static final String flowGroup = "testFlowGroup";
+ private static final String flowGroup2 = "testFlowGroup2";
private static final String flowName = "testFlowName";
private static final String flowExecutionId = "12345677";
- // The following are considered unique because they correspond to different
flow action types
+ // Dag actions with the same flow info but different flow action types are
considered unique
private static DagActionStore.DagAction launchDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH);
private static DagActionStore.DagAction resumeDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.RESUME);
+ private static DagActionStore.DagAction launchDagAction2 =
+ new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH);
private static final long eventTimeMillis = System.currentTimeMillis();
private static final Timestamp dummyTimestamp = new Timestamp(99999);
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
@@ -77,14 +80,15 @@ public class MysqlMultiActiveLeaseArbiterTest {
/*
Tests all cases of trying to acquire a lease (CASES 1-6 detailed below)
for a flow action event with one
- participant involved.
+ participant involved. All of the cases allow the flowExecutionId to be
updated by lease arbiter by setting
+ `adoptConsensusFlowExecutionId` to true.
*/
// TODO: refactor this to break it into separate test cases as much as
possible
@Test
public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 1 of acquire lease for a flow action event not present in DB
MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
Assert.assertTrue(firstLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
@@ -98,7 +102,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
DagActionStore.DagAction killDagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL);
MultiActiveLeaseArbiter.LeaseAttemptStatus killStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction,
eventTimeMillis, false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction,
eventTimeMillis, false, true);
Assert.assertTrue(killStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
@@ -109,7 +113,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Very little time should have passed if this test directly follows the
one above so this call will be considered
// the same as the previous event
MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
Assert.assertTrue(secondLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
@@ -121,7 +125,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Allow enough time to pass for this trigger to be considered distinct,
but not enough time so the lease expires
Thread.sleep(MORE_THAN_EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
Assert.assertTrue(thirdLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
@@ -131,7 +135,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Tests CASE 4 of lease out of date
Thread.sleep(LINGER);
MultiActiveLeaseArbiter.LeaseAttemptStatus fourthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
Assert.assertTrue(fourthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
@@ -144,14 +148,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
Assert.assertTrue(System.currentTimeMillis() -
fourthObtainedStatus.getEventTimeMillis() < EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
Assert.assertTrue(fifthLaunchStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be
acquired
Thread.sleep(MORE_THAN_EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
Assert.assertTrue(sixthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
@@ -224,7 +228,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been
completed and acquisition timestamp val is NULL
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1,
resumeDagAction,
- Optional.empty(), false);
+ Optional.empty(), false, true);
// The following insert will fail since eventTimestamp does not match the
expected
int numRowsUpdated =
mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
@@ -249,7 +253,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
olderEventTimestamp, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
olderEventTimestamp, true, true);
Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
}
@@ -264,7 +268,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true, true);
Assert.assertTrue(attemptStatus instanceof LeasedToAnotherStatus);
LeasedToAnotherStatus leasedToAnotherStatus = (LeasedToAnotherStatus)
attemptStatus;
Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(),
selectInfoResult.getEventTimeMillis());
@@ -281,7 +285,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
Thread.sleep(LINGER);
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true, true);
Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
Assert.assertTrue(obtainedStatus.getEventTimeMillis() >
selectInfoResult.getEventTimeMillis());
@@ -309,7 +313,36 @@ public class MysqlMultiActiveLeaseArbiterTest {
Thread.sleep(MORE_THAN_EPSILON);
// Now have a reminder event check-in on the completed lease
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true, true);
Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
}
+
+ /*
+ Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set
to false and verifies that the flowExecutionId
+ returned is the same as the flowExecutionId provided to it for a
LeaseObtainedStatus and LeasedToAnotherStatus object
+ (CASE 1 & 2).
+ */
+ @Test
+ public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
+ // Obtain a lease for a new action and verify its flowExecutionId is not
updated
+ MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2,
eventTimeMillis, false, false);
+ Assert.assertTrue(firstLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+ MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
+ (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
+ Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
+ Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
+ new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH)));
+
+ // A second attempt to obtain a lease on the same action should return a
LeasedToAnotherStatus which also contains
+ // the original flowExecutionId
+ MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2,
eventTimeMillis, false, false);
+ Assert.assertTrue(secondLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
+ MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
+ (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
+ Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
secondLeasedToAnotherStatus.getEventTimeMillis());
+ Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
+ new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH)));
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index a1b141d95..f0be63366 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -111,13 +111,14 @@ public class FlowTriggerHandler {
* @param flowAction
* @param eventTimeMillis
* @param isReminderEvent
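+ * @param skipFlowExecutionIdReplacement despite its name, this value is forwarded as {@code adoptConsensusFlowExecutionId} to the lease arbiter, so true adopts the consensus flowExecutionId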
* @throws IOException
*/
public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis,
- boolean isReminderEvent) throws IOException {
+ boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) throws
IOException {
if (multiActiveLeaseArbiter.isPresent()) {
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(
- flowAction, eventTimeMillis, isReminderEvent);
+ flowAction, eventTimeMillis, isReminderEvent,
skipFlowExecutionIdReplacement);
// The flow action contained in the `LeaseAttemptStatus` from the lease
arbiter contains an updated flow execution
// id. From this point onwards, always use the newer version of the flow
action to easily track the action through
// orchestration and execution.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f839fda2d..ce7931c82 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -58,6 +58,7 @@ import
org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.FlowUtils;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -244,27 +245,12 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup,
flowName);
- Optional<TimingEvent> flowCompilationTimer =
- this.eventSubmitter.transform(submitter -> new
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
- Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
-
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
spec, flowGroup,
- flowName);
- if (!jobExecutionPlanDagOptional.isPresent()) {
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
- return;
- }
+
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
- FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDagOptional.get());
- java.util.Optional<String> flowExecutionId =
TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata);
-
- // Unexpected result because flowExecutionId should be provided by above
call too 'addFlowExecutionIdIfAbsent'
- if (!flowExecutionId.isPresent()) {
- _log.warn("FlowMetadata does not contain flowExecutionId when it
should have been provided. Skipping execution "
- + "of: {}", spec);
- return;
- }
+ String flowExecutionId =
String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
+
DagActionStore.DagAction flowAction =
- new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId.get(), DagActionStore.FlowActionType.LAUNCH);
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH);
// If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
// Skip flow compilation as well, since we recompile after receiving
event from DagActionStoreChangeMonitor later
@@ -281,12 +267,24 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
return;
}
- flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis, isReminderEvent);
+ // Adopt consensus flowExecutionId for scheduled flows
+ flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis, isReminderEvent,
+ flowSpec.isScheduled());
_log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
flowAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
- Dag<JobExecutionPlan> jobExecutionPlanDag =
jobExecutionPlanDagOptional.get();
- if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ Optional<TimingEvent> flowCompilationTimer =
+ this.eventSubmitter.transform(submitter -> new
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+ Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
spec, flowGroup,
+ flowName);
+
+ if (!compiledDagOptional.isPresent()) {
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ return;
+ }
+ Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+ if (compiledDag == null || compiledDag.isEmpty()) {
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
spec, flowMetadata);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
@@ -297,17 +295,17 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
-
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDag);
+
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
compiledDag);
if (flowCompilationTimer.isPresent()) {
flowCompilationTimer.get().stop(flowMetadata);
}
// Depending on if DagManager is present, handle execution
if (this.dagManager.isPresent()) {
- submitFlowToDagManager(flowSpec, jobExecutionPlanDag);
+ submitFlowToDagManager(flowSpec, compiledDag);
} else {
// Schedule all compiled JobSpecs on their respective Executor
- for (Dag.DagNode<JobExecutionPlan> dagNode :
jobExecutionPlanDag.getNodes()) {
+ for (Dag.DagNode<JobExecutionPlan> dagNode : compiledDag.getNodes())
{
DagManagerUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan = dagNode.getValue();
@@ -463,7 +461,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Deletes spec from flowCatalog if it is an adhoc flow (not containing a job
schedule)
*/
private void deleteSpecFromCatalogIfAdhoc(FlowSpec flowSpec) {
- if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ if (!flowSpec.isScheduled()) {
this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(),
false);
}
}