This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 028b85f58 [GOBBLIN-1920] Use db laundered timestamp for reminder event 
(#3788)
028b85f58 is described below

commit 028b85f587e3c1e6afa5d8662fe9ed3f0087568d
Author: umustafi <[email protected]>
AuthorDate: Mon Sep 25 16:06:56 2023 -0700

    [GOBBLIN-1920] Use db laundered timestamp for reminder event (#3788)
    
    * Use db laundered timestamp for reminder event
    
    * Add more details to comment and remove extra param
    
    * remove unused param
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../modules/orchestration/FlowTriggerHandler.java       | 17 ++++++++---------
 .../modules/orchestration/FlowTriggerHandlerTest.java   |  8 ++++----
 2 files changed, 12 insertions(+), 13 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 8de081e77..1849619f0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -207,8 +207,7 @@ public class FlowTriggerHandler {
     // refer to the same set of jobProperties)
     String reminderSuffix = createSuffixForJobTrigger(status);
     JobKey reminderJobKey = new JobKey(origJobKey.getName() + reminderSuffix, 
origJobKey.getGroup());
-    JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, 
reminderJobKey, status,
-        triggerEventTimeMillis);
+    JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, 
reminderJobKey, status);
     Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey, 
getJobPropertiesFromJobDetail(jobDetail),
         Optional.of(reminderSuffix));
     log.debug("Flow Trigger Handler - [{}, eventTimestamp: {}] -  attempting 
to schedule reminder for event {} with "
@@ -236,17 +235,16 @@ public class FlowTriggerHandler {
    * @param originalKey
    * @param reminderKey
    * @param status
-   * @param triggerEventTimeMillis
    * @return
    * @throws SchedulerException
    */
   protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, 
JobKey reminderKey,
-      MultiActiveLeaseArbiter.LeasedToAnotherStatus status, long 
triggerEventTimeMillis)
+      MultiActiveLeaseArbiter.LeasedToAnotherStatus status)
       throws SchedulerException {
     JobDetailImpl jobDetail = (JobDetailImpl) 
this.schedulerService.getScheduler().getJobDetail(originalKey);
     jobDetail.setKey(reminderKey);
     JobDataMap jobDataMap = jobDetail.getJobDataMap();
-    jobDataMap = updatePropsInJobDataMap(jobDataMap, status, 
triggerEventTimeMillis, schedulerMaxBackoffMillis);
+    jobDataMap = updatePropsInJobDataMap(jobDataMap, status, 
schedulerMaxBackoffMillis);
     jobDetail.setJobDataMap(jobDataMap);
     return jobDetail;
   }
@@ -260,14 +258,12 @@ public class FlowTriggerHandler {
    * provided returns the updated JobDataMap to the user
    * @param jobDataMap
    * @param leasedToAnotherStatus
-   * @param triggerEventTimeMillis
    * @param schedulerMaxBackoffMillis
    * @return
    */
   @VisibleForTesting
   public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
-      MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus, 
long triggerEventTimeMillis,
-      int schedulerMaxBackoffMillis) {
+      MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus, int 
schedulerMaxBackoffMillis) {
     Properties prevJobProps = (Properties) 
jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
     // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
     long delayPeriodMillis = 
leasedToAnotherStatus.getMinimumLingerDurationMillis()
@@ -277,8 +273,11 @@ public class FlowTriggerHandler {
     // Saves the following properties in jobProps to retrieve when the trigger 
fires
     
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
         String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
+    // Use the db laundered timestamp for the reminder to ensure consensus 
between hosts. Participant trigger timestamps
+    // can differ between participants and be interpreted as a reminder for a 
distinct flow trigger which will cause
+    // excess flows to be triggered by the reminder functionality.
     
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
-        String.valueOf(triggerEventTimeMillis));
+        String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
     // Update job data map and reset it in jobDetail
     jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
     return jobDataMap;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
index 70a4a134f..55a95efcd 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
@@ -66,12 +66,12 @@ public class FlowTriggerHandlerTest {
     oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, 
originalProperties);
 
     JobDataMap newJobDataMap = 
FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
-        eventToTrigger, schedulerBackOffMillis);
+        schedulerBackOffMillis);
     Properties newProperties = (Properties) 
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
     
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
     Assert.assertNotEquals("0",
         
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
-    Assert.assertEquals(String.valueOf(eventToTrigger),
+    
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
         
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
   }
 
@@ -85,11 +85,11 @@ public class FlowTriggerHandlerTest {
     oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, 
originalProperties);
 
     JobDataMap newJobDataMap = 
FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
-        eventToTrigger, schedulerBackOffMillis);
+        schedulerBackOffMillis);
     Properties newProperties = (Properties) 
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
     
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
     
Assert.assertTrue(newProperties.containsKey(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
-    Assert.assertEquals(String.valueOf(eventToTrigger),
+    
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
         
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
   }
 

Reply via email to