This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ac653fb75 [GOBBLIN-2074] add dag action store inside DagManagementStateStore (#3954)
ac653fb75 is described below
commit ac653fb75a3c78663b09fd1e947365f4324b46c8
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed May 29 14:45:30 2024 -0700
[GOBBLIN-2074] add dag action store inside DagManagementStateStore (#3954)
* remove unused fields/APIs
* add dag action store inside DMSS
* address review comment
---
.../runtime/DagActionStoreChangeMonitorTest.java | 13 ++--
.../runtime/KafkaAvroJobStatusMonitorTest.java | 11 ++-
.../orchestration/DagManagementStateStore.java | 80 ++++++++++++++++++++--
.../orchestration/DagManagementTaskStreamImpl.java | 16 ++---
.../modules/orchestration/FlowLaunchHandler.java | 12 ++--
.../MostlyMySqlDagManagementStateStore.java | 45 ++++++++----
.../modules/orchestration/proc/DagProcUtils.java | 13 ++--
.../orchestration/proc/ReevaluateDagProc.java | 6 +-
.../modules/orchestration/task/DagTask.java | 8 +--
.../task/EnforceFlowFinishDeadlineDagTask.java | 5 +-
.../task/EnforceJobStartDeadlineDagTask.java | 5 +-
.../modules/orchestration/task/KillDagTask.java | 5 +-
.../modules/orchestration/task/LaunchDagTask.java | 5 +-
.../orchestration/task/ReevaluateDagTask.java | 5 +-
.../modules/orchestration/task/ResumeDagTask.java | 5 +-
...lowExecutionResourceHandlerWithWarmStandby.java | 12 ++--
.../monitoring/DagActionStoreChangeMonitor.java | 13 ++--
.../DagActionStoreChangeMonitorFactory.java | 10 +--
.../DagManagementDagActionStoreChangeMonitor.java | 5 +-
...nagementDagActionStoreChangeMonitorFactory.java | 10 +--
.../monitoring/KafkaAvroJobStatusMonitor.java | 6 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 15 ++--
.../monitoring/KafkaJobStatusMonitorFactory.java | 14 ++--
.../DagManagementTaskStreamImplTest.java | 3 +-
.../orchestration/DagProcessingEngineTest.java | 21 +++---
.../MostlyMySqlDagManagementStateStoreTest.java | 3 +-
.../modules/orchestration/OrchestratorTest.java | 2 +-
.../proc/EnforceDeadlineDagProcsTest.java | 7 +-
.../orchestration/proc/KillDagProcTest.java | 8 +--
.../orchestration/proc/LaunchDagProcTest.java | 2 +-
.../orchestration/proc/ReevaluateDagProcTest.java | 13 ++--
.../orchestration/proc/ResumeDagProcTest.java | 3 +-
32 files changed, 231 insertions(+), 150 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 54bc6ace5..9031f0031 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -41,8 +41,8 @@ import
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
-import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
@@ -85,13 +85,13 @@ public class DagActionStoreChangeMonitorTest {
public MockDagActionStoreChangeMonitor(String topic, Config config, int
numThreads,
boolean isMultiActiveSchedulerEnabled) {
- this(topic, config, numThreads, isMultiActiveSchedulerEnabled,
mock(DagActionStore.class), mock(DagManager.class), mock(FlowCatalog.class),
mock(Orchestrator.class));
+ this(topic, config, numThreads, isMultiActiveSchedulerEnabled,
mock(DagManagementStateStore.class), mock(DagManager.class),
mock(FlowCatalog.class), mock(Orchestrator.class));
}
public MockDagActionStoreChangeMonitor(String topic, Config config, int
numThreads, boolean isMultiActiveSchedulerEnabled,
- DagActionStore dagActionStore, DagManager dagManager, FlowCatalog
flowCatalog, Orchestrator orchestrator) {
+ DagManagementStateStore dagManagementStateStore, DagManager
dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) {
super(topic, config, dagManager, numThreads, flowCatalog, orchestrator,
- dagActionStore, isMultiActiveSchedulerEnabled);
+ dagManagementStateStore, isMultiActiveSchedulerEnabled);
}
protected void processMessageForTest(DecodeableKafkaRecord record) {
@@ -238,8 +238,7 @@ public class DagActionStoreChangeMonitorTest {
String jobName = "testJobName";
String flowExecutionId = "12345677";
- MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
- mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
+ DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
Config monitorConfig =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
@@ -251,7 +250,7 @@ public class DagActionStoreChangeMonitorTest {
// Throw an uncaught exception during startup sequence
when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new
RuntimeException("Uncaught exception"));
mockDagActionStoreChangeMonitor = new
MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5,
- true, mysqlDagActionStore, mockDagManager, mockFlowCatalog,
mockOrchestrator);
+ true, dagManagementStateStore, mockDagManager, mockFlowCatalog,
mockOrchestrator);
try {
mockDagActionStoreChangeMonitor.setActive();
} catch (Exception e) {
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 1d1520d5d..337e6e33e 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -51,7 +51,6 @@ import com.typesafe.config.ConfigValueFactory;
import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;
-
import lombok.Getter;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -75,8 +74,7 @@ import
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
@@ -103,7 +101,7 @@ public class KafkaAvroJobStatusMonitorTest {
private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
private MetricContext context;
private KafkaAvroEventKeyValueReporter.Builder<?> builder;
- private MysqlDagActionStore mysqlDagActionStore;
+ private DagManagementStateStore dagManagementStateStore;
private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
@BeforeClass
@@ -123,9 +121,8 @@ public class KafkaAvroJobStatusMonitorTest {
builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
builder =
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
- this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
+ this.dagManagementStateStore = mock(DagManagementStateStore.class);
this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class));
- this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
}
@Test
@@ -787,7 +784,7 @@ public class KafkaAvroJobStatusMonitorTest {
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads,
AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle,
GaaSJobObservabilityEventProducer producer)
throws IOException, ReflectiveOperationException {
- super(topic, config, numThreads, mock(JobIssueEventHandler.class),
producer, mysqlDagActionStore);
+ super(topic, config, numThreads, mock(JobIssueEventHandler.class),
producer, dagManagementStateStore);
shouldThrowFakeExceptionInParseJobStatus =
shouldThrowFakeExceptionInParseJobStatusToggle;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index b56459da7..6a6140d80 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.net.URI;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@@ -146,12 +147,6 @@ public interface DagManagementStateStore {
*/
List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId)
throws IOException;
- /**
- * Returns the {@link Dag} the provided {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to
- * or Optional.absent if it is not found.
- */
- Optional<Dag<JobExecutionPlan>> getParentDag(Dag.DagNode<JobExecutionPlan>
dagNode);
-
/**
* Deletes the dag node state that was added through {@link
DagManagementStateStore#addDagNodeState(Dag.DagNode, DagManager.DagId)}
* No-op if the dag node is not found in the store.
@@ -201,4 +196,77 @@ public interface DagManagementStateStore {
* has any running job, false otherwise.
*/
public boolean hasRunningJobs(DagManager.DagId dagId);
+
+ /**
+ * Check if an action exists in dagAction store by flow group, flow name,
flow execution id, and job name.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @param jobName job name for the dag action
+ * @param dagActionType the value of the dag action
+ * @throws IOException
+ */
+ boolean existsJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName,
+ DagActionStore.DagActionType dagActionType) throws IOException,
SQLException;
+
+ /**
+ * Check if an action exists in dagAction store by flow group, flow name,
and flow execution id, it assumes jobName is
+ * empty ("").
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @param dagActionType the value of the dag action
+ * @throws IOException
+ */
+ boolean existsFlowDagAction(String flowGroup, String flowName, String
flowExecutionId,
+ DagActionStore.DagActionType dagActionType) throws IOException,
SQLException;
+
+ /** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore}
for durability */
+ default void addDagAction(DagActionStore.DagAction dagAction) throws
IOException {
+ addJobDagAction(
+ dagAction.getFlowGroup(),
+ dagAction.getFlowName(),
+ dagAction.getFlowExecutionId(),
+ dagAction.getJobName(),
+ dagAction.getDagActionType());
+ }
+
+ /**
+ * Persist the dag action in {@link DagActionStore} for durability
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @param jobName job name for the dag action
+ * @param dagActionType the value of the dag action
+ * @throws IOException
+ */
+ void addJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName,
+ DagActionStore.DagActionType dagActionType) throws IOException;
+
+ /**
+ * Persist the dag action in {@link DagActionStore} for durability. This
method assumes an empty jobName.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @param dagActionType the value of the dag action
+ * @throws IOException
+ */
+ default void addFlowDagAction(String flowGroup, String flowName, String
flowExecutionId,
+ DagActionStore.DagActionType dagActionType) throws IOException {
+ addDagAction(DagActionStore.DagAction.forFlow(flowGroup, flowName,
flowExecutionId, dagActionType));
+ }
+
+ /**
+ * delete the dag action from {@link DagActionStore}
+ * @param dagAction containing all information needed to identify dag and
specific action value
+ * @throws IOException
+ * @return true if we successfully delete one record, return false if the
record does not exist
+ */
+ boolean deleteDagAction(DagActionStore.DagAction dagAction) throws
IOException;
+
+ /***
+ * Get all {@link DagActionStore.DagAction}s from the {@link DagActionStore}.
+ * @throws IOException Exception in retrieving {@link
DagActionStore.DagAction}s.
+ */
+ Collection<DagActionStore.DagAction> getDagActions() throws IOException;
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 1d32cde39..0d4a229d0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -76,9 +76,6 @@ import org.apache.gobblin.util.ConfigUtils;
public class DagManagementTaskStreamImpl implements DagManagement,
DagTaskStream {
private final Config config;
@Getter private final EventSubmitter eventSubmitter;
-
- @Inject(optional=true)
- protected Optional<DagActionStore> dagActionStore;
protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
private final boolean isMultiActiveExecutionEnabled;
@@ -102,7 +99,6 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
throw new RuntimeException(String.format("DagProcessingEngine requires
%s to be instantiated.",
DagActionReminderScheduler.class.getSimpleName()));
}
- this.dagActionStore = dagActionStore;
this.dagActionProcessingLeaseArbiter = dagActionProcessingLeaseArbiter;
this.dagActionReminderScheduler = dagActionReminderScheduler;
this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
@@ -214,17 +210,17 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
switch (dagActionType) {
case ENFORCE_FLOW_FINISH_DEADLINE:
- return new EnforceFlowFinishDeadlineDagTask(dagAction,
leaseObtainedStatus, dagActionStore.get());
+ return new EnforceFlowFinishDeadlineDagTask(dagAction,
leaseObtainedStatus, dagManagementStateStore);
case ENFORCE_JOB_START_DEADLINE:
- return new EnforceJobStartDeadlineDagTask(dagAction,
leaseObtainedStatus, dagActionStore.get());
+ return new EnforceJobStartDeadlineDagTask(dagAction,
leaseObtainedStatus, dagManagementStateStore);
case KILL:
- return new KillDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
+ return new KillDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
case LAUNCH:
- return new LaunchDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
+ return new LaunchDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
case REEVALUATE:
- return new ReevaluateDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
+ return new ReevaluateDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
case RESUME:
- return new ResumeDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
+ return new ResumeDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
default:
throw new UnsupportedOperationException(dagActionType + " not yet
implemented");
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index bb4eda416..864fae944 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -69,7 +69,7 @@ import org.apache.gobblin.util.ConfigUtils;
@Slf4j
public class FlowLaunchHandler {
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
- private DagActionStore dagActionStore;
+ private DagManagementStateStore dagManagementStateStore;
private final MetricContext metricContext;
private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
@@ -81,13 +81,13 @@ public class FlowLaunchHandler {
@Inject
public FlowLaunchHandler(Config config,
@Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME)
MultiActiveLeaseArbiter leaseArbiter,
- SchedulerService schedulerService,
com.google.common.base.Optional<DagActionStore> optDagActionStore) {
+ SchedulerService schedulerService,
com.google.common.base.Optional<DagManagementStateStore>
dagManagementStateStoreOpt) {
this.multiActiveLeaseArbiter = leaseArbiter;
- if (!optDagActionStore.isPresent()) {
+ if (!dagManagementStateStoreOpt.isPresent()) {
throw new RuntimeException("DagActionStore MUST be present for flow
launch handling!");
}
- this.dagActionStore = optDagActionStore.get();
+ this.dagManagementStateStore = dagManagementStateStoreOpt.get();
this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
@@ -140,8 +140,8 @@ public class FlowLaunchHandler {
private boolean
persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
DagActionStore.DagAction launchDagAction =
leaseStatus.getConsensusDagAction();
try {
- this.dagActionStore.addDagAction(launchDagAction);
- DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(launchDagAction);
+ this.dagManagementStateStore.addDagAction(launchDagAction);
+
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
launchDagAction);
this.numFlowsSubmitted.mark();
// after successfully persisting, close the lease
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 7f9498764..da50c5055 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.net.URI;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
@@ -63,11 +64,9 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@Slf4j
@Singleton
public class MostlyMySqlDagManagementStateStore implements
DagManagementStateStore {
- private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>>
jobToDag = new ConcurrentHashMap<>();
private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new
ConcurrentHashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
private final Map<DagManager.DagId,
LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new
ConcurrentHashMap<>();
- private final Map<DagManager.DagId, Long> dagToDeadline = new
ConcurrentHashMap<>();
private DagStateStore dagStateStore;
private DagStateStore failedDagStateStore;
private JobStatusRetriever jobStatusRetriever;
@@ -80,19 +79,21 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
FlowCatalog flowCatalog;
@Getter
private final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+ private final DagActionStore dagActionStore;
@Inject
public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog
flowCatalog, UserQuotaManager userQuotaManager,
- JobStatusRetriever jobStatusRetriever) {
+ JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) {
this.quotaManager = userQuotaManager;
this.config = config;
this.flowCatalog = flowCatalog;
this.jobStatusRetriever = jobStatusRetriever;
this.dagManagerMetrics.activate();
+ this.dagActionStore = dagActionStore;
}
// It should be called after topology spec map is set
- private synchronized void start() throws IOException {
+ private synchronized void start() {
if (!dagStoresInitialized) {
this.dagStateStore = createDagStateStore(config, topologySpecMap);
this.failedDagStateStore =
createDagStateStore(ConfigUtils.getConfigOrEmpty(config,
FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
@@ -180,9 +181,7 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
@Override
// todo - updating different maps here and in addDagNodeState can result in
inconsistency between the maps
public synchronized void deleteDagNodeState(DagManager.DagId dagId,
Dag.DagNode<JobExecutionPlan> dagNode) {
- this.jobToDag.remove(dagNode);
this.dagNodes.remove(dagNode.getValue().getId());
- this.dagToDeadline.remove(dagId);
if (this.dagToJobs.containsKey(dagId)) {
this.dagToJobs.get(dagId).remove(dagNode);
if (this.dagToJobs.get(dagId).isEmpty()) {
@@ -199,7 +198,6 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
if (!dag.isPresent()) {
throw new RuntimeException("Dag " + dagId + " not found");
}
- this.jobToDag.put(dagNode, dag.get());
this.dagNodes.put(dagNode.getValue().getId(), dagNode);
if (!this.dagToJobs.containsKey(dagId)) {
this.dagToJobs.put(dagId, Lists.newLinkedList());
@@ -227,11 +225,6 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
}
}
- @Override
- public Optional<Dag<JobExecutionPlan>>
getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) {
- return Optional.of(this.jobToDag.get(dagNode));
- }
-
@Override
public List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId
dagId) {
List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
@@ -275,4 +268,32 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
public boolean hasRunningJobs(DagManager.DagId dagId) {
return !getDagNodes(dagId).isEmpty();
}
+
+ @Override
+ public boolean existsJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName,
+ DagActionStore.DagActionType dagActionType) throws IOException,
SQLException {
+ return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId,
jobName, dagActionType);
+ }
+
+ @Override
+ public boolean existsFlowDagAction(String flowGroup, String flowName, String
flowExecutionId,
+ DagActionStore.DagActionType dagActionType) throws IOException,
SQLException {
+ return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId,
dagActionType);
+ }
+
+ @Override
+ public void addJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName,
+ DagActionStore.DagActionType dagActionType) throws IOException {
+ this.dagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId,
jobName, dagActionType);
+ }
+
+ @Override
+ public boolean deleteDagAction(DagActionStore.DagAction dagAction) throws
IOException {
+ return this.dagActionStore.deleteDagAction(dagAction);
+ }
+
+ @Override
+ public Collection<DagActionStore.DagAction> getDagActions() throws
IOException {
+ return this.dagActionStore.getDagActions();
+ }
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 5396b0502..41615c79b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -37,7 +37,6 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
@@ -55,8 +54,6 @@ import static
org.apache.gobblin.service.ExecutionStatus.CANCELLED;
*/
@Slf4j
public class DagProcUtils {
- private static final DagActionStore dagActionStore =
GobblinServiceManager.getClass(DagActionStore.class);
-
/**
* - submits a {@link JobSpec} to a {@link SpecExecutor}
* - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}
@@ -94,7 +91,7 @@ public class DagProcUtils {
// blocks (by calling Future#get()) until the submission is completed.
dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
- sendEnforceJobStartDeadlineDagAction(dagNode);
+ sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
Future<?> addSpecFuture = producer.addSpec(jobSpec);
// todo - we should add future.get() instead of the complete future into
the JobExecutionPlan
@@ -167,16 +164,16 @@ public class DagProcUtils {
jobExecutionPlan.setExecutionStatus(CANCELLED);
}
- private static void
sendEnforceJobStartDeadlineDagAction(Dag.DagNode<JobExecutionPlan> dagNode)
+ private static void
sendEnforceJobStartDeadlineDagAction(DagManagementStateStore
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
- dagActionStore.addJobDagAction(dagNode.getValue().getFlowGroup(),
dagNode.getValue().getFlowName(),
+ dagManagementStateStore.addJobDagAction(dagNode.getValue().getFlowGroup(),
dagNode.getValue().getFlowName(),
String.valueOf(dagNode.getValue().getFlowExecutionId()),
dagNode.getValue().getJobName(),
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
}
- public static void
sendEnforceFlowFinishDeadlineDagAction(DagActionStore.DagAction launchDagAction)
+ public static void
sendEnforceFlowFinishDeadlineDagAction(DagManagementStateStore
dagManagementStateStore, DagActionStore.DagAction launchDagAction)
throws IOException {
- dagActionStore.addFlowDagAction(launchDagAction.getFlowGroup(),
launchDagAction.getFlowName(),
+ dagManagementStateStore.addFlowDagAction(launchDagAction.getFlowGroup(),
launchDagAction.getFlowName(),
launchDagAction.getFlowExecutionId(),
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 10529b25e..bc786b91f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -119,7 +119,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
dagManagementStateStore.markDagFailed(dag);
}
- removeFlowFinishDeadlineTriggerAndDagAction();
+ removeFlowFinishDeadlineTriggerAndDagAction(dagManagementStateStore);
}
}
@@ -197,7 +197,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
throw new UnsupportedOperationException("More than one start job is not
allowed");
}
- private void removeFlowFinishDeadlineTriggerAndDagAction() {
+ private void
removeFlowFinishDeadlineTriggerAndDagAction(DagManagementStateStore
dagManagementStateStore) {
DagActionStore.DagAction enforceFlowFinishDeadlineDagAction =
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
getDagNodeId().getFlowName(),
String.valueOf(getDagNodeId().getFlowExecutionId()),
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
@@ -206,7 +206,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
try {
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(getDagTask().getDagAction());
-
GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceFlowFinishDeadlineDagAction);
+
dagManagementStateStore.deleteDagAction(enforceFlowFinishDeadlineDagAction);
} catch (SchedulerException | IOException e) {
log.warn("Failed to unschedule the reminder for {}",
enforceFlowFinishDeadlineDagAction);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index f031134d1..b01860a9e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -41,13 +41,13 @@ import
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
public abstract class DagTask {
@Getter public final DagActionStore.DagAction dagAction;
private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
- private final DagActionStore dagActionStore;
+ private final DagManagementStateStore dagManagementStateStore;
public DagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagActionStore dagActionStore) {
+ DagManagementStateStore dagManagementStateStore) {
this.dagAction = dagAction;
this.leaseObtainedStatus = leaseObtainedStatus;
- this.dagActionStore = dagActionStore;
+ this.dagManagementStateStore = dagManagementStateStore;
}
public abstract <T> T host(DagTaskVisitor<T> visitor);
@@ -59,7 +59,7 @@ public abstract class DagTask {
*/
public final boolean conclude() {
try {
- this.dagActionStore.deleteDagAction(this.dagAction);
+ this.dagManagementStateStore.deleteDagAction(this.dagAction);
return this.leaseObtainedStatus.completeLease();
} catch (IOException e) {
// TODO: Decide appropriate exception to throw and add to the commit
method's signature
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
index 5eaf4ffa7..bfd709fb9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration.task;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
@@ -29,8 +30,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
public class EnforceFlowFinishDeadlineDagTask extends DagTask {
public EnforceFlowFinishDeadlineDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagActionStore dagActionStore) {
- super(dagAction, leaseObtainedStatus, dagActionStore);
+ DagManagementStateStore dagManagementStateStore) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
index 7ad3867a5..c1c270eb7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration.task;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
@@ -29,8 +30,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
public class EnforceJobStartDeadlineDagTask extends DagTask {
public EnforceJobStartDeadlineDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagActionStore dagActionStore) {
- super(dagAction, leaseObtainedStatus, dagActionStore);
+ DagManagementStateStore dagManagementStateStore) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
index 9533a0e69..3d7315378 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration.task;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
@@ -28,8 +29,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
public class KillDagTask extends DagTask {
public KillDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagActionStore dagActionStore) {
- super(dagAction, leaseObtainedStatus, dagActionStore);
+ DagManagementStateStore dagManagementStateStore) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index 88a620dda..8a726b2dc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration.task;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
@@ -28,8 +29,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
public class LaunchDagTask extends DagTask {
public LaunchDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagActionStore dagActionStore) {
- super(dagAction, leaseObtainedStatus, dagActionStore);
+ DagManagementStateStore dagManagementStateStore) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
index 7a90c9912..ea1b36e34 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration.task;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
@@ -28,8 +29,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
public class ReevaluateDagTask extends DagTask {
public ReevaluateDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagActionStore dagActionStore) {
- super(dagAction, leaseObtainedStatus, dagActionStore);
+ DagManagementStateStore dagManagementStateStore) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
index b9ff4fb8a..4e5ed2657 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration.task;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
@@ -27,8 +28,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
*/
public class ResumeDagTask extends DagTask {
public ResumeDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagActionStore dagActionStore) {
- super(dagAction, leaseObtainedStatus, dagActionStore);
+ DagManagementStateStore dagManagementStateStore) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index caee99e59..1b5e21083 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -40,16 +40,18 @@ import
org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.service.FlowStatusId;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+
@Slf4j
public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends
GobblinServiceFlowExecutionResourceHandler{
- private DagActionStore dagActionStore;
+ private DagManagementStateStore dagManagementStateStore;
@Inject
public
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby(FlowExecutionResourceLocalHandler
handler,
@Named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME) EventBus eventBus,
- Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER)
boolean forceLeader, DagActionStore dagActionStore) {
+ Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER)
boolean forceLeader, DagManagementStateStore dagManagementStateStore) {
super(handler, eventBus, manager, forceLeader);
- this.dagActionStore = dagActionStore;
+ this.dagManagementStateStore = dagManagementStateStore;
}
@Override
@@ -69,12 +71,12 @@ public class
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
protected void addDagAction(String flowGroup, String flowName, Long
flowExecutionId, DagActionStore.DagActionType actionType) {
try {
// If an existing resume request is still pending then do not accept
this request
- if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), actionType)) {
+ if (this.dagManagementStateStore.existsFlowDagAction(flowGroup,
flowName, flowExecutionId.toString(), actionType)) {
this.throwErrorResponse("There is already a pending " + actionType + "
action for this flow. Please wait to resubmit and wait "
+ "for action to be completed.", HttpStatus.S_409_CONFLICT);
return;
}
- this.dagActionStore.addFlowDagAction(flowGroup, flowName,
flowExecutionId.toString(), actionType);
+ this.dagManagementStateStore.addFlowDagAction(flowGroup, flowName,
flowExecutionId.toString(), actionType);
} catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add %s action for flow %s %s %s to dag
action store due to:", actionType, flowGroup,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 814236eeb..7f566a56b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -48,6 +48,7 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
@@ -98,14 +99,14 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
@Getter
@VisibleForTesting
protected FlowCatalog flowCatalog;
- protected DagActionStore dagActionStore;
+ protected DagManagementStateStore dagManagementStateStore;
@Getter
private volatile boolean isActive;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
public DagActionStoreChangeMonitor(String topic, Config config, DagManager
dagManager, int numThreads,
- FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore
dagActionStore,
+ FlowCatalog flowCatalog, Orchestrator orchestrator,
DagManagementStateStore dagManagementStateStore,
boolean isMultiActiveSchedulerEnabled) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
@@ -114,7 +115,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
- this.dagActionStore = dagActionStore;
+ this.dagManagementStateStore = dagManagementStateStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
/*
@@ -135,9 +136,9 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
* Load all actions from the DagActionStore to process any missed actions
during service startup
*/
protected void initializeMonitor() {
- Collection<DagActionStore.DagAction> dagActions = null;
+ Collection<DagActionStore.DagAction> dagActions;
try {
- dagActions = dagActionStore.getDagActions();
+ dagActions = dagManagementStateStore.getDagActions();
} catch (IOException e) {
throw new RuntimeException(String.format("Unable to retrieve dagActions
from the dagActionStore while "
+ "initializing the %s",
DagActionStoreChangeMonitor.class.getCanonicalName()), e);
@@ -326,7 +327,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
} finally {
// Delete the dag action regardless of whether it was processed
successfully to avoid accumulating failure cases
try {
- this.dagActionStore.deleteDagAction(dagAction);
+ this.dagManagementStateStore.deleteDagAction(dagAction);
} catch (IOException e) {
log.warn("Failed to delete dag action from dagActionStore. dagAction:
{}", dagAction);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 767f2b76d..5bac15089 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -28,7 +28,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.util.ConfigUtils;
@@ -45,18 +45,18 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
private DagManager dagManager;
private FlowCatalog flowCatalog;
private Orchestrator orchestrator;
- private DagActionStore dagActionStore;
+ private DagManagementStateStore dagManagementStateStore;
private boolean isMultiActiveSchedulerEnabled;
@Inject
public DagActionStoreChangeMonitorFactory(Config config, DagManager
dagManager, FlowCatalog flowCatalog,
- Orchestrator orchestrator, DagActionStore dagActionStore,
+ Orchestrator orchestrator, DagManagementStateStore
dagManagementStateStore,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled) {
this.config = Objects.requireNonNull(config);
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
- this.dagActionStore = dagActionStore;
+ this.dagManagementStateStore = dagManagementStateStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
}
@@ -68,7 +68,7 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig,
this.dagManager, numThreads, flowCatalog,
- orchestrator, dagActionStore, isMultiActiveSchedulerEnabled);
+ orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index eacecab94..ab863974c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
@@ -40,11 +41,11 @@ public class DagManagementDagActionStoreChangeMonitor
extends DagActionStoreChan
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
public DagManagementDagActionStoreChangeMonitor(Config config, int
numThreads,
- FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore
dagActionStore,
+ FlowCatalog flowCatalog, Orchestrator orchestrator,
DagManagementStateStore dagManagementStateStore,
boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
// DagManager is only needed in the `handleDagAction` method of its parent
class and not needed in this class,
// so we are passing a null value for DagManager to its parent class.
- super("", config, null, numThreads, flowCatalog, orchestrator,
dagActionStore, isMultiActiveSchedulerEnabled);
+ super("", config, null, numThreads, flowCatalog, orchestrator,
dagManagementStateStore, isMultiActiveSchedulerEnabled);
this.dagManagement = dagManagement;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index 53b1f6836..c5828575d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -28,8 +28,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.util.ConfigUtils;
@@ -45,18 +45,18 @@ public class
DagManagementDagActionStoreChangeMonitorFactory implements Provider
private final Config config;
private final FlowCatalog flowCatalog;
private final Orchestrator orchestrator;
- private final DagActionStore dagActionStore;
+ private final DagManagementStateStore dagManagementStateStore;
private final boolean isMultiActiveSchedulerEnabled;
private final DagManagement dagManagement;
@Inject
public DagManagementDagActionStoreChangeMonitorFactory(Config config,
DagManager dagManager, FlowCatalog flowCatalog,
- Orchestrator orchestrator, DagActionStore dagActionStore, DagManagement
dagManagement,
+ Orchestrator orchestrator, DagManagementStateStore
dagManagementStateStore, DagManagement dagManagement,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled) {
this.config = Objects.requireNonNull(config);
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
- this.dagActionStore = dagActionStore;
+ this.dagManagementStateStore = dagManagementStateStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
this.dagManagement = dagManagement;
}
@@ -68,7 +68,7 @@ public class DagManagementDagActionStoreChangeMonitorFactory
implements Provider
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
return new
DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig,
- numThreads, flowCatalog, orchestrator, dagActionStore,
isMultiActiveSchedulerEnabled, this.dagManagement);
+ numThreads, flowCatalog, orchestrator, dagManagementStateStore,
isMultiActiveSchedulerEnabled, this.dagManagement);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index eb405ba60..74a864da3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -46,7 +46,7 @@ import
org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.util.ConfigUtils;
@@ -66,9 +66,9 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
private Meter messageParseFailures;
public KafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads, JobIssueEventHandler jobIssueEventHandler,
- GaaSJobObservabilityEventProducer observabilityEventProducer,
DagActionStore dagActionStore)
+ GaaSJobObservabilityEventProducer observabilityEventProducer,
DagManagementStateStore dagManagementStateStore)
throws IOException, ReflectiveOperationException {
- super(topic, config, numThreads, jobIssueEventHandler,
observabilityEventProducer, dagActionStore);
+ super(topic, config, numThreads, jobIssueEventHandler,
observabilityEventProducer, dagManagementStateStore);
if (ConfigUtils.getBoolean(config,
ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new
KafkaAvroSchemaRegistryFactory().
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9d707ffdc..d6ff2b651 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -67,6 +67,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.retry.RetryerFactory;
@@ -119,11 +120,11 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
private final JobIssueEventHandler jobIssueEventHandler;
private final Retryer<Void> persistJobStatusRetryer;
private final GaaSJobObservabilityEventProducer eventProducer;
- private final DagActionStore dagActionStore;
+ private final DagManagementStateStore dagManagementStateStore;
private final boolean dagProcEngineEnabled;
public KafkaJobStatusMonitor(String topic, Config config, int numThreads,
JobIssueEventHandler jobIssueEventHandler,
- GaaSJobObservabilityEventProducer observabilityEventProducer,
DagActionStore dagActionStore)
+ GaaSJobObservabilityEventProducer observabilityEventProducer,
DagManagementStateStore dagManagementStateStore)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
String stateStoreFactoryClass = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY,
FileContextBasedFsStateStoreFactory.class.getName());
@@ -133,7 +134,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.jobIssueEventHandler = jobIssueEventHandler;
- this.dagActionStore = dagActionStore;
+ this.dagManagementStateStore = dagManagementStateStore;
this.dagProcEngineEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
Config retryerOverridesConfig =
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
@@ -227,10 +228,10 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
this.eventProducer.emitObservabilityEvent(jobStatus);
if (this.dagProcEngineEnabled) {
// todo - retried/resumed jobs *may* not be handled here, we may
want to create their dag action elsewhere
- this.dagActionStore.addJobDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+ this.dagManagementStateStore.addJobDagAction(flowGroup,
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
}
} else if (updatedJobStatus.getRight() == NewState.RUNNING) {
- removeStartDeadlineTriggerAndDagAction(flowGroup, flowName,
flowExecutionId, jobName);
+ removeStartDeadlineTriggerAndDagAction(dagManagementStateStore,
flowGroup, flowName, flowExecutionId, jobName);
}
// update the state store after adding a dag action to guaranty
at-least-once adding of dag action
@@ -256,7 +257,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
- private void removeStartDeadlineTriggerAndDagAction(String flowGroup, String
flowName, String flowExecutionId, String jobName) {
+ private void removeStartDeadlineTriggerAndDagAction(DagManagementStateStore
dagManagementStateStore, String flowGroup, String flowName, String
flowExecutionId, String jobName) {
DagActionStore.DagAction enforceStartDeadlineDagAction = new
DagActionStore.DagAction(flowGroup, flowName,
String.valueOf(flowExecutionId), jobName,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
log.info("Deleting reminder trigger and dag action {}",
enforceStartDeadlineDagAction);
@@ -264,7 +265,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
try {
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceStartDeadlineDagAction);
- GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceStartDeadlineDagAction);
+ dagManagementStateStore.deleteDagAction(enforceStartDeadlineDagAction);
} catch (SchedulerException | IOException e) {
log.error("Failed to unschedule the reminder for {}",
enforceStartDeadlineDagAction);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index a1698e57a..756e04f73 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -35,7 +35,7 @@ import
org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.runtime.util.InjectionNames;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -52,22 +52,22 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
private final JobIssueEventHandler jobIssueEventHandler;
private final MultiContextIssueRepository issueRepository;
private final boolean instrumentationEnabled;
- private final DagActionStore dagActionStore;
+ private final DagManagementStateStore dagManagementStateStore;
private final boolean dagProcEngineEnabled;
@Inject
public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
- GobblinInstanceEnvironment env, DagActionStore dagActionStore,
@Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean dagProcEngineEnabled) {
- this(config, jobIssueEventHandler, issueRepository,
env.isInstrumentationEnabled(), dagActionStore, dagProcEngineEnabled);
+ GobblinInstanceEnvironment env, DagManagementStateStore
dagManagementStateStore, @Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean
dagProcEngineEnabled) {
+ this(config, jobIssueEventHandler, issueRepository,
env.isInstrumentationEnabled(), dagManagementStateStore, dagProcEngineEnabled);
}
public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
- boolean instrumentationEnabled, DagActionStore dagActionStore, boolean
dagProcEngineEnabled) {
+ boolean instrumentationEnabled, DagManagementStateStore
dagManagementStateStore, boolean dagProcEngineEnabled) {
this.config = Objects.requireNonNull(config);
this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
this.issueRepository = issueRepository;
this.instrumentationEnabled = instrumentationEnabled;
- this.dagActionStore = dagActionStore;
+ this.dagManagementStateStore = dagManagementStateStore;
this.dagProcEngineEnabled = dagProcEngineEnabled;
}
@@ -103,7 +103,7 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
observabilityEventProducerClassName,
ConfigUtils.configToState(config), this.issueRepository,
this.instrumentationEnabled);
return (KafkaJobStatusMonitor) GobblinConstructorUtils
- .invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
dagActionStore);
+ .invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
dagManagementStateStore);
}
@Override
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 070d6a85b..8d4ff795a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -74,7 +74,8 @@ public class DagManagementTaskStreamImplTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null, null);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config,
+ null, null, null, mock(DagActionStore.class));
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index d1f1e736d..92345122b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -58,16 +58,15 @@ public class DagProcessingEngineTest {
private DagManagementTaskStreamImpl dagManagementTaskStream;
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
- private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+ private static MostlyMySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testMetastoreDatabase;
- static DagActionStore dagActionStore;
static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
@BeforeClass
public void setUp() throws Exception {
// Setting up mock DB
testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
- dagActionStore = mock(DagActionStore.class);
+ DagActionStore dagActionStore = mock(DagActionStore.class);
doReturn(true).when(dagActionStore).deleteDagAction(any());
leaseObtainedStatus = mock(LeaseAttemptStatus.LeaseObtainedStatus.class);
doReturn(true).when(leaseObtainedStatus).completeLease();
@@ -87,16 +86,18 @@ public class DagProcessingEngineTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- this.dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null, null);
- this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
- this.dagManagementTaskStream =
+ dagManagementStateStore = new MostlyMySqlDagManagementStateStore(config,
null,
+ null, null, dagActionStore);
+ dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+ doReturn(true).when(dagActionStore).deleteDagAction(any());
+ dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)), false,
- this.dagManagementStateStore);
+ dagManagementStateStore);
this.dagProcFactory = new DagProcFactory(null);
DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
- new
DagProcessingEngine.DagProcEngineThread(this.dagManagementTaskStream,
this.dagProcFactory,
+ new DagProcessingEngine.DagProcEngineThread(dagManagementTaskStream,
this.dagProcFactory,
dagManagementStateStore, 0);
this.dagTaskStream = spy(new MockedDagTaskStream());
DagProcessingEngine dagProcessingEngine =
@@ -139,7 +140,7 @@ public class DagProcessingEngineTest {
private final boolean isBad;
public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
- super(dagAction, leaseObtainedStatus, dagActionStore);
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore);
this.isBad = isBad;
}
@@ -191,6 +192,6 @@ public class DagProcessingEngineTest {
+ "Actual number of invocations " +
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
log, 1, 1000L);
-
Assert.assertEquals(this.dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
expectedExceptions);
+
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
expectedExceptions);
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 6d6f8b431..970309cef 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -99,7 +99,6 @@ public class MostlyMySqlDagManagementStateStoreTest {
Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
Assert.assertEquals(dag.toString(),
this.dagManagementStateStore.getDag(dagId).get().toString());
Assert.assertEquals(dagNode,
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
- Assert.assertEquals(dag.toString(),
this.dagManagementStateStore.getParentDag(dagNode).get().toString());
List<Dag.DagNode<JobExecutionPlan>> dagNodes =
this.dagManagementStateStore.getDagNodes(dagId);
Assert.assertEquals(2, dagNodes.size());
@@ -138,7 +137,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
MostlyMySqlDagManagementStateStore dagManagementStateStore =
- new MostlyMySqlDagManagementStateStore(config, null, null,
jobStatusRetriever);
+ new MostlyMySqlDagManagementStateStore(config, null, null,
jobStatusRetriever, mock(DagActionStore.class));
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
return dagManagementStateStore;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 87ce00368..86d6ff104 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -125,7 +125,7 @@ public class OrchestratorTest {
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE).build();
MostlyMySqlDagManagementStateStore dagManagementStateStore =
- new MostlyMySqlDagManagementStateStore(config, null, null, null);
+ new MostlyMySqlDagManagementStateStore(config, null, null, null,
mock(DagActionStore.class));
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 4f8f7b365..52d80469f 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -50,7 +50,6 @@ import org.apache.gobblin.service.monitoring.JobStatus;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -92,12 +91,11 @@ public class EnforceDeadlineDagProcsTest {
message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
- this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
- "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, mock(DagActionStore.class)));
+ "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, dagManagementStateStore));
enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
int expectedNumOfDeleteDagNodeStates = 1; // the one dag node
corresponding to the EnforceStartDeadlineDagProc
@@ -129,12 +127,11 @@ public class EnforceDeadlineDagProcsTest {
message("Test
message").eventName(ExecutionStatus.RUNNING.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
- this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
is in running state
EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new
EnforceFlowFinishDeadlineDagProc(
new EnforceFlowFinishDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
- "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, mock(DagActionStore.class)));
+ "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, dagManagementStateStore));
enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
Assert.assertEquals(numOfDagNodes,
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index c283b49d5..b7c86eebe 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -94,7 +94,7 @@ public class KillDagProcTest {
LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "flow1",
String.valueOf(flowExecutionId),
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH),
- null, mock(DagActionStore.class)), flowCompilationValidationHelper);
+ null, this.dagManagementStateStore), flowCompilationValidationHelper);
launchDagProc.process(this.dagManagementStateStore);
List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -107,7 +107,7 @@ public class KillDagProcTest {
KillDagProc killDagProc = new KillDagProc(new KillDagTask(new
DagActionStore.DagAction("fg", "flow1",
String.valueOf(flowExecutionId),
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.KILL),
- null, mock(DagActionStore.class)));
+ null, this.dagManagementStateStore));
killDagProc.process(this.dagManagementStateStore);
long cancelJobCount = specProducers.stream()
@@ -137,7 +137,7 @@ public class KillDagProcTest {
LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "flow2",
String.valueOf(flowExecutionId),
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH),
- null, mock(DagActionStore.class)), flowCompilationValidationHelper);
+ null, this.dagManagementStateStore), flowCompilationValidationHelper);
launchDagProc.process(this.dagManagementStateStore);
List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -150,7 +150,7 @@ public class KillDagProcTest {
KillDagProc killDagProc = new KillDagProc(new KillDagTask(new
DagActionStore.DagAction("fg", "flow2",
String.valueOf(flowExecutionId), "job2",
DagActionStore.DagActionType.KILL),
- null, mock(DagActionStore.class)));
+ null, this.dagManagementStateStore));
killDagProc.process(this.dagManagementStateStore);
long cancelJobCount = specProducers.stream()
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 48c65870e..34a323914 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -89,7 +89,7 @@ public class LaunchDagProcTest {
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
LaunchDagProc launchDagProc = new LaunchDagProc(
new LaunchDagTask(new DagActionStore.DagAction("fg", "fn", "12345",
- "jn", DagActionStore.DagActionType.LAUNCH), null,
mock(DagActionStore.class)),
+ "jn", DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore),
flowCompilationValidationHelper);
launchDagProc.process(this.dagManagementStateStore);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 6a625e527..eb3429597 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -70,7 +70,6 @@ public class ReevaluateDagProcTest {
private ITestMetastoreDatabase testMetastoreDatabase;
private DagManagementStateStore dagManagementStateStore;
private MockedStatic<GobblinServiceManager> mockedGobblinServiceManager;
- private DagActionStore dagActionStore;
private DagActionReminderScheduler dagActionReminderScheduler;
@BeforeClass
@@ -83,9 +82,7 @@ public class ReevaluateDagProcTest {
public void setUp() throws Exception {
this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
mockDMSSCommonBehavior(dagManagementStateStore);
- this.dagActionStore = mock(DagActionStore.class);
this.dagActionReminderScheduler = mock(DagActionReminderScheduler.class);
- this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(this.dagActionStore);
this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(this.dagActionReminderScheduler);
}
@@ -125,11 +122,10 @@ public class ReevaluateDagProcTest {
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-
doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class)));
+ String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
reEvaluateDagProc.process(dagManagementStateStore);
long addSpecCount = specProducers.stream()
@@ -151,7 +147,7 @@ public class ReevaluateDagProcTest {
.filter(a ->
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
// when there is no more job to run in re-evaluate dag proc, it deletes
enforce_flow_finish_dag_action also
-
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
@@ -170,7 +166,6 @@ public class ReevaluateDagProcTest {
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-
doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
doReturn(true).when(dagManagementStateStore).releaseQuota(any());
List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -191,7 +186,7 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class)));
+ String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
reEvaluateDagProc.process(dagManagementStateStore);
// no new job to launch for this one job flow
@@ -208,7 +203,7 @@ public class ReevaluateDagProcTest {
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
-
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 993fba8c0..08fff6fe1 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -53,7 +53,6 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -98,7 +97,7 @@ public class ResumeDagProcTest {
ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
String.valueOf(flowExecutionId),
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME),
- null, mock(DagActionStore.class)));
+ null, this.dagManagementStateStore));
resumeDagProc.process(this.dagManagementStateStore);
SpecProducer<Spec> specProducer =
DagManagerUtils.getSpecProducer(dag.getNodes().get(1));