phet commented on code in PR #3743:
URL: https://github.com/apache/gobblin/pull/3743#discussion_r1298904021
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -170,28 +174,80 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
     String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
         + random.nextInt(schedulerMaxBackoffMillis));
-    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
-    // in addition to the event we want to initiate
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(status.getEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(originalEventTimeMillis));
-    JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
-    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
-        Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
+    JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
+    // Triggers:job have an N:1 relationship but the job properties must remain constant between both, which does not
+    // allow us to keep track of additional properties for reminder events. By reusing the same job key, we either
+    // encounter an exception that the job already exists and cannot add it to the scheduler or have to overwrite the
+    // original job properties with the reminder event schedule. Thus, we differentiate the job and trigger key from the
+    // original event.
+    JobKey newJobKey = new JobKey(origJobKey.getName() + createSuffixForJobTrigger(status.getEventTimeMillis()),
+        origJobKey.getGroup());
Review Comment:
The best code is skimmable, and this is not, because there's too much to
parse visually. Raise the level of abstraction: each method call should sing
out a single, immediately recognizable concept.
E.g., if we encapsulate the reaching inside this object and that one, which is
so prevalent here, it would read:
```
JobKey triggerJobKey = JobKey.fromJobProps(jobProps);
try {
  if (!this.schedulerService.getScheduler().checkExists(triggerJobKey)) {
    ...
  }
  JobKey reminderJobKey = calcReminderJobKey(status, triggerJobKey);
  // NOTES: change return type to avoid cast; callee accesses this.schedulerMaxBackoffMillis
  JobDetailImpl jobDetail = updatePropsInJobDetail(triggerJobKey, status, originalEventTimeMillis);
  log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting to schedule reminder for event {}",
      flowAction, originalEventTimeMillis, status.getEventTimeMillis());
  // NOTES: new method that also encapsulates creating the `Trigger`; it returns the trigger
  // so we can log `.getNextFireTime()`
  Trigger reminderTrigger = this.schedulerService.scheduleJob(reminderJobKey, jobDetail);
  log.info(...)
}
```
Up to you whether you have the time right now, but this is what to aim for :)
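
To make the two key helpers named in the sketch above more concrete, here is a minimal, self-contained illustration. It is only a sketch under stated assumptions: the nested `JobKey` is a stand-in for `org.quartz.JobKey` so that it compiles without Quartz, `jobKeyFromProps` and `calcReminderJobKey` are hypothetical helper names, the property names stand in for `ConfigurationKeys.JOB_NAME_KEY` and `ConfigurationKeys.JOB_GROUP_KEY`, and the suffix format is invented for illustration and may not match the PR's `createSuffixForJobTrigger`.

```java
import java.util.Objects;
import java.util.Properties;

public class ReminderKeySketch {
  // Assumed property names, standing in for ConfigurationKeys.JOB_NAME_KEY / JOB_GROUP_KEY.
  public static final String JOB_NAME_KEY = "job.name";
  public static final String JOB_GROUP_KEY = "job.group";

  /** Minimal stand-in for org.quartz.JobKey (an assumption, so the sketch needs no Quartz). */
  public static final class JobKey {
    private final String name;
    private final String group;

    public JobKey(String name, String group) {
      this.name = name;
      this.group = group;
    }

    public String getName() { return name; }

    public String getGroup() { return group; }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof JobKey)) {
        return false;
      }
      JobKey other = (JobKey) o;
      return name.equals(other.name) && group.equals(other.group);
    }

    @Override
    public int hashCode() { return Objects.hash(name, group); }

    @Override
    public String toString() { return group + "." + name; }
  }

  /** Hypothetical helper: hides the property lookups and their fallback defaults. */
  public static JobKey jobKeyFromProps(Properties jobProps) {
    return new JobKey(jobProps.getProperty(JOB_NAME_KEY, "<<no job name>>"),
        jobProps.getProperty(JOB_GROUP_KEY, "<<no job group>>"));
  }

  /** Hypothetical suffix format; the real createSuffixForJobTrigger may differ. */
  public static String createSuffixForJobTrigger(long eventTimeMillis) {
    return "reminder_for_" + eventTimeMillis;
  }

  /** Hypothetical helper: same group, original name plus the per-event suffix. */
  public static JobKey calcReminderJobKey(JobKey origJobKey, long eventTimeMillis) {
    return new JobKey(origJobKey.getName() + createSuffixForJobTrigger(eventTimeMillis),
        origJobKey.getGroup());
  }

  public static void main(String[] args) {
    Properties jobProps = new Properties();
    jobProps.setProperty(JOB_NAME_KEY, "myFlow");
    jobProps.setProperty(JOB_GROUP_KEY, "myGroup");
    JobKey orig = jobKeyFromProps(jobProps);
    JobKey reminder = calcReminderJobKey(orig, 1692000000000L);
    System.out.println(orig + " -> " + reminder);
  }
}
```

With helpers like these, the calling method only names the concepts (original key, reminder key) and the property digging stays in one place.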
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -170,28 +174,80 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
     String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
         + random.nextInt(schedulerMaxBackoffMillis));
-    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
-    // in addition to the event we want to initiate
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(status.getEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(originalEventTimeMillis));
-    JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
-    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
-        Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
+    JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
+    // Triggers:job have an N:1 relationship but the job properties must remain constant between both, which does not
+    // allow us to keep track of additional properties for reminder events. By reusing the same job key, we either
+    // encounter an exception that the job already exists and cannot add it to the scheduler or have to overwrite the
+    // original job properties with the reminder event schedule. Thus, we differentiate the job and trigger key from the
+    // original event.
+    JobKey newJobKey = new JobKey(origJobKey.getName() + createSuffixForJobTrigger(status.getEventTimeMillis()),
+        origJobKey.getGroup());
     try {
-      log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting to schedule reminder for event {} in {} millis",
+      if (!this.schedulerService.getScheduler().checkExists(origJobKey)) {
+        log.warn("Attempting to set a reminder for a job that does not exist in the scheduler. Key: {}", origJobKey);
Review Comment:
It doesn't seem like we're actually "attempting" here, but rather skipping. If
so, this log message is misleading.
Also, let's not create `newJobKey` before confirming a need for it.
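
One possible shape for both points, as a minimal sketch only: the existence check comes first, the skip is logged as a skip, and the reminder key is computed only once it is needed. Everything here is an assumption for illustration: the `Predicate<String>` stands in for the scheduler's existence check, the job is identified by a plain name rather than a `JobKey`, the suffix format is invented, and `System.out.println` stands in for `log.warn`.

```java
import java.util.Optional;
import java.util.function.Predicate;

public class ReminderGuardSketch {
  /**
   * Returns the reminder job name, or empty when the original job is unknown.
   * The jobExists predicate is a hypothetical stand-in for the scheduler lookup.
   */
  public static Optional<String> reminderJobName(String origJobName, long eventTimeMillis,
      Predicate<String> jobExists) {
    if (!jobExists.test(origJobName)) {
      // The message says what actually happens: the reminder is skipped.
      System.out.println("Skipping reminder for a job that does not exist in the scheduler. Key: "
          + origJobName);
      return Optional.empty();
    }
    // Only now is the reminder name built, since we know it will be used.
    return Optional.of(origJobName + "reminder_for_" + eventTimeMillis);
  }

  public static void main(String[] args) {
    Predicate<String> jobExists = "knownJob"::equals;
    System.out.println(reminderJobName("knownJob", 42L, jobExists));
    System.out.println(reminderJobName("missingJob", 42L, jobExists));
  }
}
```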
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -497,6 +497,7 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
     FlowId flowId = action.getFlowId();
     FlowSpec spec;
     try {
+      log.info("Handle launch flow event for action: {}", action);
Review Comment:
isn't similar logging also proposed in
https://github.com/apache/gobblin/pull/3746 ?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -170,28 +174,80 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
     String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
         + random.nextInt(schedulerMaxBackoffMillis));
-    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
-    // in addition to the event we want to initiate
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(status.getEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(originalEventTimeMillis));
-    JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
-    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
-        Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
+    JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
+    // Triggers:job have an N:1 relationship but the job properties must remain constant between both, which does not
+    // allow us to keep track of additional properties for reminder events. By reusing the same job key, we either
+    // encounter an exception that the job already exists and cannot add it to the scheduler or have to overwrite the
+    // original job properties with the reminder event schedule. Thus, we differentiate the job and trigger key from the
Review Comment:
Could we leave out "or have to overwrite..."? That was never really a
viable solution, was it?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -170,28 +174,80 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
     String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
         + random.nextInt(schedulerMaxBackoffMillis));
-    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
-    // in addition to the event we want to initiate
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(status.getEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(originalEventTimeMillis));
-    JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
-    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
-        Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
+    JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
+    // Triggers:job have an N:1 relationship but the job properties must remain constant between both, which does not
Review Comment:
Please explain this N:1 relationship more clearly. E.g., has there thus far
been one trigger per scheduled job, and now we also seek to create reminder
triggers? In that case, multi-active gave rise to the N:1 relationship. Or has
it always been that way, even before multi-active?
Also, I did notice this:
https://github.com/apache/gobblin/blob/e07450f88bc2c3e2262377a5667df5e703cbf7fe/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java#L586