This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e3108ddf8 [GOBBLIN-2097] Use unique JobDataMaps and Properties to use 
for reminder events (#3984)
e3108ddf8 is described below

commit e3108ddf8ea11c015fb099d8da586f4ebfdc7557
Author: umustafi <[email protected]>
AuthorDate: Tue Jun 25 14:24:26 2024 -0700

    [GOBBLIN-2097] Use unique JobDataMaps and Properties to use for reminder 
events (#3984)
    
    * Use unique JobDataMaps and Properties to use for reminder events
---
 .../modules/orchestration/FlowLaunchHandler.java   | 50 ++++++++++++----------
 .../MysqlMultiActiveLeaseArbiter.java              | 36 +++++++---------
 .../orchestration/FlowLaunchHandlerTest.java       | 33 ++++++++++++--
 3 files changed, 71 insertions(+), 48 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 967d89a89..e548f16ad 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -152,8 +152,8 @@ public class FlowLaunchHandler {
   }
 
   /**
-   * This method is used by {@link 
FlowLaunchHandler#handleFlowLaunchTriggerEvent} to schedule a self-reminder to 
check on
-   * the other participant's progress to finish acting on a dag action after 
the time the lease should expire.
+   * This method is used by {@link 
FlowLaunchHandler#handleFlowLaunchTriggerEvent} to schedule a self-reminder to 
check
+   * on the other participant's progress to finish acting on a dag action 
after the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for (stored in 
`consensusDagAction`) and the minimum time after
    *               which reminder should occur
@@ -197,7 +197,8 @@ public class FlowLaunchHandler {
     // refer to the same set of jobProperties)
     String reminderSuffix = createSuffixForJobTrigger(status);
     JobKey reminderJobKey = new JobKey(origJobKey.getName() + reminderSuffix, 
origJobKey.getGroup());
-    JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, 
reminderJobKey, status);
+    JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, 
status);
+    jobDetail.setKey(reminderJobKey);
     Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey, 
getJobPropertiesFromJobDetail(jobDetail),
         Optional.of(reminderSuffix));
     log.debug("Flow Launch Handler - [{}, eventTimestamp: {}] -  attempting to 
schedule reminder for event {} with "
@@ -223,20 +224,20 @@ public class FlowLaunchHandler {
    * the event to revisit. It will update the jobKey to the reminderKey 
provided and the Properties map to
    * contain the cron scheduler for the reminder event and information about 
the event to revisit
    * @param originalKey
-   * @param reminderKey
    * @param status
    * @return
    * @throws SchedulerException
    */
-  protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, 
JobKey reminderKey,
-      LeaseAttemptStatus.LeasedToAnotherStatus status)
+  protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, 
LeaseAttemptStatus.LeasedToAnotherStatus status)
       throws SchedulerException {
-    JobDetailImpl jobDetail = (JobDetailImpl) 
this.schedulerService.getScheduler().getJobDetail(originalKey);
-    jobDetail.setKey(reminderKey);
-    JobDataMap jobDataMap = jobDetail.getJobDataMap();
-    jobDataMap = updatePropsInJobDataMap(jobDataMap, status, 
schedulerMaxBackoffMillis);
-    jobDetail.setJobDataMap(jobDataMap);
-    return jobDetail;
+    // 1. shallow `.clone()` this top-level `JobDetailImpl`
+    JobDetailImpl clonedJobDetail = (JobDetailImpl) 
this.schedulerService.getScheduler().getJobDetail(originalKey).clone();
+    JobDataMap originalJobDataMap = clonedJobDetail.getJobDataMap();
+    // 2. create a fresh `JobDataMap` specific to the reminder
+    JobDataMap newJobDataMap = cloneAndUpdateJobProperties(originalJobDataMap, 
status, schedulerMaxBackoffMillis);
+    // 3. update `clonedJobDetail` to point to the new `JobDataMap`
+    clonedJobDetail.setJobDataMap(newJobDataMap);
+    return clonedJobDetail;
   }
 
   public static Properties getJobPropertiesFromJobDetail(JobDetail jobDetail) {
@@ -244,35 +245,38 @@ public class FlowLaunchHandler {
   }
 
   /**
-   * Updates the cronExpression, reminderTimestamp, originalEventTime values 
in the properties map of a JobDataMap
-   * provided returns the updated JobDataMap to the user
+   * Adds the cronExpression, reminderTimestamp, originalEventTime values in 
the properties map of a new jobDataMap
+   * cloned from the one provided and returns the new JobDataMap to the user.
+   * `jobDataMap` and its `GobblinServiceJobScheduler.PROPERTIES_KEY` field 
are shallow-copied, not deep-copied.
    * @param jobDataMap
    * @param leasedToAnotherStatus
    * @param schedulerMaxBackoffMillis
    * @return
    */
   @VisibleForTesting
-  public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
+  public static JobDataMap cloneAndUpdateJobProperties(JobDataMap jobDataMap,
       LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus, int 
schedulerMaxBackoffMillis) {
-    Properties prevJobProps = (Properties) 
jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
+    JobDataMap newJobDataMap = (JobDataMap) jobDataMap.clone();
+    Properties newJobProperties =
+        (Properties) ((Properties) 
jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY)).clone();
     // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
     long delayPeriodMillis = 
leasedToAnotherStatus.getMinimumLingerDurationMillis()
         + random.nextInt(schedulerMaxBackoffMillis);
     String cronExpression = createCronFromDelayPeriod(delayPeriodMillis);
-    prevJobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, 
cronExpression);
+    newJobProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
     // Saves the following properties in jobProps to retrieve when the trigger 
