This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b109d5ee [GOBBLIN-2016] Add eventTime to DagAction reminder key 
(#3995)
6b109d5ee is described below

commit 6b109d5ee068ab781d3e94c6ff1b1a0b16be6377
Author: umustafi <[email protected]>
AuthorDate: Tue Jul 9 14:28:33 2024 -0700

    [GOBBLIN-2016] Add eventTime to DagAction reminder key (#3995)
    
    * Add eventTime to DagAction reminder key
---
 ...gManagementDagActionStoreChangeMonitorTest.java |  2 ++
 .../orchestration/DagActionReminderScheduler.java  | 42 ++++++++++++++--------
 .../DagManagementDagActionStoreChangeMonitor.java  |  2 ++
 .../DagActionReminderSchedulerTest.java            | 41 +++++++++++++++++----
 4 files changed, 66 insertions(+), 21 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
index 86a134306..8d2b6bab9 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
@@ -112,10 +112,12 @@ public class DagManagementDagActionStoreChangeMonitorTest 
{
     DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME,
         DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
     
mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+    /* TODO: skip deadline removal for now and let them fire
     
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(),
 times(1))
         .unscheduleReminderJob(eq(dagAction), eq(true));
     
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(),
 times(1))
         .unscheduleReminderJob(eq(dagAction), eq(false));
+     */
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 8c1d3a8af..ac856f32d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -80,16 +80,16 @@ public class DagActionReminderScheduler {
       boolean isDeadlineReminder)
       throws SchedulerException {
     JobDetail jobDetail = createReminderJobDetail(leaseParams, 
isDeadlineReminder);
-    Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(), 
reminderDurationMillis,
+    Trigger trigger = createReminderJobTrigger(leaseParams, 
reminderDurationMillis,
         System::currentTimeMillis, isDeadlineReminder);
     log.info("Reminder set for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",
         leaseParams.getDagAction(), reminderDurationMillis, 
isDeadlineReminder);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
 
-  public void unscheduleReminderJob(DagActionStore.DagAction dagAction, 
boolean isDeadlineTrigger) throws SchedulerException {
-    log.info("Reminder unset for dagAction {}, isDeadlineTrigger: {}", 
dagAction, isDeadlineTrigger);
-    quartzScheduler.deleteJob(createJobKey(dagAction, isDeadlineTrigger));
+  public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams, 
boolean isDeadlineTrigger) throws SchedulerException {
+    log.info("Reminder unset for dagAction {}, isDeadlineTrigger: {}", 
leaseParams, isDeadlineTrigger);
+    quartzScheduler.deleteJob(createJobKey(leaseParams, isDeadlineTrigger));
   }
 
   /**
@@ -128,23 +128,35 @@ public class DagActionReminderScheduler {
   }
 
   /**
-   * Creates a key for the reminder job by concatenating all dagAction fields
+   * Creates a key for the reminder job by concatenating all dagAction fields 
and the eventTime of the dagAction.
+   *
+   * This ensures unique keys for multiple instances of the same action on the 
same flow execution that originate more
+   * than 'epsilon' apart. {@link MultiActiveLeaseArbiter} uses the eventTime 
to distinguish these distinct occurrences
+   * of the same action. This is necessary to prevent insertion failures due 
to previous reminders.
+   *
+   * Applicable only for KILL and RESUME actions; duplication for other 
actions is an error.
    */
-  public static String createDagActionReminderKey(DagActionStore.DagAction 
dagAction) {
-    return String.format("%s.%s.%s.%s.%s", dagAction.getFlowGroup(), 
dagAction.getFlowName(),
-        dagAction.getFlowExecutionId(), dagAction.getJobName(), 
dagAction.getDagActionType());
+  public static String createDagActionReminderKey(DagActionStore.LeaseParams 
leaseParams) {
+    DagActionStore.DagAction dagAction = leaseParams.getDagAction();
+    return String.join(".",
+        dagAction.getFlowGroup(),
+        dagAction.getFlowName(),
+        String.valueOf(dagAction.getFlowExecutionId()),
+        dagAction.getJobName(),
+        String.valueOf(dagAction.getDagActionType()),
+        String.valueOf(leaseParams.getEventTimeMillis()));
   }
 
   /**
    * Creates a JobKey object for the reminder job where the name is the 
DagActionReminderKey from above and the group is
    * the flowGroup
    */
-  public static JobKey createJobKey(DagActionStore.DagAction dagAction, 
boolean isDeadlineReminder) {
-    return new JobKey(createDagActionReminderKey(dagAction), 
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
+  public static JobKey createJobKey(DagActionStore.LeaseParams leaseParams, 
boolean isDeadlineReminder) {
+    return new JobKey(createDagActionReminderKey(leaseParams), 
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
   }
 
-  private static TriggerKey createTriggerKey(DagActionStore.DagAction 
dagAction, boolean isDeadlineReminder) {
-    return new TriggerKey(createDagActionReminderKey(dagAction), 
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
+  private static TriggerKey createTriggerKey(DagActionStore.LeaseParams 
leaseParams, boolean isDeadlineReminder) {
+    return new TriggerKey(createDagActionReminderKey(leaseParams), 
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
   }
 
   /**
@@ -163,7 +175,7 @@ public class DagActionReminderScheduler {
     dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY, 
leaseParams.getEventTimeMillis());
 
     return JobBuilder.newJob(ReminderJob.class)
-        .withIdentity(createJobKey(leaseParams.getDagAction(), 
isDeadlineReminder))
+        .withIdentity(createJobKey(leaseParams, isDeadlineReminder))
         .usingJobData(dataMap)
         .build();
   }
@@ -173,10 +185,10 @@ public class DagActionReminderScheduler {
    * with a job at any given time) that should fire after 
`reminderDurationMillis` millis. It uses
    * `getCurrentTimeMillis` to determine the current time.
    */
-  public static Trigger createReminderJobTrigger(DagActionStore.DagAction 
dagAction, long reminderDurationMillis,
+  public static Trigger createReminderJobTrigger(DagActionStore.LeaseParams 
leaseParams, long reminderDurationMillis,
       Supplier<Long> getCurrentTimeMillis, boolean isDeadlineReminder) {
     return TriggerBuilder.newTrigger()
-        .withIdentity(createTriggerKey(dagAction, isDeadlineReminder))
+        .withIdentity(createTriggerKey(leaseParams, isDeadlineReminder))
         .startAt(new Date(getCurrentTimeMillis.get() + reminderDurationMillis))
         .build();
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index a9862307e..0f2ead5d4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -73,12 +73,14 @@ public class DagManagementDagActionStoreChangeMonitor 
extends DagActionStoreChan
           break;
         case "DELETE":
           log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
+          /* TODO: skip deadline removal for now and let them fire
           if (dagActionType == 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE
               || dagActionType == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
             this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, 
true);
             // clear any deadline reminders as well as any retry reminders
             this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, 
false);
           }
+           */
           break;
         default:
           log.warn(
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index 4754a5e2c..817e2cef3 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -23,10 +23,13 @@ import java.util.function.Supplier;
 
 import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
+import org.quartz.SchedulerException;
 import org.quartz.Trigger;
 import org.quartz.TriggerUtils;
+import org.quartz.impl.StdSchedulerFactory;
 import org.quartz.spi.OperableTrigger;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Joiner;
@@ -39,14 +42,29 @@ public class DagActionReminderSchedulerTest {
   String flowName = "fn";
   long flowExecutionId = 123L;
   String jobName = "jn";
+  long eventTimeMillis = 1234L;
+  long eventTimeMillis2 = 5678L;
   String expectedKey =  Joiner.on(".").join(flowGroup, flowName, 
flowExecutionId, jobName,
-      DagActionStore.DagActionType.LAUNCH.name());
+      DagActionStore.DagActionType.LAUNCH.name(), eventTimeMillis);
+  String expectedKey2 =  Joiner.on(".").join(flowGroup, flowName, 
flowExecutionId, jobName,
+      DagActionStore.DagActionType.LAUNCH.name(), eventTimeMillis2);
   DagActionStore.DagAction launchDagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
       DagActionStore.DagActionType.LAUNCH);
+  DagActionStore.LeaseParams launchLeaseParams = new 
DagActionStore.LeaseParams(launchDagAction, eventTimeMillis);
+  DagActionStore.LeaseParams launchLeaseParams2 = new 
DagActionStore.LeaseParams(launchDagAction, eventTimeMillis2);
+  DagActionReminderScheduler dagActionReminderScheduler;
+
+  @BeforeClass
+  private void setup() throws SchedulerException {
+    StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
+    schedulerFactory.getScheduler();
+    this.dagActionReminderScheduler = new 
DagActionReminderScheduler(schedulerFactory);
+  }
 
   @Test
   public void testCreateDagActionReminderKey() {
-    Assert.assertEquals(expectedKey, 
DagActionReminderScheduler.createDagActionReminderKey(launchDagAction));
+    Assert.assertEquals(expectedKey, 
DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams));
+    Assert.assertEquals(expectedKey2, 
DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams2));
   }
 
   @Test
@@ -54,7 +72,7 @@ public class DagActionReminderSchedulerTest {
     long reminderDuration = 666L;
     Supplier<Long> getCurrentTimeMillis = () -> 12345600000L;
     Trigger reminderTrigger = DagActionReminderScheduler
-        .createReminderJobTrigger(launchDagAction, reminderDuration, 
getCurrentTimeMillis, false);
+        .createReminderJobTrigger(launchLeaseParams, reminderDuration, 
getCurrentTimeMillis, false);
     Assert.assertEquals(reminderTrigger.getKey().toString(), 
DagActionReminderScheduler.RetryReminderKeyGroup + "." + expectedKey);
     List<Date> fireTimes = TriggerUtils.computeFireTimes((OperableTrigger) 
reminderTrigger, null, 1);
     Assert.assertEquals(fireTimes.get(0), new Date(reminderDuration + 
getCurrentTimeMillis.get()));
@@ -62,8 +80,7 @@ public class DagActionReminderSchedulerTest {
 
   @Test
   public void testCreateReminderJobDetail() {
-    long expectedEventTimeMillis = 55L;
-    JobDetail jobDetail = 
DagActionReminderScheduler.createReminderJobDetail(new 
DagActionStore.LeaseParams(launchDagAction, false, expectedEventTimeMillis), 
false);
+    JobDetail jobDetail = 
DagActionReminderScheduler.createReminderJobDetail(launchLeaseParams, false);
     Assert.assertEquals(jobDetail.getKey().toString(), 
DagActionReminderScheduler.RetryReminderKeyGroup + "." + expectedKey);
     JobDataMap dataMap = jobDetail.getJobDataMap();
     Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_GROUP_KEY), 
flowGroup);
@@ -72,6 +89,18 @@ public class DagActionReminderSchedulerTest {
     Assert.assertEquals(dataMap.get(ConfigurationKeys.JOB_NAME_KEY), jobName);
     
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_TYPE_KEY),
         DagActionStore.DagActionType.LAUNCH);
-    
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_EVENT_TIME_KEY),
 expectedEventTimeMillis);
+    
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_EVENT_TIME_KEY),
 launchLeaseParams.getEventTimeMillis());
+  }
+
+  /*
+  Add deadline reminders for multiple launches of the same flow and assert no 
exception is thrown and they can be
+  deleted as well.
+   */
+  @Test
+  public void testRemindersForMultipleFlowExecutions() throws 
SchedulerException {
+    this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 50000, 
true);
+    this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams2, 
50000, true);
+    this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams, 
true);
+    this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2, 
true);
   }
 }

Reply via email to