This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a1a5d4b63 add some logs and cache DagManagement in DagActionReminderScheduler (#3997)
a1a5d4b63 is described below

commit a1a5d4b63348fd79fa379a6a1fbfc95d8c495911
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Jul 10 12:15:18 2024 -0700

    add some logs and cache DagManagement in DagActionReminderScheduler (#3997)
    
    * do not retry SQLIntegrityConstraintViolationException in KafkaJobStatusMonitor
---
 .../orchestration/DagActionReminderScheduler.java  | 26 ++++++++++----------
 .../modules/orchestration/DagActionStore.java      |  4 ++++
 .../modules/orchestration/DagManagement.java       | 10 ++------
 .../orchestration/DagManagementTaskStreamImpl.java | 21 ++++------------
 .../MostlyMySqlDagManagementStateStore.java        |  5 ++++
 .../MysqlMultiActiveLeaseArbiter.java              |  6 ++---
 .../modules/orchestration/proc/DagProcUtils.java   |  3 +--
 .../DagManagementDagActionStoreChangeMonitor.java  |  2 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  | 28 +++++++++++++++++++---
 .../DagActionReminderSchedulerTest.java            | 10 ++++++--
 .../DagManagementTaskStreamImplTest.java           |  8 ++++---
 11 files changed, 71 insertions(+), 52 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index ac856f32d..7109f343e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -39,7 +39,6 @@ import javax.inject.Singleton;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 
 
 /**
@@ -57,16 +56,17 @@ import 
org.apache.gobblin.service.modules.core.GobblinServiceManager;
 @Slf4j
 @Singleton
 public class DagActionReminderScheduler {
-  public static final String DAG_ACTION_REMINDER_SCHEDULER_KEY = 
"DagActionReminderScheduler";
   public static final String RetryReminderKeyGroup = "RetryReminder";
   public static final String DeadlineReminderKeyGroup = "DeadlineReminder";
   private final Scheduler quartzScheduler;
+  private final DagManagement dagManagement;
 
   @Inject
-  public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory)
+  public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory, 
DagManagement dagManagement)
       throws SchedulerException {
     // Creates a new Scheduler to be used solely for the DagProc reminders
     this.quartzScheduler = schedulerFactory.getScheduler();
+    this.dagManagement = dagManagement;
   }
 
   /**
@@ -77,19 +77,21 @@ public class DagActionReminderScheduler {
    * @throws SchedulerException
    */
   public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long 
reminderDurationMillis,
-      boolean isDeadlineReminder)
-      throws SchedulerException {
+      boolean isDeadlineReminder) throws SchedulerException {
+    DagActionStore.DagAction dagAction = leaseParams.getDagAction();
     JobDetail jobDetail = createReminderJobDetail(leaseParams, 
isDeadlineReminder);
     Trigger trigger = createReminderJobTrigger(leaseParams, 
reminderDurationMillis,
         System::currentTimeMillis, isDeadlineReminder);
-    log.info("Reminder set for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",
-        leaseParams.getDagAction(), reminderDurationMillis, 
isDeadlineReminder);
+    log.info("Going to set reminder for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",
+        dagAction, reminderDurationMillis, isDeadlineReminder);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
 
   public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams, 
boolean isDeadlineTrigger) throws SchedulerException {
-    log.info("Reminder unset for dagAction {}, isDeadlineTrigger: {}", 
leaseParams, isDeadlineTrigger);
-    quartzScheduler.deleteJob(createJobKey(leaseParams, isDeadlineTrigger));
+    log.info("Reminder unset for LeaseParams {}, isDeadlineTrigger: {}", 
leaseParams, isDeadlineTrigger);
+    if (!quartzScheduler.deleteJob(createJobKey(leaseParams, 
isDeadlineTrigger))) {
+      log.warn("Reminder not found for {}. Possibly the event is received 
out-of-order.", leaseParams);
+    }
   }
 
   /**
@@ -97,8 +99,7 @@ public class DagActionReminderScheduler {
    * by {@link DagManagement} interface to re-attempt a lease on if it has not 
been completed by the previous owner.
    * These jobs are scheduled and used by the {@link 
DagActionReminderScheduler}.
    */
-  @Slf4j
-  public static class ReminderJob implements Job {
+  public class ReminderJob implements Job {
     public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
     public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
 
@@ -119,8 +120,7 @@ public class DagActionReminderScheduler {
       log.info("DagProc reminder triggered for dagAction event: {}", 
reminderLeaseParams);
 
       try {
-        DagManagement dagManagement = 
GobblinServiceManager.getClass(DagManagement.class);
-        dagManagement.addReminderDagAction(reminderLeaseParams);
+        dagManagement.addDagAction(reminderLeaseParams);
       } catch (IOException e) {
         log.error("Failed to add DagAction event to DagManagement. dagAction 
event: {}", reminderLeaseParams);
       }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 4bc7e639c..c538242a2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -114,6 +114,10 @@ public interface DagActionStore {
       this(dagAction, false, eventTimeMillis);
     }
 
+    public LeaseParams(DagAction dagAction) {
+      this(dagAction, System.currentTimeMillis());
+    }
+
     /**
      * Replace flow execution id in dagAction with agreed upon event time to 
easily track the flow
      */
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
index caa5e96ac..6184c5dee 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -26,14 +26,8 @@ import java.io.IOException;
  * Consumption of the Dags happen through {@link DagTaskStream}.
  */
 public interface DagManagement {
-
-  /**
-   * Used to add a dagAction event to DagManagement
-   */
-  void addDagAction(DagActionStore.DagAction dagAction) throws IOException;
-
   /**
-   * Used to add reminder dagActions to the queue that already contain an 
eventTimestamp from the previous lease attempt
+   * Used to add {@link DagActionStore.LeaseParams} to the queue
    */
-  void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams) 
throws IOException;
+  void addDagAction(DagActionStore.LeaseParams leaseParams) throws IOException;
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index dc8967990..064e721dd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -107,23 +107,10 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     this.dagManagementStateStore = dagManagementStateStore;
   }
 
-  @Override
-  public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
-    // TODO: Used to track missing dag issue, remove later as needed
-    log.info("Add original (non-reminder) dagAction {}", dagAction);
-
-    if (!this.leaseParamsQueue.offer(new DagActionStore.LeaseParams(dagAction, 
false, System.currentTimeMillis()))) {
-      throw new RuntimeException(String.format("Could not add dag action to 
the queue %s", dagAction));
-    }
-  }
-
-  @Override
-  public synchronized void addReminderDagAction(DagActionStore.LeaseParams 
reminderLeaseParams) {
-    // TODO: Used to track missing dag issue, remove later as needed
-    log.info("Add reminder dagAction {}", reminderLeaseParams);
-
-    if (!this.leaseParamsQueue.offer(reminderLeaseParams)) {
-      throw new RuntimeException(String.format("Could not add reminder dag 
action to the queue %s", reminderLeaseParams));
+  public synchronized void addDagAction(DagActionStore.LeaseParams 
leaseParams) {
+    log.info("Adding {} to queue...", leaseParams);
+    if (!this.leaseParamsQueue.offer(leaseParams)) {
+      throw new RuntimeException(String.format("Could not add %s to the 
queue", leaseParams));
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index ce217ad5a..d0c68a3fd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -140,6 +140,7 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
     this.dagStateStore.cleanUp(dag);
     // todo - updated failedDagStateStore iff cleanup returned 1
     this.failedDagStateStore.writeCheckpoint(dag);
+    log.info("Marked dag failed {}", DagManagerUtils.generateDagId(dag));
   }
 
   @Override
@@ -155,6 +156,7 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   @Override
   public void deleteDag(DagManager.DagId dagId) throws IOException {
     this.dagStateStore.cleanUp(dagId.toString());
+    log.info("Deleted dag {}", dagId);
   }
 
   @Override
@@ -289,11 +291,14 @@ public class MostlyMySqlDagManagementStateStore 
implements DagManagementStateSto
   @Override
   public void addJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName,
       DagActionStore.DagActionType dagActionType) throws IOException {
+    log.info("Adding Dag Action for flowGroup {}, flowName {}, flowExecutionId 
{}, jobName {}, dagActionType {}",
+        flowGroup, flowName, flowExecutionId, jobName, dagActionType);
     this.dagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType);
   }
 
   @Override
   public boolean deleteDagAction(DagActionStore.DagAction dagAction) throws 
IOException {
+    log.info("Deleting Dag Action {}", dagAction);
     return this.dagActionStore.deleteDagAction(dagAction);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 7aa3c42b9..1a6032e44 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -279,7 +279,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       // because db laundering tells us that the currently worked on db event 
is newer and will have its own reminders
       if (leaseParams.isReminder()) {
         if (leaseParams.getEventTimeMillis() < dbEventTimestamp.getTime()) {
-          log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - A new 
event trigger "
+          log.info("tryAcquireLease for {} - dbEventTimeMillis: {} - A new 
event trigger "
                   + "is being worked on, so this older reminder will be 
dropped.", leaseParams,
               dbEventTimestamp);
           return new LeaseAttemptStatus.NoLongerLeasingStatus();
@@ -630,11 +630,11 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
                 status.getEventTimeMillis());
             return false;
           }
-          if( numRowsUpdated == 1) {
+          if (numRowsUpdated == 1) {
             log.info("Multi-active lease arbiter lease attempt: [{}, 
eventTimestamp: {}] - COMPLETED, no longer leasing"
                     + " this event after this.", dagAction, 
status.getEventTimeMillis());
             return true;
-          };
+          }
           throw new IOException(String.format("Attempt to complete lease use: 
[%s, eventTimestamp: %s] - updated more "
                   + "rows than expected", dagAction, 
status.getEventTimeMillis()));
         }, true);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index b11bb4c1c..808b87858 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -117,8 +117,6 @@ public class DagProcUtils {
       // blocks (by calling Future#get()) until the submission is completed.
       dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
 
-      sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
-
       Future<?> addSpecFuture = producer.addSpec(jobSpec);
       // todo - we should add future.get() instead of the complete future into 
the JobExecutionPlan
       
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
@@ -131,6 +129,7 @@ public class DagProcUtils {
       log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
       
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
       dagManagementStateStore.addDagNodeState(dagNode, dagId);
+      sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
     } catch (Exception e) {
       TimingEvent jobFailedTimer = 
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
       String message = "Cannot submit job " + 
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + 
specExecutorUri;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index 0f2ead5d4..35e4dafad 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -112,7 +112,7 @@ public class DagManagementDagActionStoreChangeMonitor 
extends DagActionStoreChan
         case LAUNCH :
         case REEVALUATE :
         case RESUME:
-          dagManagement.addDagAction(dagAction);
+          dagManagement.addDagAction(new 
DagActionStore.LeaseParams(dagAction));
           break;
         default:
           log.warn("Received unsupported dagAction {}. Expected to be a 
RESUME, KILL, REEVALUATE or LAUNCH", dagAction.getDagActionType());
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9b7e40695..d2800fbe3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -18,7 +18,9 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
+import java.sql.SQLIntegrityConstraintViolationException;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -104,7 +106,8 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
   private static final Config RETRYER_FALLBACK_CONFIG = 
ConfigFactory.parseMap(ImmutableMap.of(
-      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume 
non-transient and give up
+      // keeping the retry timeout less until we configure retryer to retry 
only the transient exceptions
+      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(12L), // after 12 hours, 
presume non-transient and give up
       RETRY_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1L), // back-off to 
once/minute
       RETRY_TYPE, RetryType.EXPONENTIAL.name()));
   private static final Config DEFAULTS = 
ConfigFactory.parseMap(ImmutableMap.of(
@@ -120,6 +123,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   private final GaaSJobObservabilityEventProducer eventProducer;
   private final DagManagementStateStore dagManagementStateStore;
   private final boolean dagProcEngineEnabled;
+  private final List<Class<? extends Exception>> nonRetryableExceptions = 
Collections.singletonList(SQLIntegrityConstraintViolationException.class);
 
   public KafkaJobStatusMonitor(String topic, Config config, int numThreads, 
JobIssueEventHandler jobIssueEventHandler,
       GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagManagementStateStore dagManagementStateStore)
@@ -139,6 +143,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         ? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
         : ConfigFactory.empty();
     // log exceptions to expose errors we suffer under and/or guide 
intervention when resolution not readily forthcoming
+    // todo - this retryer retries all the exceptions. we should make it retry 
only really transient
     this.persistJobStatusRetryer =
         
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
 Optional.of(new RetryListener() {
           @Override
@@ -230,7 +235,16 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
           if (this.dagProcEngineEnabled && 
DagProcUtils.isJobLevelStatus(jobName)) {
             if (updatedJobStatus.getRight() == NewState.FINISHED) {
               // todo - retried/resumed jobs *may* not be handled here, we may 
want to create their dag action elsewhere
-              this.dagManagementStateStore.addJobDagAction(flowGroup, 
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+              try {
+                this.dagManagementStateStore.addJobDagAction(flowGroup, 
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+              } catch (Exception e) {
+                if (isExceptionInstanceOf(e, nonRetryableExceptions)) {
+                  // todo - add metrics
+                  log.warn("Duplicate REEVALUATE Dag Action is being created. 
Ignoring... " + e.getMessage());
+                } else {
+                  throw e;
+                }
+              }
             } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
               
DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore, 
flowGroup, flowName, flowExecutionId, jobName);
             }
@@ -321,7 +335,12 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       }
 
       modifyStateIfRetryRequired(jobStatus);
-      return ImmutablePair.of(jobStatus, newState(jobStatus, states));
+      NewState newState = newState(jobStatus, states);
+      String newStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+      if (newState == NewState.FINISHED) {
+        log.info("Flow {}:{}:{}:{} reached a terminal state {}", flowGroup, 
flowName, flowExecutionId, jobName, newStatus);
+      }
+      return ImmutablePair.of(jobStatus, newState);
     } catch (Exception e) {
       log.warn("Meet exception when adding jobStatus to state store at "
           + e.getStackTrace()[0].getClassName() + "line number: " + 
e.getStackTrace()[0].getLineNumber(), e);
@@ -409,4 +428,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
   protected abstract org.apache.gobblin.configuration.State 
parseJobStatus(GobblinTrackingEvent event);
 
+  public static boolean isExceptionInstanceOf(Exception exception, 
List<Class<? extends Exception>> typesList) {
+    return typesList.stream().anyMatch(e -> e.isInstance(exception));
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index 817e2cef3..623dd7ae1 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -36,6 +36,10 @@ import com.google.common.base.Joiner;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
 
 public class DagActionReminderSchedulerTest {
   String flowGroup = "fg";
@@ -55,10 +59,12 @@ public class DagActionReminderSchedulerTest {
   DagActionReminderScheduler dagActionReminderScheduler;
 
   @BeforeClass
-  private void setup() throws SchedulerException {
+  private void setup() throws Exception {
     StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
     schedulerFactory.getScheduler();
-    this.dagActionReminderScheduler = new 
DagActionReminderScheduler(schedulerFactory);
+    DagManagement dagManagement = mock(DagManagement.class);
+    doNothing().when(dagManagement).addDagAction(any());
+    this.dagActionReminderScheduler = new 
DagActionReminderScheduler(schedulerFactory, dagManagement);
   }
 
   @Test
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index a5167ca85..f4d06e607 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -93,9 +93,11 @@ public class DagManagementTaskStreamImplTest {
      LeaseObtainedStatus to the taskStream to break its loop and return a 
newly created dagTask
     */
     DagActionStore.DagAction launchAction = new DagActionStore.DagAction("fg", 
"fn", 12345L, "jn", DagActionStore.DagActionType.LAUNCH);
-    dagManagementTaskStream.addDagAction(launchAction);
-    dagManagementTaskStream.addDagAction(launchAction);
-    dagManagementTaskStream.addDagAction(launchAction);
+    DagActionStore.LeaseParams
+        dagActionLeaseParams = new DagActionStore.LeaseParams(launchAction, 
false, System.currentTimeMillis());
+    dagManagementTaskStream.addDagAction(dagActionLeaseParams);
+    dagManagementTaskStream.addDagAction(dagActionLeaseParams);
+    dagManagementTaskStream.addDagAction(dagActionLeaseParams);
     when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
         .tryAcquireLease(any(DagActionStore.LeaseParams.class), anyBoolean()))
         .thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),

Reply via email to