This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 028b85f58 [GOBBLIN-1920] Use db laundered timestamp for reminder event (#3788)
028b85f58 is described below
commit 028b85f587e3c1e6afa5d8662fe9ed3f0087568d
Author: umustafi <[email protected]>
AuthorDate: Mon Sep 25 16:06:56 2023 -0700
[GOBBLIN-1920] Use db laundered timestamp for reminder event (#3788)
* Use db laundered timestamp for reminder event
* Add more details to comment and remove extra param
* remove unused param
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../modules/orchestration/FlowTriggerHandler.java | 17 ++++++++---------
.../modules/orchestration/FlowTriggerHandlerTest.java | 8 ++++----
2 files changed, 12 insertions(+), 13 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 8de081e77..1849619f0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -207,8 +207,7 @@ public class FlowTriggerHandler {
// refer to the same set of jobProperties)
String reminderSuffix = createSuffixForJobTrigger(status);
JobKey reminderJobKey = new JobKey(origJobKey.getName() + reminderSuffix,
origJobKey.getGroup());
- JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey,
reminderJobKey, status,
- triggerEventTimeMillis);
+ JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey,
reminderJobKey, status);
Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey,
getJobPropertiesFromJobDetail(jobDetail),
Optional.of(reminderSuffix));
log.debug("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting
to schedule reminder for event {} with "
@@ -236,17 +235,16 @@ public class FlowTriggerHandler {
* @param originalKey
* @param reminderKey
* @param status
- * @param triggerEventTimeMillis
* @return
* @throws SchedulerException
*/
protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey,
JobKey reminderKey,
- MultiActiveLeaseArbiter.LeasedToAnotherStatus status, long
triggerEventTimeMillis)
+ MultiActiveLeaseArbiter.LeasedToAnotherStatus status)
throws SchedulerException {
JobDetailImpl jobDetail = (JobDetailImpl)
this.schedulerService.getScheduler().getJobDetail(originalKey);
jobDetail.setKey(reminderKey);
JobDataMap jobDataMap = jobDetail.getJobDataMap();
- jobDataMap = updatePropsInJobDataMap(jobDataMap, status,
triggerEventTimeMillis, schedulerMaxBackoffMillis);
+ jobDataMap = updatePropsInJobDataMap(jobDataMap, status,
schedulerMaxBackoffMillis);
jobDetail.setJobDataMap(jobDataMap);
return jobDetail;
}
@@ -260,14 +258,12 @@ public class FlowTriggerHandler {
* provided returns the updated JobDataMap to the user
* @param jobDataMap
* @param leasedToAnotherStatus
- * @param triggerEventTimeMillis
* @param schedulerMaxBackoffMillis
* @return
*/
@VisibleForTesting
public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
- MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus,
long triggerEventTimeMillis,
- int schedulerMaxBackoffMillis) {
+ MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus, int
schedulerMaxBackoffMillis) {
Properties prevJobProps = (Properties)
jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
long delayPeriodMillis =
leasedToAnotherStatus.getMinimumLingerDurationMillis()
@@ -277,8 +273,11 @@ public class FlowTriggerHandler {
// Saves the following properties in jobProps to retrieve when the trigger
fires
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
+ // Use the db laundered timestamp for the reminder to ensure consensus
between hosts. Participant trigger timestamps
+ // can differ between participants and be interpreted as a reminder for a
distinct flow trigger which will cause
+ // excess flows to be triggered by the reminder functionality.
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
- String.valueOf(triggerEventTimeMillis));
+ String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
// Update job data map and reset it in jobDetail
jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
return jobDataMap;
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
index 70a4a134f..55a95efcd 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
@@ -66,12 +66,12 @@ public class FlowTriggerHandlerTest {
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY,
originalProperties);
JobDataMap newJobDataMap =
FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
- eventToTrigger, schedulerBackOffMillis);
+ schedulerBackOffMillis);
Properties newProperties = (Properties)
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
Assert.assertNotEquals("0",
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
- Assert.assertEquals(String.valueOf(eventToTrigger),
+
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
}
@@ -85,11 +85,11 @@ public class FlowTriggerHandlerTest {
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY,
originalProperties);
JobDataMap newJobDataMap =
FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
- eventToTrigger, schedulerBackOffMillis);
+ schedulerBackOffMillis);
Properties newProperties = (Properties)
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
Assert.assertTrue(newProperties.containsKey(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
- Assert.assertEquals(String.valueOf(eventToTrigger),
+
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
}