umustafi commented on code in PR #3994:
URL: https://github.com/apache/gobblin/pull/3994#discussion_r1669433410


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -77,28 +95,45 @@ public DagActionReminderScheduler(StdSchedulerFactory 
schedulerFactory)
    * @throws SchedulerException
    */
   public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long 
reminderDurationMillis,
-      boolean isDeadlineReminder)
-      throws SchedulerException {
+      boolean isDeadlineReminder) throws SchedulerException {
+    DagActionStore.DagAction dagAction = leaseParams.getDagAction();
     JobDetail jobDetail = createReminderJobDetail(leaseParams, 
isDeadlineReminder);
-    Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(), 
reminderDurationMillis,
+    Trigger trigger = createReminderJobTrigger(dagAction, 
reminderDurationMillis,
         System::currentTimeMillis, isDeadlineReminder);
-    log.info("Reminder set for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",
-        leaseParams.getDagAction(), reminderDurationMillis, 
isDeadlineReminder);
-    quartzScheduler.scheduleJob(jobDetail, trigger);
+    log.info("Going to set reminder for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",

Review Comment:
   let's prefix all of these logs with `DagActionReminderScheduler` so easy to 
find



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -77,28 +95,45 @@ public DagActionReminderScheduler(StdSchedulerFactory 
schedulerFactory)
    * @throws SchedulerException
    */
   public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long 
reminderDurationMillis,
-      boolean isDeadlineReminder)
-      throws SchedulerException {
+      boolean isDeadlineReminder) throws SchedulerException {
+    DagActionStore.DagAction dagAction = leaseParams.getDagAction();
     JobDetail jobDetail = createReminderJobDetail(leaseParams, 
isDeadlineReminder);
-    Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(), 
reminderDurationMillis,
+    Trigger trigger = createReminderJobTrigger(dagAction, 
reminderDurationMillis,
         System::currentTimeMillis, isDeadlineReminder);
-    log.info("Reminder set for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",
-        leaseParams.getDagAction(), reminderDurationMillis, 
isDeadlineReminder);
-    quartzScheduler.scheduleJob(jobDetail, trigger);
+    log.info("Going to set reminder for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",
+        dagAction, reminderDurationMillis, isDeadlineReminder);
+
+    try {
+      if (!this.deleteDeadlineDagActionCache.get(dagAction)) {
+        quartzScheduler.scheduleJob(jobDetail, trigger);
+      } else {
+        log.info("Ignoring {} because the delete equivalent of the same 
received already.", dagAction);
+        this.deleteDeadlineDagActionCache.put(dagAction, false);
+      }
+    } catch (ObjectAlreadyExistsException e) {
+      log.warn("Reminder job {} already exists in the quartz scheduler. 
Possibly a duplicate request.", jobDetail.getKey());

Review Comment:
   `Reminder job for this dagAction already exists in the 
DagActionReminderScheduler. Look for duplicate requests for the same action. 
Job: {}`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -131,6 +129,7 @@ public static void 
submitJobToExecutor(DagManagementStateStore dagManagementStat
       log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
       
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
       dagManagementStateStore.addDagNodeState(dagNode, dagId);
+      sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);

Review Comment:
   comment to explain why moved below to document



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -77,28 +95,45 @@ public DagActionReminderScheduler(StdSchedulerFactory 
schedulerFactory)
    * @throws SchedulerException
    */
   public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long 
reminderDurationMillis,
-      boolean isDeadlineReminder)
-      throws SchedulerException {
+      boolean isDeadlineReminder) throws SchedulerException {
+    DagActionStore.DagAction dagAction = leaseParams.getDagAction();
     JobDetail jobDetail = createReminderJobDetail(leaseParams, 
isDeadlineReminder);
-    Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(), 
reminderDurationMillis,
+    Trigger trigger = createReminderJobTrigger(dagAction, 
reminderDurationMillis,
         System::currentTimeMillis, isDeadlineReminder);
-    log.info("Reminder set for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",
-        leaseParams.getDagAction(), reminderDurationMillis, 
isDeadlineReminder);
-    quartzScheduler.scheduleJob(jobDetail, trigger);
+    log.info("Going to set reminder for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",
+        dagAction, reminderDurationMillis, isDeadlineReminder);
+
+    try {
+      if (!this.deleteDeadlineDagActionCache.get(dagAction)) {
+        quartzScheduler.scheduleJob(jobDetail, trigger);
+      } else {
+        log.info("Ignoring {} because the delete equivalent of the same 
received already.", dagAction);
+        this.deleteDeadlineDagActionCache.put(dagAction, false);
+      }
+    } catch (ObjectAlreadyExistsException e) {
+      log.warn("Reminder job {} already exists in the quartz scheduler. 
Possibly a duplicate request.", jobDetail.getKey());
+    } catch (JobPersistenceException e) {
+      // this may happen when there is a race condition between this and 
delete job operation, retry this in that case
+      quartzScheduler.scheduleJob(jobDetail, trigger);

Review Comment:
   what is this exception? does the occur if the scheduler attempts to delete 
it at the same time? clarify comment



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java:
##########
@@ -122,6 +123,9 @@ public void addJobDagAction(String flowGroup, String 
flowName, long flowExecutio
     try {
       fillPreparedStatement(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType, insertStatement);
       return insertStatement.executeUpdate();
+    } catch (SQLIntegrityConstraintViolationException e) {
+      log.error(e.getMessage());

Review Comment:
   why do we skip throwing an exception here?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -107,23 +107,10 @@ public DagManagementTaskStreamImpl(Config config, 
Optional<DagActionStore> dagAc
     this.dagManagementStateStore = dagManagementStateStore;
   }
 
-  @Override
-  public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
-    // TODO: Used to track missing dag issue, remove later as needed
-    log.info("Add original (non-reminder) dagAction {}", dagAction);
-
-    if (!this.leaseParamsQueue.offer(new DagActionStore.LeaseParams(dagAction, 
false, System.currentTimeMillis()))) {
-      throw new RuntimeException(String.format("Could not add dag action to 
the queue %s", dagAction));
-    }
-  }
-
-  @Override
-  public synchronized void addReminderDagAction(DagActionStore.LeaseParams 
reminderLeaseParams) {
-    // TODO: Used to track missing dag issue, remove later as needed
-    log.info("Add reminder dagAction {}", reminderLeaseParams);
-
+  public synchronized void addDagAction(DagActionStore.LeaseParams 
reminderLeaseParams) {

Review Comment:
   simply name this lease params since used for both type of actions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to