phet commented on code in PR #3743:
URL: https://github.com/apache/gobblin/pull/3743#discussion_r1298904021


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -170,28 +174,80 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
     String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
         + random.nextInt(schedulerMaxBackoffMillis));
-    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
-    // in addition to the event we want to initiate
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(status.getEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(originalEventTimeMillis));
-    JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
-    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
-        Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
+    JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
+    // Triggers:job have an N:1 relationship but the job properties must remain constant between both, which does not
+    // allow us to keep track of additional properties for reminder events. By reusing the same job key, we either
+    // encounter an exception that the job already exists and cannot add it to the scheduler or have to overwrite the
+    // original job properties with the reminder event schedule. Thus, we differentiate the job and trigger key from the
+    // original event.
+    JobKey newJobKey = new JobKey(origJobKey.getName() + createSuffixForJobTrigger(status.getEventTimeMillis()),
+        origJobKey.getGroup());

Review Comment:
   The best code is skimmable, and this is not: there is too much to parse visually. Raise the level of abstraction so that each method call expresses a single, immediately recognizable concept.
   
   E.g. if we encapsulate the reaching into this object and that, which is so prevalent here, it would read:
   ```
   JobKey triggerJobKey = JobKey.fromJobProps(jobProps);
   try {
       if (!this.schedulerService.getScheduler().checkExists(triggerJobKey)) {
           ...
       }
       JobKey reminderJobKey = calcReminderJobKey(status, triggerJobKey);
       // NOTES: change return type to avoid cast; callee accesses this.schedulerMaxBackoffMillis
       JobDetailImpl jobDetail = updatePropsInJobDetail(triggerJobKey, status, originalEventTimeMillis);
       // NOTES: new method that also encapsulates creating the `Trigger`; it returns the trigger so we can log `.getNextFireTime()`
       Trigger reminderTrigger = this.schedulerService.scheduleJob(reminderJobKey, jobDetail);
       log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - scheduled reminder for event {}; next fire time: {}",
           flowAction, originalEventTimeMillis, status.getEventTimeMillis(), reminderTrigger.getNextFireTime());
   }
   ```
   Up to you if you have the time right now... but this is what to aim for :)
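
To make the two key-building helpers named in the sketch above concrete, here is a minimal, self-contained version. Everything in it is an assumption for illustration: `SimpleJobKey` is a stand-in for Quartz's `JobKey` (so the block compiles without Quartz), the property names `job.name` and `job.group` stand in for `ConfigurationKeys.JOB_NAME_KEY` and `ConfigurationKeys.JOB_GROUP_KEY`, and the `_reminder_for_` suffix is a hypothetical placeholder for whatever `createSuffixForJobTrigger` produces.

```java
import java.util.Objects;
import java.util.Properties;

/** Sketch of the key-building helpers; SimpleJobKey is a stand-in for Quartz's JobKey. */
final class ReminderKeys {
  // Assumed property names; the PR reads ConfigurationKeys.JOB_NAME_KEY / JOB_GROUP_KEY instead.
  static final String JOB_NAME_KEY = "job.name";
  static final String JOB_GROUP_KEY = "job.group";

  /** Minimal immutable (name, group) pair, used here in place of org.quartz.JobKey. */
  static final class SimpleJobKey {
    final String name;
    final String group;

    SimpleJobKey(String name, String group) {
      this.name = Objects.requireNonNull(name);
      this.group = Objects.requireNonNull(group);
    }

    @Override public boolean equals(Object o) {
      if (!(o instanceof SimpleJobKey)) {
        return false;
      }
      SimpleJobKey other = (SimpleJobKey) o;
      return name.equals(other.name) && group.equals(other.group);
    }

    @Override public int hashCode() {
      return Objects.hash(name, group);
    }

    @Override public String toString() {
      return group + "." + name;
    }
  }

  /** One concept per call: "the key of the job these properties describe". */
  static SimpleJobKey jobKeyFromProps(Properties jobProps) {
    return new SimpleJobKey(
        jobProps.getProperty(JOB_NAME_KEY, "<<no job name>>"),
        jobProps.getProperty(JOB_GROUP_KEY, "<<no job group>>"));
  }

  /** Same group, name suffixed with the event time so it differs from the original job's key. */
  static SimpleJobKey calcReminderJobKey(SimpleJobKey original, long eventTimeMillis) {
    // Hypothetical suffix format; the PR delegates this to createSuffixForJobTrigger.
    return new SimpleJobKey(original.name + "_reminder_for_" + eventTimeMillis, original.group);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(JOB_NAME_KEY, "myFlow");
    props.setProperty(JOB_GROUP_KEY, "myGroup");
    SimpleJobKey original = jobKeyFromProps(props);
    System.out.println(original + " -> " + calcReminderJobKey(original, 1692400000000L));
  }
}
```

With helpers like these, the caller no longer reaches into `jobProps` or `status` inline, which is the skimmability gain the comment is asking for.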
   
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -170,28 +174,80 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
     String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
         + random.nextInt(schedulerMaxBackoffMillis));
-    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
-    // in addition to the event we want to initiate
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(status.getEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(originalEventTimeMillis));
-    JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
-    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
-        Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
+    JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
+    // Triggers:job have an N:1 relationship but the job properties must remain constant between both, which does not
+    // allow us to keep track of additional properties for reminder events. By reusing the same job key, we either
+    // encounter an exception that the job already exists and cannot add it to the scheduler or have to overwrite the
+    // original job properties with the reminder event schedule. Thus, we differentiate the job and trigger key from the
+    // original event.
+    JobKey newJobKey = new JobKey(origJobKey.getName() + createSuffixForJobTrigger(status.getEventTimeMillis()),
+        origJobKey.getGroup());
     try {
-      log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] -  attempting to schedule reminder for event {} in {} millis",
+      if (!this.schedulerService.getScheduler().checkExists(origJobKey)) {
+        log.warn("Attempting to set a reminder for a job that does not exist in the scheduler. Key: {}", origJobKey);

Review Comment:
   It doesn't seem like we're actually "Attempting" here, but rather skipping. If so, this log message is misleading.
   
   Also, let's not create `newJobKey` before confirming a need for it.
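
A minimal sketch of the ordering suggested here: check for the original job first, log that the reminder is being skipped, and only build the reminder key once it is known to be needed. The names are hypothetical stand-ins, not Gobblin or Quartz APIs: `ReminderGuard` replaces the handler, a plain `Set<String>` replaces the scheduler's `checkExists`, and the `_reminder_for_` suffix is an assumed placeholder for `createSuffixForJobTrigger`.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Sketch: guard first, then derive the reminder key only when it will actually be used. */
final class ReminderGuard {
  // Stand-in for the scheduler's job store; the PR asks the Quartz scheduler via checkExists.
  private final Set<String> scheduledJobNames = new HashSet<>();

  // Counts how many reminder keys were built, to show that the skip path builds none.
  int keysBuilt = 0;

  void register(String jobName) {
    scheduledJobNames.add(jobName);
  }

  /** Returns the reminder key, or empty if the original job is unknown and the reminder is skipped. */
  Optional<String> reminderKeyFor(String jobName, long eventTimeMillis) {
    if (!scheduledJobNames.contains(jobName)) {
      // The message says what actually happens: the reminder is skipped, not attempted.
      System.err.printf("Skipping reminder for a job that does not exist in the scheduler. Key: %s%n", jobName);
      return Optional.empty();
    }
    keysBuilt++;
    // Hypothetical suffix format, assumed for illustration only.
    return Optional.of(jobName + "_reminder_for_" + eventTimeMillis);
  }

  public static void main(String[] args) {
    ReminderGuard guard = new ReminderGuard();
    guard.register("myFlow");
    System.out.println(guard.reminderKeyFor("unknownFlow", 1692400000000L).isPresent());
    System.out.println(guard.reminderKeyFor("myFlow", 1692400000000L).orElse("<none>"));
  }
}
```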



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -497,6 +497,7 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
     FlowId flowId = action.getFlowId();
     FlowSpec spec;
     try {
+      log.info("Handle launch flow event for action: {}", action);

Review Comment:
   Isn't similar logging also proposed in https://github.com/apache/gobblin/pull/3746?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -170,28 +174,80 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
     String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
         + random.nextInt(schedulerMaxBackoffMillis));
-    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
-    // in addition to the event we want to initiate
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(status.getEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(originalEventTimeMillis));
-    JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
-    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
-        Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
+    JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
+    // Triggers:job have an N:1 relationship but the job properties must remain constant between both, which does not
+    // allow us to keep track of additional properties for reminder events. By reusing the same job key, we either
+    // encounter an exception that the job already exists and cannot add it to the scheduler or have to overwrite the
+    // original job properties with the reminder event schedule. Thus, we differentiate the job and trigger key from the

Review Comment:
   Could we leave out "or have to overwrite..."? That was never really a viable solution, was it?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -170,28 +174,80 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
     String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
         + random.nextInt(schedulerMaxBackoffMillis));
-    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
-    // in addition to the event we want to initiate
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(status.getEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
-        String.valueOf(originalEventTimeMillis));
-    JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
-    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
-        Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
+    JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
+    // Triggers:job have an N:1 relationship but the job properties must remain constant between both, which does not

Review Comment:
   Please explain this N:1 relationship more clearly. E.g. has there thus far been one trigger per scheduled job, and now we also seek to create reminder triggers? In that case, multi-active gave rise to the N:1 relationship. Or has it always been that way, even before multi-active?
   
   Also, I did notice this:
   https://github.com/apache/gobblin/blob/e07450f88bc2c3e2262377a5667df5e703cbf7fe/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java#L586



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
