This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a1a5d4b63 add some logs and cache DagManagement in
DagActionReminderScheduler (#3997)
a1a5d4b63 is described below
commit a1a5d4b63348fd79fa379a6a1fbfc95d8c495911
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Jul 10 12:15:18 2024 -0700
add some logs and cache DagManagement in DagActionReminderScheduler (#3997)
* do not retry SQLIntegrityConstraintViolationException in
KafkaJobStatusMonitor
---
.../orchestration/DagActionReminderScheduler.java | 26 ++++++++++----------
.../modules/orchestration/DagActionStore.java | 4 ++++
.../modules/orchestration/DagManagement.java | 10 ++------
.../orchestration/DagManagementTaskStreamImpl.java | 21 ++++------------
.../MostlyMySqlDagManagementStateStore.java | 5 ++++
.../MysqlMultiActiveLeaseArbiter.java | 6 ++---
.../modules/orchestration/proc/DagProcUtils.java | 3 +--
.../DagManagementDagActionStoreChangeMonitor.java | 2 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 28 +++++++++++++++++++---
.../DagActionReminderSchedulerTest.java | 10 ++++++--
.../DagManagementTaskStreamImplTest.java | 8 ++++---
11 files changed, 71 insertions(+), 52 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index ac856f32d..7109f343e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -39,7 +39,6 @@ import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
/**
@@ -57,16 +56,17 @@ import
org.apache.gobblin.service.modules.core.GobblinServiceManager;
@Slf4j
@Singleton
public class DagActionReminderScheduler {
- public static final String DAG_ACTION_REMINDER_SCHEDULER_KEY =
"DagActionReminderScheduler";
public static final String RetryReminderKeyGroup = "RetryReminder";
public static final String DeadlineReminderKeyGroup = "DeadlineReminder";
private final Scheduler quartzScheduler;
+ private final DagManagement dagManagement;
@Inject
- public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory)
+ public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory,
DagManagement dagManagement)
throws SchedulerException {
// Creates a new Scheduler to be used solely for the DagProc reminders
this.quartzScheduler = schedulerFactory.getScheduler();
+ this.dagManagement = dagManagement;
}
/**
@@ -77,19 +77,21 @@ public class DagActionReminderScheduler {
* @throws SchedulerException
*/
public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long
reminderDurationMillis,
- boolean isDeadlineReminder)
- throws SchedulerException {
+ boolean isDeadlineReminder) throws SchedulerException {
+ DagActionStore.DagAction dagAction = leaseParams.getDagAction();
JobDetail jobDetail = createReminderJobDetail(leaseParams,
isDeadlineReminder);
Trigger trigger = createReminderJobTrigger(leaseParams,
reminderDurationMillis,
System::currentTimeMillis, isDeadlineReminder);
- log.info("Reminder set for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
- leaseParams.getDagAction(), reminderDurationMillis,
isDeadlineReminder);
+ log.info("Going to set reminder for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
+ dagAction, reminderDurationMillis, isDeadlineReminder);
quartzScheduler.scheduleJob(jobDetail, trigger);
}
public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams,
boolean isDeadlineTrigger) throws SchedulerException {
- log.info("Reminder unset for dagAction {}, isDeadlineTrigger: {}",
leaseParams, isDeadlineTrigger);
- quartzScheduler.deleteJob(createJobKey(leaseParams, isDeadlineTrigger));
+ log.info("Reminder unset for LeaseParams {}, isDeadlineTrigger: {}",
leaseParams, isDeadlineTrigger);
+ if (!quartzScheduler.deleteJob(createJobKey(leaseParams,
isDeadlineTrigger))) {
+ log.warn("Reminder not found for {}. Possibly the event is received
out-of-order.", leaseParams);
+ }
}
/**
@@ -97,8 +99,7 @@ public class DagActionReminderScheduler {
* by {@link DagManagement} interface to re-attempt a lease on if it has not
been completed by the previous owner.
* These jobs are scheduled and used by the {@link
DagActionReminderScheduler}.
*/
- @Slf4j
- public static class ReminderJob implements Job {
+ public class ReminderJob implements Job {
public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
@@ -119,8 +120,7 @@ public class DagActionReminderScheduler {
log.info("DagProc reminder triggered for dagAction event: {}",
reminderLeaseParams);
try {
- DagManagement dagManagement =
GobblinServiceManager.getClass(DagManagement.class);
- dagManagement.addReminderDagAction(reminderLeaseParams);
+ dagManagement.addDagAction(reminderLeaseParams);
} catch (IOException e) {
log.error("Failed to add DagAction event to DagManagement. dagAction
event: {}", reminderLeaseParams);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 4bc7e639c..c538242a2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -114,6 +114,10 @@ public interface DagActionStore {
this(dagAction, false, eventTimeMillis);
}
+ public LeaseParams(DagAction dagAction) {
+ this(dagAction, System.currentTimeMillis());
+ }
+
/**
* Replace flow execution id in dagAction with agreed upon event time to
easily track the flow
*/
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
index caa5e96ac..6184c5dee 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -26,14 +26,8 @@ import java.io.IOException;
* Consumption of the Dags happen through {@link DagTaskStream}.
*/
public interface DagManagement {
-
- /**
- * Used to add a dagAction event to DagManagement
- */
- void addDagAction(DagActionStore.DagAction dagAction) throws IOException;
-
/**
- * Used to add reminder dagActions to the queue that already contain an
eventTimestamp from the previous lease attempt
+ * Used to add {@link DagActionStore.LeaseParams} to the queue
*/
- void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams)
throws IOException;
+ void addDagAction(DagActionStore.LeaseParams leaseParams) throws IOException;
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index dc8967990..064e721dd 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -107,23 +107,10 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
this.dagManagementStateStore = dagManagementStateStore;
}
- @Override
- public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
- // TODO: Used to track missing dag issue, remove later as needed
- log.info("Add original (non-reminder) dagAction {}", dagAction);
-
- if (!this.leaseParamsQueue.offer(new DagActionStore.LeaseParams(dagAction,
false, System.currentTimeMillis()))) {
- throw new RuntimeException(String.format("Could not add dag action to
the queue %s", dagAction));
- }
- }
-
- @Override
- public synchronized void addReminderDagAction(DagActionStore.LeaseParams
reminderLeaseParams) {
- // TODO: Used to track missing dag issue, remove later as needed
- log.info("Add reminder dagAction {}", reminderLeaseParams);
-
- if (!this.leaseParamsQueue.offer(reminderLeaseParams)) {
- throw new RuntimeException(String.format("Could not add reminder dag
action to the queue %s", reminderLeaseParams));
+ public synchronized void addDagAction(DagActionStore.LeaseParams
leaseParams) {
+ log.info("Adding {} to queue...", leaseParams);
+ if (!this.leaseParamsQueue.offer(leaseParams)) {
+ throw new RuntimeException(String.format("Could not add %s to the
queue", leaseParams));
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index ce217ad5a..d0c68a3fd 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -140,6 +140,7 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
this.dagStateStore.cleanUp(dag);
// todo - updated failedDagStateStore iff cleanup returned 1
this.failedDagStateStore.writeCheckpoint(dag);
+ log.info("Marked dag failed {}", DagManagerUtils.generateDagId(dag));
}
@Override
@@ -155,6 +156,7 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
@Override
public void deleteDag(DagManager.DagId dagId) throws IOException {
this.dagStateStore.cleanUp(dagId.toString());
+ log.info("Deleted dag {}", dagId);
}
@Override
@@ -289,11 +291,14 @@ public class MostlyMySqlDagManagementStateStore
implements DagManagementStateSto
@Override
public void addJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException {
+ log.info("Adding Dag Action for flowGroup {}, flowName {}, flowExecutionId
{}, jobName {}, dagActionType {}",
+ flowGroup, flowName, flowExecutionId, jobName, dagActionType);
this.dagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId,
jobName, dagActionType);
}
@Override
public boolean deleteDagAction(DagActionStore.DagAction dagAction) throws
IOException {
+ log.info("Deleting Dag Action {}", dagAction);
return this.dagActionStore.deleteDagAction(dagAction);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 7aa3c42b9..1a6032e44 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -279,7 +279,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// because db laundering tells us that the currently worked on db event
is newer and will have its own reminders
if (leaseParams.isReminder()) {
if (leaseParams.getEventTimeMillis() < dbEventTimestamp.getTime()) {
- log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - A new
event trigger "
+ log.info("tryAcquireLease for {} - dbEventTimeMillis: {} - A new
event trigger "
+ "is being worked on, so this older reminder will be
dropped.", leaseParams,
dbEventTimestamp);
return new LeaseAttemptStatus.NoLongerLeasingStatus();
@@ -630,11 +630,11 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
status.getEventTimeMillis());
return false;
}
- if( numRowsUpdated == 1) {
+ if (numRowsUpdated == 1) {
log.info("Multi-active lease arbiter lease attempt: [{},
eventTimestamp: {}] - COMPLETED, no longer leasing"
+ " this event after this.", dagAction,
status.getEventTimeMillis());
return true;
- };
+ }
throw new IOException(String.format("Attempt to complete lease use:
[%s, eventTimestamp: %s] - updated more "
+ "rows than expected", dagAction,
status.getEventTimeMillis()));
}, true);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index b11bb4c1c..808b87858 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -117,8 +117,6 @@ public class DagProcUtils {
// blocks (by calling Future#get()) until the submission is completed.
dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
- sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
-
Future<?> addSpecFuture = producer.addSpec(jobSpec);
// todo - we should add future.get() instead of the complete future into
the JobExecutionPlan
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
@@ -131,6 +129,7 @@ public class DagProcUtils {
log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
dagManagementStateStore.addDagNodeState(dagNode, dagId);
+ sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer =
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " +
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " +
specExecutorUri;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index 0f2ead5d4..35e4dafad 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -112,7 +112,7 @@ public class DagManagementDagActionStoreChangeMonitor
extends DagActionStoreChan
case LAUNCH :
case REEVALUATE :
case RESUME:
- dagManagement.addDagAction(dagAction);
+ dagManagement.addDagAction(new
DagActionStore.LeaseParams(dagAction));
break;
default:
log.warn("Received unsupported dagAction {}. Expected to be a
RESUME, KILL, REEVALUATE or LAUNCH", dagAction.getDagActionType());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9b7e40695..d2800fbe3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -18,7 +18,9 @@
package org.apache.gobblin.service.monitoring;
import java.io.IOException;
+import java.sql.SQLIntegrityConstraintViolationException;
import java.time.Duration;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -104,7 +106,8 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
private final StateStore<org.apache.gobblin.configuration.State> stateStore;
private final ScheduledExecutorService scheduledExecutorService;
private static final Config RETRYER_FALLBACK_CONFIG =
ConfigFactory.parseMap(ImmutableMap.of(
- RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume
non-transient and give up
+ // keeping the retry timeout less until we configure retryer to retry
only the transient exceptions
+ RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(12L), // after 12 hours,
presume non-transient and give up
RETRY_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1L), // back-off to
once/minute
RETRY_TYPE, RetryType.EXPONENTIAL.name()));
private static final Config DEFAULTS =
ConfigFactory.parseMap(ImmutableMap.of(
@@ -120,6 +123,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
private final GaaSJobObservabilityEventProducer eventProducer;
private final DagManagementStateStore dagManagementStateStore;
private final boolean dagProcEngineEnabled;
+ private final List<Class<? extends Exception>> nonRetryableExceptions =
Collections.singletonList(SQLIntegrityConstraintViolationException.class);
public KafkaJobStatusMonitor(String topic, Config config, int numThreads,
JobIssueEventHandler jobIssueEventHandler,
GaaSJobObservabilityEventProducer observabilityEventProducer,
DagManagementStateStore dagManagementStateStore)
@@ -139,6 +143,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
: ConfigFactory.empty();
// log exceptions to expose errors we suffer under and/or guide
intervention when resolution not readily forthcoming
+ // todo - this retryer retries all the exceptions. we should make it retry
only really transient
this.persistJobStatusRetryer =
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
Optional.of(new RetryListener() {
@Override
@@ -230,7 +235,16 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
if (this.dagProcEngineEnabled &&
DagProcUtils.isJobLevelStatus(jobName)) {
if (updatedJobStatus.getRight() == NewState.FINISHED) {
// todo - retried/resumed jobs *may* not be handled here, we may
want to create their dag action elsewhere
- this.dagManagementStateStore.addJobDagAction(flowGroup,
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+ try {
+ this.dagManagementStateStore.addJobDagAction(flowGroup,
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+ } catch (Exception e) {
+ if (isExceptionInstanceOf(e, nonRetryableExceptions)) {
+ // todo - add metrics
+ log.warn("Duplicate REEVALUATE Dag Action is being created.
Ignoring... " + e.getMessage());
+ } else {
+ throw e;
+ }
+ }
} else if (updatedJobStatus.getRight() == NewState.RUNNING) {
DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore,
flowGroup, flowName, flowExecutionId, jobName);
}
@@ -321,7 +335,12 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
modifyStateIfRetryRequired(jobStatus);
- return ImmutablePair.of(jobStatus, newState(jobStatus, states));
+ NewState newState = newState(jobStatus, states);
+ String newStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ if (newState == NewState.FINISHED) {
+ log.info("Flow {}:{}:{}:{} reached a terminal state {}", flowGroup,
flowName, flowExecutionId, jobName, newStatus);
+ }
+ return ImmutablePair.of(jobStatus, newState);
} catch (Exception e) {
log.warn("Meet exception when adding jobStatus to state store at "
+ e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber(), e);
@@ -409,4 +428,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
protected abstract org.apache.gobblin.configuration.State
parseJobStatus(GobblinTrackingEvent event);
+ public static boolean isExceptionInstanceOf(Exception exception,
List<Class<? extends Exception>> typesList) {
+ return typesList.stream().anyMatch(e -> e.isInstance(exception));
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index 817e2cef3..623dd7ae1 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -36,6 +36,10 @@ import com.google.common.base.Joiner;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
public class DagActionReminderSchedulerTest {
String flowGroup = "fg";
@@ -55,10 +59,12 @@ public class DagActionReminderSchedulerTest {
DagActionReminderScheduler dagActionReminderScheduler;
@BeforeClass
- private void setup() throws SchedulerException {
+ private void setup() throws Exception {
StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
schedulerFactory.getScheduler();
- this.dagActionReminderScheduler = new
DagActionReminderScheduler(schedulerFactory);
+ DagManagement dagManagement = mock(DagManagement.class);
+ doNothing().when(dagManagement).addDagAction(any());
+ this.dagActionReminderScheduler = new
DagActionReminderScheduler(schedulerFactory, dagManagement);
}
@Test
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index a5167ca85..f4d06e607 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -93,9 +93,11 @@ public class DagManagementTaskStreamImplTest {
LeaseObtainedStatus to the taskStream to break its loop and return a
newly created dagTask
*/
DagActionStore.DagAction launchAction = new DagActionStore.DagAction("fg",
"fn", 12345L, "jn", DagActionStore.DagActionType.LAUNCH);
- dagManagementTaskStream.addDagAction(launchAction);
- dagManagementTaskStream.addDagAction(launchAction);
- dagManagementTaskStream.addDagAction(launchAction);
+ DagActionStore.LeaseParams
+ dagActionLeaseParams = new DagActionStore.LeaseParams(launchAction,
false, System.currentTimeMillis());
+ dagManagementTaskStream.addDagAction(dagActionLeaseParams);
+ dagManagementTaskStream.addDagAction(dagActionLeaseParams);
+ dagManagementTaskStream.addDagAction(dagActionLeaseParams);
when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
.tryAcquireLease(any(DagActionStore.LeaseParams.class), anyBoolean()))
.thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),