fires
-    
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
+    
newJobProperties.put(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
         String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
     // Use the db consensus timestamp for the reminder to ensure inter-host 
agreement. Participant trigger timestamps
     // can differ between participants and be interpreted as a reminder for a 
distinct flow trigger which will cause
     // excess flows to be triggered by the reminder functionality.
-    
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
+    
newJobProperties.put(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
         String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
     // Use this boolean to indicate whether this is a reminder event
-    prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, 
String.valueOf(true));
-    // Update job data map and reset it in jobDetail
-    jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
-    return jobDataMap;
+    newJobProperties.put(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, 
String.valueOf(true));
+    // Replace reference to old Properties map with new cloned Properties
+    newJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, 
newJobProperties);
+    return newJobDataMap;
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 7189613d9..33274e425 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -249,16 +249,14 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
 
   @Override
   public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject, 
boolean adoptConsensusFlowExecutionId) throws IOException {
-    log.info("Multi-active scheduler about to handle trigger event: [{}, is: 
{}, triggerEventTimestamp: {}]",
-       dagActionLeaseObject.getDagAction(), dagActionLeaseObject.isReminder() 
? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis());
+    log.info("Multi-active arbiter about to handle trigger event: {}", 
dagActionLeaseObject);
     // Query lease arbiter table about this dag action
     Optional<GetEventInfoResult> getResult = 
getExistingEventInfo(dagActionLeaseObject);
 
     try {
       if (!getResult.isPresent()) {
-        log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 
1: no existing row for this dag action,"
-            + " then go ahead and insert", dagActionLeaseObject.getDagAction(),
-            dagActionLeaseObject.isReminder() ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis());
+        log.debug("tryAcquireLease for {} - CASE 1: no existing row for this 
dag action, then go ahead and insert",
+            dagActionLeaseObject);
         int numRowsUpdated = 
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction(),
             ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
                 .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) Math.random() 
* DELAY_FOR_RETRY_RANGE_MILLIS)
@@ -280,33 +278,29 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       // because db laundering tells us that the currently worked on db event 
is newer and will have its own reminders
       if (dagActionLeaseObject.isReminder()) {
         if (dagActionLeaseObject.getEventTimeMillis() < 
dbEventTimestamp.getTime()) {
-          log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - A new event trigger "
-                  + "is being worked on, so this older reminder will be 
dropped.", dagActionLeaseObject.getDagAction(),
-             dagActionLeaseObject.isReminder ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis(),
+          log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - A new 
event trigger "
+                  + "is being worked on, so this older reminder will be 
dropped.", dagActionLeaseObject,
               dbEventTimestamp);
           return new LeaseAttemptStatus.NoLongerLeasingStatus();
         }
         if (dagActionLeaseObject.getEventTimeMillis() > 
dbEventTimestamp.getTime()) {
           // TODO: emit metric here to capture this unexpected behavior
-          log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - Severe constraint "
+          log.warn("tryAcquireLease for {} - dbEventTimeMillis: {} - Severe 
constraint "
                   + "violation encountered: a reminder event newer than db 
event was found when db laundering should "
-                  + "ensure monotonically increasing laundered event times.", 
dagActionLeaseObject.getDagAction(),
-             dagActionLeaseObject.isReminder ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis(),
+                  + "ensure monotonically increasing laundered event times.", 
dagActionLeaseObject,
               dbEventTimestamp.getTime());
         }
         if (dagActionLeaseObject.getEventTimeMillis() == 
dbEventTimestamp.getTime()) {
-          log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - Reminder event time "
-                  + "is the same as db event.", 
dagActionLeaseObject.getDagAction(),
-              dagActionLeaseObject.isReminder ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis(),
-              dbEventTimestamp);
+          log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - Reminder 
event time "
+                  + "is the same as db event.", dagActionLeaseObject, 
dbEventTimestamp);
         }
       }
 
