umustafi commented on code in PR #3743:
URL: https://github.com/apache/gobblin/pull/3743#discussion_r1298939227
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -170,28 +174,80 @@ private void scheduleReminderForEvent(Properties
jobProps, MultiActiveLeaseArbit
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
String cronExpression =
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ random.nextInt(schedulerMaxBackoffMillis));
- jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
- // Ensure we save the event timestamp that we're setting reminder for to
have for debugging purposes
- // in addition to the event we want to initiate
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
- String.valueOf(status.getEventTimeMillis()));
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
- String.valueOf(originalEventTimeMillis));
- JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
- // Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
- Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
- Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
+ JobKey origJobKey = new
JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
+ jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job
group>>"));
+ // Triggers:job have an N:1 relationship but the job properties must
remain constant between both, which does not
+ // allow us to keep track of additional properties for reminder events. By
reusing the same job key, we either
+ // encounter an exception that the job already exists and cannot add it to
the scheduler or have to overwrite the
+ // original job properties with the reminder event schedule. Thus, we
differentiate the job and trigger key from the
+ // original event.
+ JobKey newJobKey = new JobKey(origJobKey.getName() +
createSuffixForJobTrigger(status.getEventTimeMillis()),
+ origJobKey.getGroup());
try {
- log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting
to schedule reminder for event {} in {} millis",
+ if (!this.schedulerService.getScheduler().checkExists(origJobKey)) {
+ log.warn("Attempting to set a reminder for a job that does not exist
in the scheduler. Key: {}", origJobKey);
Review Comment:
Fixed both in #3746.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]