This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6b109d5ee [GOBBLIN-2016] Add eventTime to DagAction reminder key (#3995)
6b109d5ee is described below
commit 6b109d5ee068ab781d3e94c6ff1b1a0b16be6377
Author: umustafi <[email protected]>
AuthorDate: Tue Jul 9 14:28:33 2024 -0700
[GOBBLIN-2016] Add eventTime to DagAction reminder key (#3995)
* Add eventTime to DagAction reminder key
---
...gManagementDagActionStoreChangeMonitorTest.java | 2 ++
.../orchestration/DagActionReminderScheduler.java | 42 ++++++++++++++--------
.../DagManagementDagActionStoreChangeMonitor.java | 2 ++
.../DagActionReminderSchedulerTest.java | 41 +++++++++++++++++----
4 files changed, 66 insertions(+), 21 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
index 86a134306..8d2b6bab9 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
@@ -112,10 +112,12 @@ public class DagManagementDagActionStoreChangeMonitorTest {
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+ /* TODO: skip deadline removal for now and let them fire
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(),
times(1))
.unscheduleReminderJob(eq(dagAction), eq(true));
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(),
times(1))
.unscheduleReminderJob(eq(dagAction), eq(false));
+ */
}
/**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 8c1d3a8af..ac856f32d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -80,16 +80,16 @@ public class DagActionReminderScheduler {
boolean isDeadlineReminder)
throws SchedulerException {
JobDetail jobDetail = createReminderJobDetail(leaseParams,
isDeadlineReminder);
- Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(),
reminderDurationMillis,
+ Trigger trigger = createReminderJobTrigger(leaseParams,
reminderDurationMillis,
System::currentTimeMillis, isDeadlineReminder);
log.info("Reminder set for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
leaseParams.getDagAction(), reminderDurationMillis,
isDeadlineReminder);
quartzScheduler.scheduleJob(jobDetail, trigger);
}
- public void unscheduleReminderJob(DagActionStore.DagAction dagAction,
boolean isDeadlineTrigger) throws SchedulerException {
- log.info("Reminder unset for dagAction {}, isDeadlineTrigger: {}",
dagAction, isDeadlineTrigger);
- quartzScheduler.deleteJob(createJobKey(dagAction, isDeadlineTrigger));
+ public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams,
boolean isDeadlineTrigger) throws SchedulerException {
+ log.info("Reminder unset for dagAction {}, isDeadlineTrigger: {}",
leaseParams, isDeadlineTrigger);
+ quartzScheduler.deleteJob(createJobKey(leaseParams, isDeadlineTrigger));
}
/**
@@ -128,23 +128,35 @@ public class DagActionReminderScheduler {
}
/**
- * Creates a key for the reminder job by concatenating all dagAction fields
+ * Creates a key for the reminder job by concatenating all dagAction fields
and the eventTime of the dagAction.
+ *
+ * This ensures unique keys for multiple instances of the same action on the
same flow execution that originate more
+ * than 'epsilon' apart. {@link MultiActiveLeaseArbiter} uses the eventTime
to distinguish these distinct occurrences
+ * of the same action. This is necessary to prevent insertion failures due
to previous reminders.
+ *
+ * Applicable only for KILL and RESUME actions; duplication for other
actions is an error.
*/
- public static String createDagActionReminderKey(DagActionStore.DagAction
dagAction) {
- return String.format("%s.%s.%s.%s.%s", dagAction.getFlowGroup(),
dagAction.getFlowName(),
- dagAction.getFlowExecutionId(), dagAction.getJobName(),
dagAction.getDagActionType());
+ public static String createDagActionReminderKey(DagActionStore.LeaseParams
leaseParams) {
+ DagActionStore.DagAction dagAction = leaseParams.getDagAction();
+ return String.join(".",
+ dagAction.getFlowGroup(),
+ dagAction.getFlowName(),
+ String.valueOf(dagAction.getFlowExecutionId()),
+ dagAction.getJobName(),
+ String.valueOf(dagAction.getDagActionType()),
+ String.valueOf(leaseParams.getEventTimeMillis()));
}
/**
* Creates a JobKey object for the reminder job where the name is the
DagActionReminderKey from above and the group is
* the flowGroup
*/
- public static JobKey createJobKey(DagActionStore.DagAction dagAction,
boolean isDeadlineReminder) {
- return new JobKey(createDagActionReminderKey(dagAction),
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
+ public static JobKey createJobKey(DagActionStore.LeaseParams leaseParams,
boolean isDeadlineReminder) {
+ return new JobKey(createDagActionReminderKey(leaseParams),
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
}
- private static TriggerKey createTriggerKey(DagActionStore.DagAction
dagAction, boolean isDeadlineReminder) {
- return new TriggerKey(createDagActionReminderKey(dagAction),
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
+ private static TriggerKey createTriggerKey(DagActionStore.LeaseParams
leaseParams, boolean isDeadlineReminder) {
+ return new TriggerKey(createDagActionReminderKey(leaseParams),
isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup);
}
/**
@@ -163,7 +175,7 @@ public class DagActionReminderScheduler {
dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY,
leaseParams.getEventTimeMillis());
return JobBuilder.newJob(ReminderJob.class)
- .withIdentity(createJobKey(leaseParams.getDagAction(),
isDeadlineReminder))
+ .withIdentity(createJobKey(leaseParams, isDeadlineReminder))
.usingJobData(dataMap)
.build();
}
@@ -173,10 +185,10 @@ public class DagActionReminderScheduler {
* with a job at any given time) that should fire after
`reminderDurationMillis` millis. It uses
* `getCurrentTimeMillis` to determine the current time.
*/
- public static Trigger createReminderJobTrigger(DagActionStore.DagAction
dagAction, long reminderDurationMillis,
+ public static Trigger createReminderJobTrigger(DagActionStore.LeaseParams
leaseParams, long reminderDurationMillis,
Supplier<Long> getCurrentTimeMillis, boolean isDeadlineReminder) {
return TriggerBuilder.newTrigger()
- .withIdentity(createTriggerKey(dagAction, isDeadlineReminder))
+ .withIdentity(createTriggerKey(leaseParams, isDeadlineReminder))
.startAt(new Date(getCurrentTimeMillis.get() + reminderDurationMillis))
.build();
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index a9862307e..0f2ead5d4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -73,12 +73,14 @@ public class DagManagementDagActionStoreChangeMonitor extends DagActionStoreChan
break;
case "DELETE":
log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
+ /* TODO: skip deadline removal for now and let them fire
if (dagActionType ==
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE
|| dagActionType ==
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
this.dagActionReminderScheduler.unscheduleReminderJob(dagAction,
true);
// clear any deadline reminders as well as any retry reminders
this.dagActionReminderScheduler.unscheduleReminderJob(dagAction,
false);
}
+ */
break;
default:
log.warn(
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index 4754a5e2c..817e2cef3 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -23,10 +23,13 @@ import java.util.function.Supplier;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
+import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerUtils;
+import org.quartz.impl.StdSchedulerFactory;
import org.quartz.spi.OperableTrigger;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Joiner;
@@ -39,14 +42,29 @@ public class DagActionReminderSchedulerTest {
String flowName = "fn";
long flowExecutionId = 123L;
String jobName = "jn";
+ long eventTimeMillis = 1234L;
+ long eventTimeMillis2 = 5678L;
String expectedKey = Joiner.on(".").join(flowGroup, flowName,
flowExecutionId, jobName,
- DagActionStore.DagActionType.LAUNCH.name());
+ DagActionStore.DagActionType.LAUNCH.name(), eventTimeMillis);
+ String expectedKey2 = Joiner.on(".").join(flowGroup, flowName,
flowExecutionId, jobName,
+ DagActionStore.DagActionType.LAUNCH.name(), eventTimeMillis2);
DagActionStore.DagAction launchDagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.LeaseParams launchLeaseParams = new
DagActionStore.LeaseParams(launchDagAction, eventTimeMillis);
+ DagActionStore.LeaseParams launchLeaseParams2 = new
DagActionStore.LeaseParams(launchDagAction, eventTimeMillis2);
+ DagActionReminderScheduler dagActionReminderScheduler;
+
+ @BeforeClass
+ private void setup() throws SchedulerException {
+ StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
+ schedulerFactory.getScheduler();
+ this.dagActionReminderScheduler = new
DagActionReminderScheduler(schedulerFactory);
+ }
@Test
public void testCreateDagActionReminderKey() {
- Assert.assertEquals(expectedKey,
DagActionReminderScheduler.createDagActionReminderKey(launchDagAction));
+ Assert.assertEquals(expectedKey,
DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams));
+ Assert.assertEquals(expectedKey2,
DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams2));
}
@Test
@@ -54,7 +72,7 @@ public class DagActionReminderSchedulerTest {
long reminderDuration = 666L;
Supplier<Long> getCurrentTimeMillis = () -> 12345600000L;
Trigger reminderTrigger = DagActionReminderScheduler
- .createReminderJobTrigger(launchDagAction, reminderDuration,
getCurrentTimeMillis, false);
+ .createReminderJobTrigger(launchLeaseParams, reminderDuration,
getCurrentTimeMillis, false);
Assert.assertEquals(reminderTrigger.getKey().toString(),
DagActionReminderScheduler.RetryReminderKeyGroup + "." + expectedKey);
List<Date> fireTimes = TriggerUtils.computeFireTimes((OperableTrigger)
reminderTrigger, null, 1);
Assert.assertEquals(fireTimes.get(0), new Date(reminderDuration +
getCurrentTimeMillis.get()));
@@ -62,8 +80,7 @@ public class DagActionReminderSchedulerTest {
@Test
public void testCreateReminderJobDetail() {
- long expectedEventTimeMillis = 55L;
- JobDetail jobDetail =
DagActionReminderScheduler.createReminderJobDetail(new
DagActionStore.LeaseParams(launchDagAction, false, expectedEventTimeMillis),
false);
+ JobDetail jobDetail =
DagActionReminderScheduler.createReminderJobDetail(launchLeaseParams, false);
Assert.assertEquals(jobDetail.getKey().toString(),
DagActionReminderScheduler.RetryReminderKeyGroup + "." + expectedKey);
JobDataMap dataMap = jobDetail.getJobDataMap();
Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_GROUP_KEY),
flowGroup);
@@ -72,6 +89,18 @@ public class DagActionReminderSchedulerTest {
Assert.assertEquals(dataMap.get(ConfigurationKeys.JOB_NAME_KEY), jobName);
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_TYPE_KEY),
DagActionStore.DagActionType.LAUNCH);
-
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_EVENT_TIME_KEY),
expectedEventTimeMillis);
+
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_EVENT_TIME_KEY),
launchLeaseParams.getEventTimeMillis());
+ }
+
+ /*
+ Add deadline reminders for multiple launches of the same flow and assert no
exception is thrown and they can be
+ deleted as well.
+ */
+ @Test
+ public void testRemindersForMultipleFlowExecutions() throws
SchedulerException {
+ this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 50000,
true);
+ this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams2,
50000, true);
+ this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams,
true);
+ this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2,
true);
}
}