-      log.info("Multi-active arbiter replacing local trigger event timestamp 
[{}, is: {}, triggerEventTimestamp: {}] "
-          + "with database eventTimestamp {} (in epoch-millis)", 
dagActionLeaseObject.getDagAction(),
-          dagActionLeaseObject.isReminder ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis(),
-          dbCurrentTimestamp.getTime());
-
+      // TODO: check whether reminder event before replacing flowExecutionId
+      if (adoptConsensusFlowExecutionId) {
+        log.info("Multi-active arbiter replacing local trigger event timestamp 
{} with database eventTimestamp {} (in "
+                + "epoch-millis)", dagActionLeaseObject, 
dbCurrentTimestamp.getTime());
+      }
       /* Note that we use `adoptConsensusFlowExecutionId` parameter's value to 
determine whether we should use the db
       laundered event timestamp as the flowExecutionId or maintain the 
original one
        */
@@ -444,7 +438,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
             } catch (InterruptedException e2) {
               throw new IOException(e2);
             }
-            throw e; 
+            throw e;
           }
           catch (SQLIntegrityConstraintViolationException e) {
             if (!e.getMessage().contains("Duplicate entry")) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 1afc77ba4..88361e563 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration;
 import java.util.Properties;
 
 import org.junit.Assert;
+import org.mockito.Mockito;
 import org.quartz.JobDataMap;
 import org.testng.annotations.Test;
 
@@ -56,7 +57,9 @@ public class FlowLaunchHandlerTest {
 
   /**
    * Provides an input with all three values (cronExpression, 
reminderTimestamp, originalEventTime) set in the map
-   * Properties and checks that they are updated properly
+   * Properties and checks that they are updated properly in new jobDataMap's 
Properties object. It checks that the
+   * JobDataMap returned along with the Properties object it contains do not 
reference the same
+   * original objects.
    */
   @Test
   public void testUpdatePropsInJobDataMap() {
@@ -66,9 +69,10 @@ public class FlowLaunchHandlerTest {
     
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
 "0");
     
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
 "1");
     oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, 
originalProperties);
+    JobDataMap spyOldJobDataMap = Mockito.spy(oldJobDataMap);
 
-    JobDataMap newJobDataMap = 
FlowLaunchHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
-        schedulerBackOffMillis);
+    JobDataMap newJobDataMap =
+        FlowLaunchHandler.cloneAndUpdateJobProperties(spyOldJobDataMap, 
leasedToAnotherStatus, schedulerBackOffMillis);
     Properties newProperties = (Properties) 
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
     
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
     Assert.assertNotEquals("0",
@@ -76,8 +80,17 @@ public class FlowLaunchHandlerTest {
     
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
         
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
     
Assert.assertTrue(Boolean.parseBoolean(newProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY)));
+
+    Assert.assertNotSame(oldJobDataMap, newJobDataMap);
+    Assert.assertNotSame(originalProperties, newProperties);
+    
Assert.assertFalse(originalProperties.containsKey(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY));
+    // Verify that only clone() and get() methods are called on the 
oldJobDataMap
+    Mockito.verify(spyOldJobDataMap).clone();
+    Mockito.verify(spyOldJobDataMap).get(Mockito.any());
+    Mockito.verifyNoMoreInteractions(spyOldJobDataMap);
   }
 
+
   /**
    * Provides input with an empty Properties object and checks that the three 
values in question are set.
    */
@@ -86,15 +99,27 @@ public class FlowLaunchHandlerTest {
     JobDataMap oldJobDataMap = new JobDataMap();
     Properties originalProperties = new Properties();
     oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, 
originalProperties);
+    JobDataMap spyOldJobDataMap = Mockito.spy(oldJobDataMap);
 
-    JobDataMap newJobDataMap = 
FlowLaunchHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
+    JobDataMap newJobDataMap = 
FlowLaunchHandler.cloneAndUpdateJobProperties(spyOldJobDataMap, 
leasedToAnotherStatus,
         schedulerBackOffMillis);
     Properties newProperties = (Properties) 
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
+
+    Assert.assertNotSame(oldJobDataMap, newJobDataMap);
+    Assert.assertNotSame(originalProperties, newProperties);
     
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
     
Assert.assertTrue(newProperties.containsKey(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
     
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
         
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
     
Assert.assertTrue(Boolean.parseBoolean(newProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY)));
+
+    Assert.assertNotSame(oldJobDataMap, newJobDataMap);
+    Assert.assertNotSame(originalProperties, newProperties);
+    
Assert.assertFalse(originalProperties.containsKey(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY));
+    // Verify that only clone() and get() methods are called on the 
oldJobDataMap
+    Mockito.verify(spyOldJobDataMap).clone();
+    Mockito.verify(spyOldJobDataMap).get(Mockito.any());
+    Mockito.verifyNoMoreInteractions(spyOldJobDataMap);
   }
 
   /**

Reply via email to