This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e3108ddf8 [GOBBLIN-2097] Use unique JobDataMaps and Properties to use for reminder events (#3984)
e3108ddf8 is described below
commit e3108ddf8ea11c015fb099d8da586f4ebfdc7557
Author: umustafi <[email protected]>
AuthorDate: Tue Jun 25 14:24:26 2024 -0700
[GOBBLIN-2097] Use unique JobDataMaps and Properties to use for reminder events (#3984)
* Use unique JobDataMaps and Properties to use for reminder events
---
.../modules/orchestration/FlowLaunchHandler.java | 50 ++++++++++++----------
.../MysqlMultiActiveLeaseArbiter.java | 36 +++++++---------
.../orchestration/FlowLaunchHandlerTest.java | 33 ++++++++++++--
3 files changed, 71 insertions(+), 48 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 967d89a89..e548f16ad 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -152,8 +152,8 @@ public class FlowLaunchHandler {
}
/**
- * This method is used by {@link
FlowLaunchHandler#handleFlowLaunchTriggerEvent} to schedule a self-reminder to
check on
- * the other participant's progress to finish acting on a dag action after
the time the lease should expire.
+ * This method is used by {@link
FlowLaunchHandler#handleFlowLaunchTriggerEvent} to schedule a self-reminder to
check
+ * on the other participant's progress to finish acting on a dag action
after the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for (stored in
`consensusDagAction`) and the minimum time after
* which reminder should occur
@@ -197,7 +197,8 @@ public class FlowLaunchHandler {
// refer to the same set of jobProperties)
String reminderSuffix = createSuffixForJobTrigger(status);
JobKey reminderJobKey = new JobKey(origJobKey.getName() + reminderSuffix,
origJobKey.getGroup());
- JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey,
reminderJobKey, status);
+ JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey,
status);
+ jobDetail.setKey(reminderJobKey);
Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey,
getJobPropertiesFromJobDetail(jobDetail),
Optional.of(reminderSuffix));
log.debug("Flow Launch Handler - [{}, eventTimestamp: {}] - attempting to
schedule reminder for event {} with "
@@ -223,20 +224,20 @@ public class FlowLaunchHandler {
* the event to revisit. It will update the jobKey to the reminderKey
provides and the Properties map to
* contain the cron scheduler for the reminder event and information about
the event to revisit
* @param originalKey
- * @param reminderKey
* @param status
* @return
* @throws SchedulerException
*/
- protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey,
JobKey reminderKey,
- LeaseAttemptStatus.LeasedToAnotherStatus status)
+ protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey,
LeaseAttemptStatus.LeasedToAnotherStatus status)
throws SchedulerException {
- JobDetailImpl jobDetail = (JobDetailImpl)
this.schedulerService.getScheduler().getJobDetail(originalKey);
- jobDetail.setKey(reminderKey);
- JobDataMap jobDataMap = jobDetail.getJobDataMap();
- jobDataMap = updatePropsInJobDataMap(jobDataMap, status,
schedulerMaxBackoffMillis);
- jobDetail.setJobDataMap(jobDataMap);
- return jobDetail;
+ // 1. shallow `.clone()` this top-level `JobDetailImpl`
+ JobDetailImpl clonedJobDetail = (JobDetailImpl)
this.schedulerService.getScheduler().getJobDetail(originalKey).clone();
+ JobDataMap originalJobDataMap = clonedJobDetail.getJobDataMap();
+ // 2. create a fresh `JobDataMap` specific to the reminder
+ JobDataMap newJobDataMap = cloneAndUpdateJobProperties(originalJobDataMap,
status, schedulerMaxBackoffMillis);
+ // 3. update `clonedJobDetail` to point to the new `JobDataMap`
+ clonedJobDetail.setJobDataMap(newJobDataMap);
+ return clonedJobDetail;
}
public static Properties getJobPropertiesFromJobDetail(JobDetail jobDetail) {
@@ -244,35 +245,38 @@ public class FlowLaunchHandler {
}
/**
- * Updates the cronExpression, reminderTimestamp, originalEventTime values
in the properties map of a JobDataMap
- * provided returns the updated JobDataMap to the user
+ * Adds the cronExpression, reminderTimestamp, originalEventTime values in
the properties map of a new jobDataMap
+ * cloned from the one provided and returns the new JobDataMap to the user.
+ * `jobDataMap` and its `GobblinServiceJobScheduler.PROPERTIES_KEY` field
are shallow, not deep-copied
* @param jobDataMap
* @param leasedToAnotherStatus
* @param schedulerMaxBackoffMillis
* @return
*/
@VisibleForTesting
- public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
+ public static JobDataMap cloneAndUpdateJobProperties(JobDataMap jobDataMap,
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus, int
schedulerMaxBackoffMillis) {
- Properties prevJobProps = (Properties)
jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
+ JobDataMap newJobDataMap = (JobDataMap) jobDataMap.clone();
+ Properties newJobProperties =
+ (Properties) ((Properties)
jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY)).clone();
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
long delayPeriodMillis =
leasedToAnotherStatus.getMinimumLingerDurationMillis()
+ random.nextInt(schedulerMaxBackoffMillis);
String cronExpression = createCronFromDelayPeriod(delayPeriodMillis);
- prevJobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
cronExpression);
+ newJobProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
// Saves the following properties in jobProps to retrieve when the trigger
fires
-
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
+
newJobProperties.put(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
// Use the db consensus timestamp for the reminder to ensure inter-host
agreement. Participant trigger timestamps
// can differ between participants and be interpreted as a reminder for a
distinct flow trigger which will cause
// excess flows to be triggered by the reminder functionality.
-
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
+
newJobProperties.put(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
// Use this boolean to indicate whether this is a reminder event
- prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
String.valueOf(true));
- // Update job data map and reset it in jobDetail
- jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
- return jobDataMap;
+ newJobProperties.put(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
String.valueOf(true));
+ // Replace reference to old Properties map with new cloned Properties
+ newJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY,
newJobProperties);
+ return newJobDataMap;
}
/**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 7189613d9..33274e425 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -249,16 +249,14 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
@Override
public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject,
boolean adoptConsensusFlowExecutionId) throws IOException {
- log.info("Multi-active scheduler about to handle trigger event: [{}, is:
{}, triggerEventTimestamp: {}]",
- dagActionLeaseObject.getDagAction(), dagActionLeaseObject.isReminder()
? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis());
+ log.info("Multi-active arbiter about to handle trigger event: {}",
dagActionLeaseObject);
// Query lease arbiter table about this dag action
Optional<GetEventInfoResult> getResult =
getExistingEventInfo(dagActionLeaseObject);
try {
if (!getResult.isPresent()) {
- log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE
1: no existing row for this dag action,"
- + " then go ahead and insert", dagActionLeaseObject.getDagAction(),
- dagActionLeaseObject.isReminder() ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis());
+ log.debug("tryAcquireLease for {} - CASE 1: no existing row for this
dag action, then go ahead and insert",
+ dagActionLeaseObject);
int numRowsUpdated =
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction(),
ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
.initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) Math.random()
* DELAY_FOR_RETRY_RANGE_MILLIS)
@@ -280,33 +278,29 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
// because db laundering tells us that the currently worked on db event
is newer and will have its own reminders
if (dagActionLeaseObject.isReminder()) {
if (dagActionLeaseObject.getEventTimeMillis() <
dbEventTimestamp.getTime()) {
- log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - A new event trigger "
- + "is being worked on, so this older reminder will be
dropped.", dagActionLeaseObject.getDagAction(),
- dagActionLeaseObject.isReminder ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis(),
+ log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - A new
event trigger "
+ + "is being worked on, so this older reminder will be
dropped.", dagActionLeaseObject,
dbEventTimestamp);
return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
if (dagActionLeaseObject.getEventTimeMillis() >
dbEventTimestamp.getTime()) {
// TODO: emit metric here to capture this unexpected behavior
- log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Severe constraint "
+ log.warn("tryAcquireLease for {} - dbEventTimeMillis: {} - Severe
constraint "
+ "violation encountered: a reminder event newer than db
event was found when db laundering should "
- + "ensure monotonically increasing laundered event times.",
dagActionLeaseObject.getDagAction(),
- dagActionLeaseObject.isReminder ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis(),
+ + "ensure monotonically increasing laundered event times.",
dagActionLeaseObject,
dbEventTimestamp.getTime());
}
if (dagActionLeaseObject.getEventTimeMillis() ==
dbEventTimestamp.getTime()) {
- log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Reminder event time "
- + "is the same as db event.",
dagActionLeaseObject.getDagAction(),
- dagActionLeaseObject.isReminder ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis(),
- dbEventTimestamp);
+ log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - Reminder
event time "
+ + "is the same as db event.", dagActionLeaseObject,
dbEventTimestamp);
}
}
- log.info("Multi-active arbiter replacing local trigger event timestamp
[{}, is: {}, triggerEventTimestamp: {}] "
- + "with database eventTimestamp {} (in epoch-millis)",
dagActionLeaseObject.getDagAction(),
- dagActionLeaseObject.isReminder ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis(),
- dbCurrentTimestamp.getTime());
-
+ // TODO: check whether reminder event before replacing flowExecutionId
+ if (adoptConsensusFlowExecutionId) {
+ log.info("Multi-active arbiter replacing local trigger event timestamp
{} with database eventTimestamp {} (in "
+ + "epoch-millis)", dagActionLeaseObject,
dbCurrentTimestamp.getTime());
+ }
/* Note that we use `adoptConsensusFlowExecutionId` parameter's value to
determine whether we should use the db
laundered event timestamp as the flowExecutionId or maintain the
original one
*/
@@ -444,7 +438,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
} catch (InterruptedException e2) {
throw new IOException(e2);
}
- throw e;
+ throw e;
}
catch (SQLIntegrityConstraintViolationException e) {
if (!e.getMessage().contains("Duplicate entry")) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 1afc77ba4..88361e563 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration;
import java.util.Properties;
import org.junit.Assert;
+import org.mockito.Mockito;
import org.quartz.JobDataMap;
import org.testng.annotations.Test;
@@ -56,7 +57,9 @@ public class FlowLaunchHandlerTest {
/**
* Provides an input with all three values (cronExpression,
reminderTimestamp, originalEventTime) set in the map
- * Properties and checks that they are updated properly
+ * Properties and checks that they are updated properly in new jobDataMap's
Properties object. It checks that the
+ * JobDataMap returned along with the Properties object it contains do not
reference the same
+ * original objects.
*/
@Test
public void testUpdatePropsInJobDataMap() {
@@ -66,9 +69,10 @@ public class FlowLaunchHandlerTest {
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
"0");
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
"1");
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY,
originalProperties);
+ JobDataMap spyOldJobDataMap = Mockito.spy(oldJobDataMap);
- JobDataMap newJobDataMap =
FlowLaunchHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
- schedulerBackOffMillis);
+ JobDataMap newJobDataMap =
+ FlowLaunchHandler.cloneAndUpdateJobProperties(spyOldJobDataMap,
leasedToAnotherStatus, schedulerBackOffMillis);
Properties newProperties = (Properties)
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
Assert.assertNotEquals("0",
@@ -76,8 +80,17 @@ public class FlowLaunchHandlerTest {
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
Assert.assertTrue(Boolean.parseBoolean(newProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY)));
+
+ Assert.assertNotSame(oldJobDataMap, newJobDataMap);
+ Assert.assertNotSame(originalProperties, newProperties);
+
Assert.assertFalse(originalProperties.containsKey(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY));
+ // Verify that only clone() and get() methods are called on the
oldJobDataMap
+ Mockito.verify(spyOldJobDataMap).clone();
+ Mockito.verify(spyOldJobDataMap).get(Mockito.any());
+ Mockito.verifyNoMoreInteractions(spyOldJobDataMap);
}
+
/**
* Provides input with an empty Properties object and checks that the three
values in question are set.
*/
@@ -86,15 +99,27 @@ public class FlowLaunchHandlerTest {
JobDataMap oldJobDataMap = new JobDataMap();
Properties originalProperties = new Properties();
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY,
originalProperties);
+ JobDataMap spyOldJobDataMap = Mockito.spy(oldJobDataMap);
- JobDataMap newJobDataMap =
FlowLaunchHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
+ JobDataMap newJobDataMap =
FlowLaunchHandler.cloneAndUpdateJobProperties(spyOldJobDataMap,
leasedToAnotherStatus,
schedulerBackOffMillis);
Properties newProperties = (Properties)
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
+
+ Assert.assertNotSame(oldJobDataMap, newJobDataMap);
+ Assert.assertNotSame(originalProperties, newProperties);
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
Assert.assertTrue(newProperties.containsKey(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
Assert.assertTrue(Boolean.parseBoolean(newProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY)));
+
+ Assert.assertNotSame(oldJobDataMap, newJobDataMap);
+ Assert.assertNotSame(originalProperties, newProperties);
+
Assert.assertFalse(originalProperties.containsKey(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY));
+ // Verify that only clone() and get() methods are called on the
oldJobDataMap
+ Mockito.verify(spyOldJobDataMap).clone();
+ Mockito.verify(spyOldJobDataMap).get(Mockito.any());
+ Mockito.verifyNoMoreInteractions(spyOldJobDataMap);
}
/**