This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2f0e53792f [GOBBLIN-2115] implement DagNodeStateStore (#3999)
2f0e53792f is described below
commit 2f0e53792fde7f852a808f73f9fc32a627fd2b71
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Jul 18 14:07:25 2024 -0700
[GOBBLIN-2115] implement DagNodeStateStore (#3999)
* implement DagNodeStateStore
* addressed review comments and remove some unused code
---
.../apache/gobblin/service/ServiceConfigKeys.java | 4 +
.../apache/gobblin/metastore/MysqlStateStore.java | 2 +-
.../runtime/KafkaAvroJobStatusMonitorTest.java | 3 +-
.../modules/core/GobblinServiceGuiceModule.java | 4 +-
.../gobblin/service/modules/flowgraph/Dag.java | 3 +
.../orchestration/DagManagementStateStore.java | 72 +-----
.../orchestration/DagManagementTaskStreamImpl.java | 2 +-
.../modules/orchestration/DagManagerUtils.java | 14 --
.../modules/orchestration/DagProcessingEngine.java | 1 -
.../modules/orchestration/DagStateStore.java | 14 ++
.../orchestration/DagStateStoreWithDagNodes.java | 56 +++++
...tore.java => MySqlDagManagementStateStore.java} | 129 +++--------
.../modules/orchestration/MysqlDagActionStore.java | 43 +---
.../modules/orchestration/MysqlDagStateStore.java | 5 +-
.../MysqlDagStateStoreWithDagNodes.java | 241 +++++++++++++++++++++
.../orchestration/MysqlUserQuotaManager.java | 3 +-
.../modules/orchestration/Orchestrator.java | 2 +-
.../modules/orchestration/proc/DagProc.java | 4 +-
.../modules/orchestration/proc/DagProcUtils.java | 6 +-
.../proc/EnforceFlowFinishDeadlineDagProc.java | 1 -
.../proc/EnforceJobStartDeadlineDagProc.java | 1 -
.../modules/orchestration/proc/KillDagProc.java | 1 -
.../modules/orchestration/proc/LaunchDagProc.java | 16 +-
.../orchestration/proc/ReevaluateDagProc.java | 33 +--
.../modules/orchestration/proc/ResumeDagProc.java | 4 +-
.../modules/spec/JobExecutionPlanDagFactory.java | 7 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 2 -
.../DagManagementTaskStreamImplTest.java | 4 +-
.../orchestration/DagProcessingEngineTest.java | 6 +-
....java => MySqlDagManagementStateStoreTest.java} | 64 +++---
.../orchestration/MysqlDagActionStoreTest.java | 3 +-
.../MysqlDagStateStoreWithDagNodesTest.java | 136 ++++++++++++
.../modules/orchestration/OrchestratorTest.java | 4 +-
.../proc/EnforceDeadlineDagProcsTest.java | 51 +++--
.../orchestration/proc/KillDagProcTest.java | 12 +-
.../orchestration/proc/LaunchDagProcTest.java | 12 +-
.../orchestration/proc/ReevaluateDagProcTest.java | 41 ++--
.../orchestration/proc/ResumeDagProcTest.java | 19 +-
38 files changed, 656 insertions(+), 369 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index aeadf4dc61..c37a7f5e95 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -163,6 +163,10 @@ public class ServiceConfigKeys {
public static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13; // length of
flowExecutionId which is epoch timestamp
public static final int MAX_JOB_NAME_LENGTH = 374;
public static final int MAX_JOB_GROUP_LENGTH = 374;
+ public static final int MAX_DAG_NODE_ID_LENGTH = MAX_FLOW_NAME_LENGTH +
MAX_FLOW_GROUP_LENGTH + MAX_FLOW_EXECUTION_ID_LENGTH +
+ MAX_JOB_NAME_LENGTH + MAX_JOB_GROUP_LENGTH + 4; // 4 to account for
delimiters' length
+ public static final int MAX_DAG_ID_LENGTH = MAX_FLOW_NAME_LENGTH +
MAX_FLOW_GROUP_LENGTH + MAX_FLOW_EXECUTION_ID_LENGTH
+ + 2; // 2 to account for delimiters' length
public static final String STATE_STORE_TABLE_SUFFIX = "gst";
public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
public static final String DAG_STORE_KEY_SEPARATION_CHARACTER = "_";
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index aa8d00f5e1..af7203c69b 100644
---
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -202,7 +202,7 @@ public class MysqlStateStore<T extends State> implements
StateStore<T> {
* @param config the properties used for datasource instantiation
* @return
*/
- static DataSource newDataSource(Config config) {
+ public static DataSource newDataSource(Config config) {
HikariDataSource dataSource = new HikariDataSource();
PasswordManager passwordManager =
PasswordManager.getInstance(ConfigUtils.configToProperties(config));
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index f03a7ff4ba..e20cfd97fb 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -75,7 +75,6 @@ import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
@@ -197,7 +196,7 @@ public class KafkaAvroJobStatusMonitorTest {
@Test (dependsOnMethods = "testProcessMessageForSuccessfulFlow")
public void testProcessMessageForFailedFlow() throws IOException,
ReflectiveOperationException {
- DagManagementStateStore dagManagementStateStore =
mock(MostlyMySqlDagManagementStateStore.class);
+ DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic2");
//Submit GobblinTrackingEvents to Kafka
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 6365239ec4..6ed53ba00e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -77,7 +77,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagTaskStream;
import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
import
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
import
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
@@ -177,7 +177,7 @@ public class GobblinServiceGuiceModule implements Module {
OptionalBinder.newOptionalBinder(binder, DagActionStore.class);
if (serviceConfig.isWarmStandbyEnabled()) {
binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
-
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class);
+
binder.bind(DagManagementStateStore.class).to(MySqlDagManagementStateStore.class);
binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.class);
binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.class);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 50a4dcb5c2..29ffc485e0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -50,11 +50,14 @@ public class Dag<T> {
private List<DagNode<T>> nodes;
@Setter
+ @Deprecated // because this field is not persisted in mysql and contains
information in very limited cases
private String message;
@Setter
+ @Deprecated // because this field is not persisted in mysql and contains
information in very limited cases
private String flowEvent;
// Keep track of when the final flow status is emitted, in milliseconds to
avoid many duplicate events
@Setter @Getter
+ @Deprecated // because this field is not persisted in mysql and contains
information in very limited cases
private long eventEmittedTimeMillis = -1;
public Dag(List<DagNode<T>> dagNodes) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 515036c3af..139082545b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.net.URI;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -42,6 +41,8 @@ import org.apache.gobblin.service.monitoring.JobStatus;
* An interface to provide abstractions for managing {@link Dag} and {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
* and allows add/delete and other functions.
*/
+// todo - this should merge with DagStateStoreWithDagNodes interface and
`addDagNodeState` and `addDagNode` should merge and rename to
+ // `updateDagNode`
@Alpha
public interface DagManagementStateStore {
/**
@@ -57,34 +58,17 @@ public interface DagManagementStateStore {
void removeFlowSpec(URI uri, Properties headers, boolean triggerListener);
/**
- * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
- * e.g. on adding a failed dag in store to retry later, on submitting a dag
node to spec producer because that changes
- * dag node's state, on resuming a dag, on receiving a new dag from
orchestrator.
- * Calling on a previously checkpointed Dag updates it.
+ * Adds a {@link Dag} to the store.
* Opposite of this is {@link DagManagementStateStore#deleteDag} that
removes the Dag from the store.
* @param dag The dag to add
*/
- void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
-
- /**
- @return whether `dagId` is currently known due to {@link
DagManagementStateStore#checkpointDag} but not yet
- {@link DagManagementStateStore#deleteDag}
- */
- boolean containsDag(DagManager.DagId dagId) throws IOException;
+ void addDag(Dag<JobExecutionPlan> dag) throws IOException;
/**
@return the {@link Dag}, if present
*/
Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws
IOException;
- /**
- * Delete the {@link Dag} from the backing store, typically called upon
completion of execution.
- * @param dag The dag completed/cancelled execution on {@link
org.apache.gobblin.runtime.api.SpecExecutor}.
- */
- default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
- deleteDag(DagManagerUtils.generateDagId(dag));
- }
-
/**
* Delete the {@link Dag} from the backing store, typically upon completion
of execution.
* @param dagId The ID of the dag to clean up.
@@ -94,38 +78,25 @@ public interface DagManagementStateStore {
/**
* This marks the dag as a failed one.
* Failed dags are queried using {@link
DagManagementStateStore#getFailedDag(DagManager.DagId)} later to be retried.
- * @param dag failing dag
+ * @param dagId failing dag's dagId
*/
- void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
-
- /**
- * Return a list of all failed dags' IDs contained in the dag state store.
- */
- Set<String> getFailedDagIds() throws IOException;
+ void markDagFailed(DagManager.DagId dagId) throws IOException;
/**
* Returns the failed dag.
- * If the dag is not found because it was never marked as failed through
{@link DagManagementStateStore#markDagFailed(Dag)},
+ * If the dag is not found because it was never marked as failed through
+ * {@link
DagManagementStateStore#markDagFailed(org.apache.gobblin.service.modules.orchestration.DagManager.DagId)},
* it returns an empty Optional.
* @param dagId dag id of the failed dag
*/
Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws
IOException;
- /**
- * Deletes the failed dag. No-op if dag is not found or is not marked as
failed.
- * @param dag
- * @throws IOException
- */
- default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
- deleteFailedDag(DagManagerUtils.generateDagId(dag));
- }
-
void deleteFailedDag(DagManager.DagId dagId) throws IOException;
/**
* Adds state of a {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
* Note that a DagNode is a part of a Dag and must already be present in the
store through
- * {@link DagManagementStateStore#checkpointDag}. This call is just an
additional identifier which may be used
+ * {@link DagManagementStateStore#addDag}. This call is just an additional
identifier which may be used
* for DagNode level operations. In the future, it may be merged with
addDag.
* @param dagNode dag node to be added
* @param dagId dag id of the dag this dag node belongs to
@@ -138,7 +109,8 @@ public interface DagManagementStateStore {
* JobStatus can be non-empty only if DagNode is non-empty.
* @param dagNodeId of the dag node
*/
- Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId);
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId)
+ throws IOException;
/**
* Returns a set of {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for a {@link Dag}.
@@ -147,26 +119,6 @@ public interface DagManagementStateStore {
*/
Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId)
throws IOException;
- /**
- * Deletes the dag node state that was added through {@link
DagManagementStateStore#addDagNodeState(Dag.DagNode, DagManager.DagId)}
- * No-op if the dag node is not found in the store.
- * @param dagNode dag node to be deleted
- * @param dagId dag id of the dag this dag node belongs to
- */
- void deleteDagNodeState(DagManager.DagId dagId,
Dag.DagNode<JobExecutionPlan> dagNode);
-
- /**
- * Loads all currently running {@link Dag}s from the underlying store.
Typically, invoked when a new {@link DagManager}
- * takes over or on restart of service.
- * @return a {@link List} of currently running {@link Dag}s.
- */
- List<Dag<JobExecutionPlan>> getDags() throws IOException;
-
- /**
- * Initialize the dag quotas with the provided set of dags.
- */
- void initQuota(Collection<Dag<JobExecutionPlan>> dags) throws IOException;
-
/**
* Checks if the dagNode exceeds the statically configured user quota for
the proxy user, requester user and flowGroup.
* It also increases the quota usage for proxy user, requester and the
flowGroup of the given DagNode by one.
@@ -195,7 +147,7 @@ public interface DagManagementStateStore {
* Returns true if the {@link Dag} identified by the given {@link
org.apache.gobblin.service.modules.orchestration.DagManager.DagId}
* has any running job, false otherwise.
*/
- public boolean hasRunningJobs(DagManager.DagId dagId);
+ boolean hasRunningJobs(DagManager.DagId dagId) throws IOException;
/**
* Check if an action exists in dagAction store by flow group, flow name,
flow execution id, and job name.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 064e721dd9..c2507c5ebd 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -140,7 +140,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
}
}
} catch (Exception e) {
- //TODO: need to handle exceptions gracefully
+ //TODO: add metrics
log.error("Exception getting DagAction from the queue or creating
DagTask. dagAction - {}", dagAction == null ? "<null>" : dagAction, e);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index d4b5233a9f..7323543f61 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -67,20 +67,6 @@ public class DagManagerUtils {
return getFlowId(dag.getStartNodes().get(0));
}
- public static DagActionStore.DagAction
createDagActionFromDag(Dag<JobExecutionPlan> dag, DagActionStore.DagActionType
dagActionType) {
- return createDagActionFromDagNode(dag.getStartNodes().get(0),
dagActionType);
- }
-
- // todo - verify if jobName will always be present or need default and
update FlowId, DagId etc to contain jobName
- public static DagActionStore.DagAction
createDagActionFromDagNode(DagNode<JobExecutionPlan> dagNode,
DagActionStore.DagActionType dagActionType) {
- Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
- String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
- String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
- String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
- return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, dagActionType);
- }
-
static FlowId getFlowId(DagNode<JobExecutionPlan> dagNode) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 00ac9eba00..449b9cf994 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -131,7 +131,6 @@ public class DagProcessingEngine extends
AbstractIdleService {
}
DagProc<?> dagProc = dagTask.host(dagProcFactory);
try {
- // todo - add retries
dagProc.process(dagManagementStateStore);
dagTask.conclude();
} catch (Exception e) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index 09824ddcb9..7fa1a2f219 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -47,28 +47,42 @@ public interface DagStateStore {
*/
void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
+ default boolean cleanUp(DagManager.DagId dagId) throws IOException {
+ cleanUp(dagId.toString());
+ return true;
+ }
+
/**
* Delete the {@link Dag} from the backing store, typically upon completion
of execution.
* @param dagId The ID of the dag to clean up.
*/
+ @Deprecated
void cleanUp(String dagId) throws IOException;
/**
* Load all currently running {@link Dag}s from the underlying store.
Typically, invoked when a new {@link DagManager}
* takes over or on restart of service.
+ * @deprecated because {@link DagProcessingEngine} that will replace {@link
DagManager} does not need this API
* @return a {@link List} of currently running {@link Dag}s.
*/
+ @Deprecated
List<Dag<JobExecutionPlan>> getDags() throws IOException;
/**
* Return a single dag from the dag state store.
* @param dagId The ID of the dag to load.
*/
+ default Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws
IOException {
+ return getDag(dagId.toString());
+ }
+
+ @Deprecated
Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
/**
* Return a list of all dag IDs contained in the dag state store.
*/
+ @Deprecated
Set<String> getDagIds() throws IOException;
default boolean existsDag(DagManager.DagId dagId) throws IOException {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
new file mode 100644
index 0000000000..03aaf41520
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface for storing and retrieving currently running {@link
Dag.DagNode}s of {@link JobExecutionPlan}s.
+ * Callers should use {@link DagStateStore#writeCheckpoint} to store dags.
After that, to update individual
+ * {@link Dag.DagNode}s, {@link DagStateStoreWithDagNodes#updateDagNode}
should be used.
+ * {@link DagStateStore#cleanUp(DagManager.DagId)} should be used to delete
all the {@link Dag.DagNode}s for a {@link Dag}.
+ */
+public interface DagStateStoreWithDagNodes extends DagStateStore {
+
+ /**
+ * Updates a dag node identified by the provided {@link DagManager.DagId}
+ * with the given {@link Dag.DagNode}.
+ * Returns 1 if the dag node is inserted as a new one, 2 if it is updated, and
0 if the new dag node is the same as the existing one
+ * <a
href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
+ */
+ int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan>
dagNode) throws IOException;
+
+ /**
+ * Returns all the {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
+ * {@link org.apache.gobblin.service.modules.orchestration.DagManager.DagId}
+ */
+ Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId)
throws IOException;
+
+ /**
+ * Return the {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for the given {@link
DagNodeId} or empty
+ * optional if it is not present
+ */
+ Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId)
throws IOException;
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
similarity index 65%
rename from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index d0c68a3fd9..285700b845 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -20,14 +20,11 @@ import java.io.IOException;
import java.net.URI;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -46,6 +43,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
@@ -62,13 +60,11 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
*/
@Slf4j
@Singleton
-public class MostlyMySqlDagManagementStateStore implements
DagManagementStateStore {
- private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new
ConcurrentHashMap<>();
- // dagToJobs holds a map of dagId to running jobs of that dag
- private final Map<DagManager.DagId, Set<Dag.DagNode<JobExecutionPlan>>>
dagToJobs = new ConcurrentHashMap<>();
- private DagStateStore dagStateStore;
- private DagStateStore failedDagStateStore;
- private JobStatusRetriever jobStatusRetriever;
+public class MySqlDagManagementStateStore implements DagManagementStateStore {
+ // todo - these two stores should merge
+ private DagStateStoreWithDagNodes dagStateStore;
+ private DagStateStoreWithDagNodes failedDagStateStore;
+ private final JobStatusRetriever jobStatusRetriever;
private boolean dagStoresInitialized = false;
private final UserQuotaManager quotaManager;
Map<URI, TopologySpec> topologySpecMap;
@@ -81,7 +77,7 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
private final DagActionStore dagActionStore;
@Inject
- public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog
flowCatalog, UserQuotaManager userQuotaManager,
+ public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog,
UserQuotaManager userQuotaManager,
JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) {
this.quotaManager = userQuotaManager;
this.config = config;
@@ -116,109 +112,69 @@ public class MostlyMySqlDagManagementStateStore
implements DagManagementStateSto
this.flowCatalog.remove(uri, headers, triggerListener);
}
- public synchronized void setTopologySpecMap(Map<URI, TopologySpec>
topologySpecMap) throws IOException {
+ public synchronized void setTopologySpecMap(Map<URI, TopologySpec>
topologySpecMap) {
this.topologySpecMap = topologySpecMap;
start();
}
- private DagStateStore createDagStateStore(Config config, Map<URI,
TopologySpec> topologySpecMap) {
+ private DagStateStoreWithDagNodes createDagStateStore(Config config,
Map<URI, TopologySpec> topologySpecMap) {
try {
- Class<?> dagStateStoreClass =
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStore.class.getName()));
- return (DagStateStore)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config,
topologySpecMap);
+ Class<?> dagStateStoreClass =
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreWithDagNodes.class.getName()));
+ return (DagStateStoreWithDagNodes)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config,
topologySpecMap);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
@Override
- public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+ public void addDag(Dag<JobExecutionPlan> dag) throws IOException {
this.dagStateStore.writeCheckpoint(dag);
}
@Override
- public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
- this.dagStateStore.cleanUp(dag);
- // todo - updated failedDagStateStore iff cleanup returned 1
+ public void markDagFailed(DagManager.DagId dagId) throws IOException {
+ Dag<JobExecutionPlan> dag = this.dagStateStore.getDag(dagId);
this.failedDagStateStore.writeCheckpoint(dag);
- log.info("Marked dag failed {}", DagManagerUtils.generateDagId(dag));
- }
-
- @Override
- public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
- this.dagStateStore.cleanUp(dag);
- }
-
- @Override
- public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
- this.failedDagStateStore.cleanUp(dag);
+ this.dagStateStore.cleanUp(dagId);
+ // todo - update failedDagStateStore iff cleanup returned 1
+ // or merge dagStateStore and failedDagStateStore and change the flag that
marks a dag `failed`
+ log.info("Marked dag failed {}", dagId);
}
@Override
public void deleteDag(DagManager.DagId dagId) throws IOException {
- this.dagStateStore.cleanUp(dagId.toString());
+ this.dagStateStore.cleanUp(dagId);
log.info("Deleted dag {}", dagId);
}
@Override
public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
- this.failedDagStateStore.cleanUp(dagId.toString());
- }
-
- @Override
- public List<Dag<JobExecutionPlan>> getDags() throws IOException {
- return this.dagStateStore.getDags();
+ this.failedDagStateStore.cleanUp(dagId);
+ log.info("Deleted failed dag {}", dagId);
}
@Override
public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId)
throws IOException {
- return Optional.of(this.failedDagStateStore.getDag(dagId.toString()));
+ return Optional.of(this.failedDagStateStore.getDag(dagId));
}
- @Override
- public Set<String> getFailedDagIds() throws IOException {
- return this.failedDagStateStore.getDagIds();
- }
-
- @Override
- // todo - updating different maps here and in addDagNodeState can result in
inconsistency between the maps
- public synchronized void deleteDagNodeState(DagManager.DagId dagId,
Dag.DagNode<JobExecutionPlan> dagNode) {
- this.dagNodes.remove(dagNode.getValue().getId());
- if (this.dagToJobs.containsKey(dagId)) {
- this.dagToJobs.get(dagId).remove(dagNode);
- if (this.dagToJobs.get(dagId).isEmpty()) {
- this.dagToJobs.remove(dagId);
- }
- }
- }
-
- // todo - updating different mapps here and in deleteDagNodeState can result
in inconsistency between the maps
@Override
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan>
dagNode, DagManager.DagId dagId)
throws IOException {
- if (!containsDag(dagId)) {
- throw new RuntimeException("Dag " + dagId + " not found");
- }
- this.dagNodes.put(dagNode.getValue().getId(), dagNode);
- if (!this.dagToJobs.containsKey(dagId)) {
- this.dagToJobs.put(dagId, new HashSet<>());
- }
- this.dagToJobs.get(dagId).add(dagNode);
+ this.dagStateStore.updateDagNode(dagId, dagNode);
}
@Override
public Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws
IOException {
- return Optional.ofNullable(this.dagStateStore.getDag(dagId.toString()));
- }
-
- @Override
- public boolean containsDag(DagManager.DagId dagId) throws IOException {
- return this.dagStateStore.existsDag(dagId);
+ return Optional.ofNullable(this.dagStateStore.getDag(dagId));
}
@Override
- public Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId) {
- if (this.dagNodes.containsKey(dagNodeId)) {
- return ImmutablePair.of(Optional.of(this.dagNodes.get(dagNodeId)),
getJobStatus(dagNodeId));
+ public Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId)
+ throws IOException {
+ Optional<Dag.DagNode<JobExecutionPlan>> dagNode =
this.dagStateStore.getDagNode(dagNodeId);
+ if (dagNode.isPresent()) {
+ return ImmutablePair.of(dagNode, getJobStatus(dagNodeId));
} else {
// no point of searching for status if the node itself is absent.
return ImmutablePair.of(Optional.empty(), Optional.empty());
@@ -226,18 +182,8 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
}
@Override
- public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId
dagId) {
- Set<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
- if (dagNodes != null) {
- return dagNodes;
- } else {
- return new HashSet<>();
- }
- }
-
- public void initQuota(Collection<Dag<JobExecutionPlan>> dags) {
- // This implementation does not need to update quota usage when the
service restarts or when its leadership status changes
- // because quota usage are persisted in mysql table
+ public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId
dagId) throws IOException {
+ return this.dagStateStore.getDagNodes(dagId);
}
@Override
@@ -264,16 +210,11 @@ public class MostlyMySqlDagManagementStateStore
implements DagManagementStateSto
}
}
- /* todo - this method works because when the jobs finish they are deleted
from the DMSS -> if no more job is found, means
- no more running jobs.
- But DMSS still has dags and which still contains dag nodes. We need to
revisit this method's logic when we change
- DMSS to a fully mysql backed implementation. then we may want to consider
this approach
- return getDagNodes(dagId).stream()
- .anyMatch(node ->
!FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name()));
- */
+
@Override
- public boolean hasRunningJobs(DagManager.DagId dagId) {
- return !getDagNodes(dagId).isEmpty();
+ public boolean hasRunningJobs(DagManager.DagId dagId) throws IOException {
+ return getDagNodes(dagId).stream()
+ .anyMatch(node ->
!FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name()));
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index 5ba32539fe..c96478c79d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -38,7 +38,6 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DBStatementExecutor;
-import org.apache.gobblin.util.ExponentialBackoff;
@Slf4j
@@ -49,14 +48,11 @@ public class MysqlDagActionStore implements DagActionStore {
protected final DataSource dataSource;
private final DBStatementExecutor dbStatementExecutor;
private final String tableName;
- private final long retentionPeriodSeconds;
- private String thisTableRetentionStatement;
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM
%s WHERE flow_group = ? AND flow_name = ? AND flow_execution_id = ? AND
job_name = ? AND dag_action = ?)";
protected static final String INSERT_STATEMENT = "INSERT INTO %s
(flow_group, flow_name, flow_execution_id, job_name, dag_action) "
+ "VALUES (?, ?, ?, ?, ?)";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE
flow_group = ? AND flow_name =? AND flow_execution_id = ? AND job_name = ? AND
dag_action = ?";
- private static final String GET_STATEMENT = "SELECT flow_group, flow_name,
flow_execution_id, job_name, dag_action FROM %s WHERE flow_group = ? AND
flow_name =? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?";
private static final String GET_ALL_STATEMENT = "SELECT flow_group,
flow_name, flow_execution_id, job_name, dag_action FROM %s";
private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT
EXISTS %s (" +
"flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT
NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT
NULL, "
@@ -64,11 +60,10 @@ public class MysqlDagActionStore implements DagActionStore {
+ "job_name varchar(" + ServiceConfigKeys.MAX_JOB_NAME_LENGTH + ") NOT
NULL, "
+ "dag_action varchar(50) NOT NULL, modified_time TIMESTAMP DEFAULT
CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, "
+ "PRIMARY KEY
(flow_group,flow_name,flow_execution_id,job_name,dag_action))";
+
// Deletes rows older than retention time period (in seconds) to prevent
this table from growing unbounded.
private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE
modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %s SECOND)";
- private final int getDagActionMaxRetries;
-
@Inject
public MysqlDagActionStore(Config config) throws IOException {
if (config.hasPath(CONFIG_PREFIX)) {
@@ -78,9 +73,9 @@ public class MysqlDagActionStore implements DagActionStore {
}
this.tableName = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
- this.getDagActionMaxRetries = ConfigUtils.getInt(config,
ConfigurationKeys.MYSQL_GET_MAX_RETRIES,
ConfigurationKeys.DEFAULT_MYSQL_GET_MAX_RETRIES);
- this.retentionPeriodSeconds = ConfigUtils.getLong(config,
ConfigurationKeys.MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY,
-
ConfigurationKeys.DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY);
+ long retentionPeriodSeconds =
+ ConfigUtils.getLong(config,
ConfigurationKeys.MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY,
+
ConfigurationKeys.DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
try (Connection connection = dataSource.getConnection();
@@ -91,9 +86,9 @@ public class MysqlDagActionStore implements DagActionStore {
throw new IOException("Failure creation table " + tableName, e);
}
this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
- this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT,
this.tableName, retentionPeriodSeconds);
+ String thisTableRetentionStatement = String.format(RETENTION_STATEMENT,
this.tableName, retentionPeriodSeconds);
// Periodically deletes all rows in the table last_modified before the
retention period defined by config.
-
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
6, TimeUnit.HOURS);
+
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
6L, TimeUnit.HOURS);
}
@Override
@@ -141,32 +136,6 @@ public class MysqlDagActionStore implements DagActionStore
{
}}, true);
}
- // TODO: later change this to getDagActions relating to a particular flow
execution if it makes sense
- private DagAction getDagActionWithRetry(String flowGroup, String flowName,
long flowExecutionId, String jobName, DagActionType dagActionType,
ExponentialBackoff exponentialBackoff)
- throws IOException, SQLException {
- return
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT,
tableName), getStatement -> {
- int i = 0;
- getStatement.setString(++i, flowGroup);
- getStatement.setString(++i, flowName);
- getStatement.setString(++i, String.valueOf(flowExecutionId));
- getStatement.setString(++i, dagActionType.toString());
- try (ResultSet rs = getStatement.executeQuery()) {
- if (rs.next()) {
- return new DagAction(rs.getString(1), rs.getString(2),
rs.getLong(3), rs.getString(4), DagActionType.valueOf(rs.getString(5)));
- } else if (exponentialBackoff.awaitNextRetryIfAvailable()) {
- return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
jobName, dagActionType, exponentialBackoff);
- } else {
- log.warn(String.format("Can not find dag action: %s with flowGroup:
%s, flowName: %s, flowExecutionId: %s",
- dagActionType, flowGroup, flowName, flowExecutionId));
- return null;
- }
- } catch (SQLException | InterruptedException e) {
- throw new IOException(String.format("Failure get %s from table %s",
- new DagAction(flowGroup, flowName, flowExecutionId, jobName,
dagActionType), tableName), e);
- }
- }, true);
- }
-
@Override
public Collection<DagAction> getDagActions() throws IOException {
return
dbStatementExecutor.withPreparedStatement(String.format(GET_ALL_STATEMENT,
tableName), getAllStatement -> {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 0879704468..129aff2bef 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -61,14 +61,15 @@ import static
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.g
* It implements interfaces of {@link DagStateStore} but delegating
responsibilities to methods provided
* in {@link MysqlStateStore}.
* It also implements conversion between {@link Dag<JobExecutionPlan>} to
{@link State}.
- *
+ * <p>
* The schema of this will simply be:
* | storeName | tableName | State |
* where storeName represents FlowId, a combination of FlowGroup and FlowName,
and tableName represents FlowExecutionId.
* State is a pocket for serialized {@link Dag} object.
*
- *
+ * Deprecated in favor of {@link MysqlDagStateStoreWithDagNodes}
*/
+@Deprecated
public class MysqlDagStateStore implements DagStateStore {
public static final String CONFIG_PREFIX = GOBBLIN_SERVICE_PREFIX +
"mysqlDagStateStore";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
new file mode 100644
index 0000000000..2692e20697
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.DBStatementExecutor;
+
+import static
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateDagId;
+
+
+/**
+ * An implementation of {@link DagStateStoreWithDagNodes} that uses MySQL as its backing store,
+ * persisting each dag node as a separate row.
+ */
+@Slf4j
+public class MysqlDagStateStoreWithDagNodes implements
DagStateStoreWithDagNodes {
+
+ // Reuses the config prefix of MysqlDagStateStore
+ public static final String CONFIG_PREFIX = MysqlDagStateStore.CONFIG_PREFIX;
+ protected final DBStatementExecutor dbStatementExecutor;
+ protected final String tableName;
+ // (De)serializes dag nodes as lists of JobExecutionPlan
+ protected final GsonSerDe<List<JobExecutionPlan>> serDe;
+ private final JobExecutionPlanDagFactory jobExecPlanDagFactory;
+
+ // todo add a column that tells if it is a running dag or a failed dag
+ // One row per dag node keyed by dag_node_id; parent_dag_id is indexed for the per-dag select and delete.
+ // NOTE(review): the UNIQUE INDEX on dag_node_id repeats the PRIMARY KEY column - looks redundant, confirm.
+ protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT
EXISTS %s ("
+ + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ")
CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
+ + "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ")
NOT NULL, "
+ + "dag_node JSON NOT NULL, "
+ + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP, "
+ + "PRIMARY KEY (dag_node_id), "
+ + "UNIQUE INDEX dag_node_index (dag_node_id), "
+ + "INDEX dag_index (parent_dag_id))";
+
+ // Upsert: inserts the node row, or overwrites dag_node when dag_node_id already exists.
+ // NOTE(review): the "AS new" row alias presumably needs MySQL 8.0.19+ - confirm against deployed versions.
+ protected static final String INSERT_STATEMENT = "INSERT INTO %s
(dag_node_id, parent_dag_id, dag_node) "
+ + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node =
new.dag_node";
+ protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node
FROM %s WHERE parent_dag_id = ?";
+ protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM
%s WHERE dag_node_id = ?";
+ protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE
parent_dag_id = ?";
+ // Counter incremented in writeCheckpoint and decremented in cleanUp
+ private final ContextAwareCounter totalDagCount;
+
+ /**
+ * Creates the backing table if it does not exist yet and wires up the statement executor,
+ * the {@link JobExecutionPlan} list (de)serializer and the dag counter.
+ *
+ * @param config store configuration; when it contains {@link #CONFIG_PREFIX} that sub-config is used
+ * with the full config as fallback. The table name is read from
+ * {@link ConfigurationKeys#STATE_STORE_DB_TABLE_KEY} and defaults to "dag_node_state_store"
+ * @param topologySpecMap handed to the {@link JobExecutionPlanListDeserializer}
+ * @throws IOException if creating the table fails with a {@link SQLException}
+ */
+ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec>
topologySpecMap) throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ }
+
+ String DEFAULT_TABLE_NAME = "dag_node_state_store";
+ this.tableName = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, DEFAULT_TABLE_NAME);
+ // create table if it does not exist
+ DataSource dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement =
connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+ createStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Failure creation table " + tableName, e);
+ }
+ this.dbStatementExecutor = new DBStatementExecutor(dataSource, log);
+
+ // each dag node is stored as a serialized list of JobExecutionPlan
+ JsonSerializer<List<JobExecutionPlan>> serializer = new
JobExecutionPlanListSerializer();
+ JsonDeserializer<List<JobExecutionPlan>> deserializer = new
JobExecutionPlanListDeserializer(topologySpecMap);
+ Type typeToken = new TypeToken<List<JobExecutionPlan>>() {
+ }.getType();
+ this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
+ this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
+ MetricContext metricContext =
+ Instrumented.getMetricContext(new
State(ConfigUtils.configToProperties(config)), this.getClass());
+ this.totalDagCount =
metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT);
+ log.info("Instantiated {}", getClass().getSimpleName());
+ }
+
+  /**
+   * Persists every node of {@code dag} through {@link #updateDagNode}. The dag counter is incremented once
+   * when at least one of those writes reports exactly one affected row (presumably a fresh insert under
+   * MySQL's ON DUPLICATE KEY UPDATE row-count rules).
+   *
+   * @param dag the dag whose nodes are written
+   * @throws IOException if writing any node fails
+   */
+  @Override
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag)
+      throws IOException {
+    DagManager.DagId id = DagManagerUtils.generateDagId(dag);
+    boolean sawSingleRowWrite = false;
+    for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
+      // non-short-circuit |= keeps writing every node even after the flag is set
+      sawSingleRowWrite |= updateDagNode(id, node) == 1;
+    }
+    if (sawSingleRowWrite) {
+      this.totalDagCount.inc();
+    }
+  }
+
+  /**
+   * Removes the stored nodes of {@code dag} by delegating to the id-based overload; that overload's
+   * boolean result is discarded.
+   */
+  @Override
+  public void cleanUp(Dag<JobExecutionPlan> dag) throws IOException {
+    DagManager.DagId idOfDag = generateDagId(dag);
+    cleanUp(idOfDag);
+  }
+
+  /**
+   * Deletes every dag node row whose parent is {@code dagId}.
+   *
+   * @param dagId id of the dag to remove
+   * @return {@code true} if at least one row was deleted, {@code false} if nothing was stored for {@code dagId}
+   * @throws IOException if the delete statement fails
+   */
+  @Override
+  public boolean cleanUp(DagManager.DagId dagId) throws IOException {
+    boolean deleted = dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName),
+        deleteStatement -> {
+          try {
+            deleteStatement.setString(1, dagId.toString());
+            return deleteStatement.executeUpdate() != 0;
+          } catch (SQLException e) {
+            throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
+          }
+        }, true);
+    // count down only when rows were really removed; a no-op or repeated clean-up must not skew the counter
+    if (deleted) {
+      this.totalDagCount.dec();
+    }
+    return deleted;
+  }
+
+  /** String-id variant of clean-up; this store always rejects it with a {@link NotSupportedException}. */
+  @Override
+  public void cleanUp(String dagId) throws IOException {
+    String storeName = getClass().getSimpleName();
+    throw new NotSupportedException(storeName + " does not need this legacy API that originated with "
+        + "the DagManager that is replaced by DagProcessingEngine");
+  }
+
+  /** Bulk listing of dags; this store always rejects it with a {@link NotSupportedException}. */
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    String storeName = getClass().getSimpleName();
+    throw new NotSupportedException(storeName + " does not need this legacy API that originated with "
+        + "the DagManager that is replaced by DagProcessingEngine");
+  }
+
+  /**
+   * Rebuilds the dag identified by {@code dagId} from its stored nodes.
+   *
+   * @return the reassembled dag, or {@code null} when no node is stored for {@code dagId}
+   * @throws IOException if reading the nodes fails
+   */
+  @Override
+  public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException {
+    Set<Dag.DagNode<JobExecutionPlan>> storedNodes = getDagNodes(dagId);
+    return convertDagNodesIntoDag(storedNodes);
+  }
+
+  /** String-id lookup; this store always rejects it with a {@link NotSupportedException}. */
+  @Override
+  public Dag<JobExecutionPlan> getDag(String dagId) throws IOException {
+    String storeName = getClass().getSimpleName();
+    throw new NotSupportedException(storeName + " does not need this API");
+  }
+
+  /** Listing of dag ids; this store always rejects it with a {@link NotSupportedException}. */
+  @Override
+  public Set<String> getDagIds() throws IOException {
+    String storeName = getClass().getSimpleName();
+    throw new NotSupportedException(storeName + " does not need this API");
+  }
+
+  /**
+   * Assembles a {@link Dag} from the {@link JobExecutionPlan}s carried by the given nodes.
+   *
+   * @param dagNodes nodes belonging to one dag
+   * @return the dag created by the {@link JobExecutionPlanDagFactory}, or {@code null} if {@code dagNodes} is empty
+   */
+  private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutionPlan>> dagNodes) {
+    if (dagNodes.isEmpty()) {
+      return null;
+    }
+    List<JobExecutionPlan> plans = dagNodes.stream()
+        .map(Dag.DagNode::getValue)
+        .collect(Collectors.toList());
+    return jobExecPlanDagFactory.createDag(plans);
+  }
+
+  /**
+   * Writes one dag node row: the node's {@link JobExecutionPlan} is serialized as a single-element list and
+   * stored under the node's id together with {@code parentDagId}.
+   *
+   * @param parentDagId id of the dag the node belongs to
+   * @param dagNode the node to store
+   * @return the row count reported by the statement's {@code executeUpdate()}
+   * @throws IOException if the statement fails
+   */
+  @Override
+  public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+    final String nodeKey = dagNode.getValue().getId().toString();
+    final String upsertSql = String.format(INSERT_STATEMENT, tableName);
+    return dbStatementExecutor.withPreparedStatement(upsertSql, stmt -> {
+      try {
+        stmt.setString(1, nodeKey);
+        stmt.setString(2, parentDagId.toString());
+        List<JobExecutionPlan> singleNodePlan = Collections.singletonList(dagNode.getValue());
+        stmt.setString(3, this.serDe.serialize(singleNodePlan));
+        return stmt.executeUpdate();
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure adding dag node for %s", nodeKey), e);
+      }
+    }, true);
+  }
+
+  /**
+   * Loads all stored nodes whose parent is {@code parentDagId}.
+   *
+   * @param parentDagId id of the dag whose nodes are wanted
+   * @return the deserialized nodes; empty when no row matches
+   * @throws IOException if running the query or reading its result fails
+   */
+  @Override
+  public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId parentDagId) throws IOException {
+    final String selectSql = String.format(GET_DAG_NODES_STATEMENT, tableName);
+    return dbStatementExecutor.withPreparedStatement(selectSql, query -> {
+      query.setString(1, parentDagId.toString());
+      Set<Dag.DagNode<JobExecutionPlan>> found = new HashSet<>();
+      try (ResultSet rows = query.executeQuery()) {
+        while (rows.next()) {
+          // each row holds a serialized single-element plan list
+          JobExecutionPlan plan = this.serDe.deserialize(rows.getString(1)).get(0);
+          found.add(new Dag.DagNode<>(plan));
+        }
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e);
+      }
+      return found;
+    }, true);
+  }
+
+  /**
+   * Looks up a single stored node by its id.
+   *
+   * @param dagNodeId id of the wanted node
+   * @return the deserialized node, or {@link Optional#empty()} when no row matches
+   * @throws IOException if running the query or reading its result fails
+   */
+  @Override
+  public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId) throws IOException {
+    final String selectSql = String.format(GET_DAG_NODE_STATEMENT, tableName);
+    return dbStatementExecutor.withPreparedStatement(selectSql, query -> {
+      query.setString(1, dagNodeId.toString());
+      try (ResultSet rows = query.executeQuery()) {
+        if (!rows.next()) {
+          return Optional.empty();
+        }
+        // the row holds a serialized single-element plan list
+        JobExecutionPlan plan = this.serDe.deserialize(rows.getString(1)).get(0);
+        return Optional.of(new Dag.DagNode<>(plan));
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure get dag node for %s", dagNodeId), e);
+      }
+    }, true);
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index c4e3b87137..af67481bc7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -324,7 +324,8 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + "
(name VARCHAR(500) CHARACTER SET latin1 NOT NULL, "
+ "user_count INT NOT NULL DEFAULT 0, requester_count INT NOT NULL
DEFAULT 0, flowgroup_count INT NOT NULL DEFAULT 0, "
+ "PRIMARY KEY (name), " + "UNIQUE INDEX ind (name))";
- try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement =
connection.prepareStatement(createQuotaTable)) {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement =
connection.prepareStatement(createQuotaTable)) {
createStatement.executeUpdate();
} catch (SQLException e) {
// TODO: revisit use of connection test query following verification
of successful connection pool migration:
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 04e12c13ed..e917c53611 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -125,7 +125,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
//At this point, the TopologySpecMap is initialized by the SpecCompiler.
Pass the TopologySpecMap to the DagManager.
this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
if (dagManagementStateStore.isPresent()) {
- ((MostlyMySqlDagManagementStateStore)
dagManagementStateStore.get()).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
+ ((MySqlDagManagementStateStore)
dagManagementStateStore.get()).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
}
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
this.specCompiler.getClass());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index fece28d8c3..effe61c360 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -64,8 +64,8 @@ public abstract class DagProc<T> {
}
public final void process(DagManagementStateStore dagManagementStateStore)
throws IOException {
- T state = initialize(dagManagementStateStore); // todo - retry
- act(dagManagementStateStore, state); // todo - retry
+ T state = initialize(dagManagementStateStore);
+ act(dagManagementStateStore, state);
log.info("{} concluded processing for dagId : {}",
getClass().getSimpleName(), this.dagId);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 808b878589..1f4c504ed9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -70,7 +70,6 @@ public class DagProcUtils {
if (nextNodes.size() == 1) {
Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
dagId);
- log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), dagId);
} else {
for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
JobExecutionPlan jobExecutionPlan = dagNode.getValue();
@@ -153,6 +152,7 @@ public class DagProcUtils {
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws
IOException {
Properties props = new Properties();
DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+
if
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
{
props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
@@ -169,8 +169,8 @@ public class DagProcUtils {
dagNodeToCancel.getValue().getJobSpec().getUri());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
props).get();
- // todo - why was it not being cleaned up in DagManager?
- dagManagementStateStore.deleteDagNodeState(dagId, dagNodeToCancel);
+ // add back the dag node with updated states in the store
+ dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
} catch (Exception e) {
throw new IOException(e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
index a7d388b9b3..e5aa4125dc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -58,7 +58,6 @@ public class EnforceFlowFinishDeadlineDagProc extends
DeadlineEnforcementDagProc
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed due to exceeding SLA of " +
flowFinishDeadline + " ms");
- dagManagementStateStore.checkpointDag(dag);
} else {
log.error("EnforceFlowFinishDeadline dagAction received before due time.
flowStartTime {}, flowFinishDeadline {} ", flowStartTime, flowFinishDeadline);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
index 15fa11c7e4..5eb2236f8f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -77,7 +77,6 @@ public class EnforceJobStartDeadlineDagProc extends
DeadlineEnforcementDagProc {
DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed because no update received for " +
timeOutForJobStart + " ms after orchestration");
- dagManagementStateStore.checkpointDag(dag);
}
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index 15448d62b8..ab68d3e7b1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -62,7 +62,6 @@ public class KillDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
dag.get().setMessage("Flow killed by request");
- dagManagementStateStore.checkpointDag(dag.get());
if (this.shouldKillSpecificJob) {
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel =
dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index c2d840ef70..88f3249f1c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -20,17 +20,14 @@ package
org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@@ -42,14 +39,6 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@Slf4j
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
- // todo - this is not orchestration delay and should be renamed. keeping it
the same because DagManager is also using
- // the same name
- private static final AtomicLong orchestrationDelayCounter = new
AtomicLong(0);
-
- static {
- metricContext.register(
-
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get));
- }
public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
super(launchDagTask);
@@ -68,7 +57,7 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
getDagId().getFlowExecutionId());
Optional<Dag<JobExecutionPlan>> dag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
if (dag.isPresent()) {
- dagManagementStateStore.checkpointDag(dag.get());
+ dagManagementStateStore.addDag(dag.get());
}
return dag;
} catch (URISyntaxException | SpecNotFoundException | InterruptedException
| IOException e) {
@@ -84,10 +73,7 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
// todo - add metrics
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(),
getDagId());
- // Checkpoint the dag state, it should have an updated value of dag nodes
- dagManagementStateStore.checkpointDag(dag.get());
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagTask().getDagAction());
- orchestrationDelayCounter.set(System.currentTimeMillis() -
DagManagerUtils.getFlowExecId(dag.get()));
}
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index a57de48189..27525c057c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -27,7 +27,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -76,11 +75,9 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
return;
}
- Dag<JobExecutionPlan> dag =
dagManagementStateStore.getDag(getDagId()).get();
JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
ExecutionStatus executionStatus =
ExecutionStatus.valueOf(jobStatus.getEventName());
- // pass dag, so that dag is updated too, updated information will be
required in onJobFinish in finding next jobs to submit
- setStatus(dagManagementStateStore, dag, getDagNodeId(), executionStatus);
+ updateDagNodeStatus(dagManagementStateStore, dagNode, executionStatus);
if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should
have been created only for finished status - {}",
@@ -91,7 +88,9 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES));
}
- onJobFinish(dagManagementStateStore, dagNode, executionStatus, dag);
+ // get the dag after updating dag node status
+ Dag<JobExecutionPlan> dag =
dagManagementStateStore.getDag(getDagId()).get();
+ onJobFinish(dagManagementStateStore, dagNode, dag);
if (jobStatus.isShouldRetry()) {
log.info("Retrying job: {}, current attempts: {}, max attempts: {}",
@@ -114,7 +113,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
// todo - verify if work from PR#3641 is required
dagManagementStateStore.deleteDag(getDagId());
} else {
- dagManagementStateStore.markDagFailed(dag);
+ dagManagementStateStore.markDagFailed(getDagId());
}
DagProcUtils.removeFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagId());
@@ -125,17 +124,10 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
* Sets status of a dag node inside the given Dag.
* todo - DMSS should support this functionality like an atomic get-and-set
operation.
*/
- private void setStatus(DagManagementStateStore dagManagementStateStore,
- Dag<JobExecutionPlan> dag, DagNodeId dagNodeId, ExecutionStatus
executionStatus) throws IOException {
- for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
- if (node.getValue().getId().equals(dagNodeId)) {
- node.getValue().setExecutionStatus(executionStatus);
- dagManagementStateStore.addDagNodeState(node, getDagId());
- dagManagementStateStore.checkpointDag(dag);
- return;
- }
- }
- log.error("DagNode with id {} not found in Dag {}", dagNodeId, getDagId());
+ private void updateDagNodeStatus(DagManagementStateStore
dagManagementStateStore,
+ Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus)
throws IOException {
+ dagNode.getValue().setExecutionStatus(executionStatus);
+ dagManagementStateStore.addDagNodeState(dagNode, getDagId());
}
/**
@@ -143,8 +135,9 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
* This method updates the state of the dag and performs clean up actions as
necessary.
*/
private void onJobFinish(DagManagementStateStore dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode,
- ExecutionStatus executionStatus, Dag<JobExecutionPlan> dag) throws
IOException {
+ Dag<JobExecutionPlan> dag) throws IOException {
String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
+ ExecutionStatus executionStatus = dagNode.getValue().getExecutionStatus();
log.info("Job {} of Dag {} has finished with status {}", jobName,
getDagId(), executionStatus.name());
// Only decrement counters and quota for jobs that actually ran on the
executor, not from a GaaS side failure/skip event
if (dagManagementStateStore.releaseQuota(dagNode)) {
@@ -167,9 +160,5 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
default:
log.warn("It should not reach here. Job status {} is unexpected.",
executionStatus);
}
-
- // Checkpoint the dag state, it should have an updated value of dag fields
- dagManagementStateStore.checkpointDag(dag);
- dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 68e02494eb..3447035cc4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -87,10 +87,10 @@ public class ResumeDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
// these two statements effectively move the dag from failed dag store to
(running) dag store.
// to prevent loss in the unlikely event of failure between the two, we
add first.
- dagManagementStateStore.checkpointDag(failedDag.get());
+ dagManagementStateStore.addDag(failedDag.get());
// if it fails here, it will check point the failed dag in the (running)
dag store again, which is idempotent
- dagManagementStateStore.deleteFailedDag(failedDag.get());
+ dagManagementStateStore.deleteFailedDag(getDagId());
DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(),
getDagId());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
index d96c67619c..02973ddb91 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -30,7 +30,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
@@ -48,7 +47,7 @@ public class JobExecutionPlanDagFactory {
Map<String, Dag.DagNode<JobExecutionPlan>> jobExecutionPlanMap =
Maps.newHashMapWithExpectedSize(jobExecutionPlans.size());
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new
ArrayList<>(jobExecutionPlans.size());
- /**
+ /*
* Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link
JobSpec} in the flow. Add this node
* to a HashMap.
*/
@@ -61,12 +60,10 @@ public class JobExecutionPlanDagFactory {
}
}
- /**
+ /*
* Iterate over each {@link JobSpec} to get the dependencies of each
{@link JobSpec}.
* For each {@link JobSpec}, get the corresponding {@link Dag.DagNode} and
* set the {@link Dag.DagNode}s corresponding to its dependencies as its
parent nodes.
- *
- * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
*/
List<String> jobNames = new ArrayList<>();
for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index d2800fbe36..9a9aac1c60 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -143,7 +143,6 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
: ConfigFactory.empty();
// log exceptions to expose errors we suffer under and/or guide
intervention when resolution not readily forthcoming
- // todo - this retryer retries all the exceptions. we should make it retry
only really transient
this.persistJobStatusRetryer =
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
Optional.of(new RetryListener() {
@Override
@@ -234,7 +233,6 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
if (this.dagProcEngineEnabled &&
DagProcUtils.isJobLevelStatus(jobName)) {
if (updatedJobStatus.getRight() == NewState.FINISHED) {
- // todo - retried/resumed jobs *may* not be handled here, we may
want to create their dag action elsewhere
try {
this.dagManagementStateStore.addJobDagAction(flowGroup,
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
} catch (Exception e) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index f4d06e607a..5ce3b33789 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -57,14 +57,14 @@ public class DagManagementTaskStreamImplTest {
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
ConfigBuilder configBuilder = ConfigBuilder.create();
-
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
+
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
this.testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
Config config = configBuilder.build();
- MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ MySqlDagManagementStateStore dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 14dd338f64..06982c5f18 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -54,7 +54,7 @@ public class DagProcessingEngineTest {
private DagManagementTaskStreamImpl dagManagementTaskStream;
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
- private static MostlyMySqlDagManagementStateStore dagManagementStateStore;
+ private static MySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testMetastoreDatabase;
static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
@@ -69,14 +69,14 @@ public class DagProcessingEngineTest {
Config config;
ConfigBuilder configBuilder = ConfigBuilder.create();
-
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
+
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
config = configBuilder.build();
- dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
+ dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
doReturn(true).when(dagActionStore).deleteDagAction(any());
dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
similarity index 74%
rename from
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
rename to
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
index 597acadb1a..3185943902 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
@@ -18,7 +18,9 @@
package org.apache.gobblin.service.modules.orchestration;
import java.net.URI;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,16 +29,15 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
@@ -44,6 +45,7 @@ import
org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProcTest;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.CompletedFuture;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -54,13 +56,12 @@ import static org.mockito.Mockito.mock;
/**
* Mainly testing functionalities related to DagStateStore but not
Mysql-related components.
*/
-public class MostlyMySqlDagManagementStateStoreTest {
+public class MySqlDagManagementStateStoreTest {
private ITestMetastoreDatabase testDb;
- private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+ private MySqlDagManagementStateStore dagManagementStateStore;
private static final String TEST_USER = "testUser";
- private static final String TEST_PASSWORD = "testPassword";
- private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+ public static final String TEST_PASSWORD = "testPassword";
private static final String TEST_TABLE = "table";
public static String TEST_SPEC_EXECUTOR_URI = "mySpecExecutor";
@@ -79,6 +80,18 @@ public class MostlyMySqlDagManagementStateStoreTest {
}
}
+ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
+ if (list1.size() != list2.size()) {
+ return false;
+ }
+ for (T item : list1) {
+ if (Collections.frequency(list1, item) != Collections.frequency(list2,
item)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Test
public void testAddDag() throws Exception {
Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
@@ -90,14 +103,13 @@ public class MostlyMySqlDagManagementStateStoreTest {
DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
DagNodeId dagNodeId =
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
- this.dagManagementStateStore.checkpointDag(dag);
- this.dagManagementStateStore.checkpointDag(dag2);
+ this.dagManagementStateStore.addDag(dag);
+ this.dagManagementStateStore.addDag(dag2);
this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
- Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
- Assert.assertEquals(dag.toString(),
this.dagManagementStateStore.getDag(dagId).get().toString());
+ Assert.assertTrue(compareLists(dag.getNodes(),
this.dagManagementStateStore.getDag(dagId).get().getNodes()));
Assert.assertEquals(dagNode,
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
Set<Dag.DagNode<JobExecutionPlan>> dagNodes =
this.dagManagementStateStore.getDagNodes(dagId);
@@ -105,26 +117,26 @@ public class MostlyMySqlDagManagementStateStoreTest {
Assert.assertTrue(dagNodes.contains(dagNode));
Assert.assertTrue(dagNodes.contains(dagNode2));
- this.dagManagementStateStore.deleteDagNodeState(dagId, dagNode);
-
Assert.assertFalse(this.dagManagementStateStore.getDagNodes(dagId).contains(dagNode));
-
Assert.assertTrue(this.dagManagementStateStore.getDagNodes(dagId).contains(dagNode2));
-
Assert.assertTrue(this.dagManagementStateStore.getDagNodes(dagId2).contains(dagNode3));
-
// test to verify that adding a new dag node with the same dag node id
(defined by the jobSpec) replaces the existing one
-
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 1);
- JobExecutionPlan duplicateJobExecutionPlan = new
JobExecutionPlan(dagNode2.getValue().getJobSpec(),
- new MockedSpecExecutor(ConfigFactory.empty()));
- Dag.DagNode<JobExecutionPlan> duplicateDagNode = new
Dag.DagNode<>(duplicateJobExecutionPlan);
+ JobExecutionPlan jobExecutionPlan = new
JobExecutionPlan(dagNode2.getValue().getJobSpec(),
+
DagTestUtils.buildNaiveTopologySpec("mySpecExecutor").getSpecExecutor());
+ jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+ // Future of type CompletedFuture is used because in tests
InMemorySpecProducer is used and that responds with CompletedFuture
+ CompletedFuture<Boolean> future = new CompletedFuture<>(Boolean.TRUE,
null);
+ jobExecutionPlan.setJobFuture(Optional.of(future));
+
+ Dag.DagNode<JobExecutionPlan> duplicateDagNode = new
Dag.DagNode<>(jobExecutionPlan);
this.dagManagementStateStore.addDagNodeState(duplicateDagNode, dagId);
-
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 1);
+
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 2);
}
- public static MostlyMySqlDagManagementStateStore
getDummyDMSS(ITestMetastoreDatabase testMetastoreDatabase) throws Exception {
+ public static MySqlDagManagementStateStore
getDummyDMSS(ITestMetastoreDatabase testMetastoreDatabase) throws Exception {
ConfigBuilder configBuilder = ConfigBuilder.create();
-
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
+
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreWithDagNodes.class.getName())
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testMetastoreDatabase.getJdbcUrl())
- .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE +
1)
-
.addPrimitive(MostlyMySqlDagManagementStateStore.FAILED_DAG_STATESTORE_PREFIX
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, "dag" + 1)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER)
+ .addPrimitive(MySqlDagManagementStateStore.FAILED_DAG_STATESTORE_PREFIX
+ "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE +
2);
Config config = configBuilder.build();
JobStatusRetriever jobStatusRetriever = mock(JobStatusRetriever.class);
@@ -139,8 +151,8 @@ public class MostlyMySqlDagManagementStateStoreTest {
TopologySpec topologySpec =
LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI);
URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
topologySpecMap.put(specExecURI, topologySpec);
- MostlyMySqlDagManagementStateStore dagManagementStateStore =
- new MostlyMySqlDagManagementStateStore(config, null, null,
jobStatusRetriever,
+ MySqlDagManagementStateStore dagManagementStateStore =
+ new MySqlDagManagementStateStore(config, null, null,
jobStatusRetriever,
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase));
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
return dagManagementStateStore;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 5130019536..6137f91d10 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -18,7 +18,6 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
-import java.sql.SQLException;
import java.util.Collection;
import java.util.HashSet;
@@ -103,7 +102,7 @@ public class MysqlDagActionStoreTest {
}
@Test(dependsOnMethods = "testGetActions")
- public void testDeleteAction() throws IOException, SQLException {
+ public void testDeleteAction() throws IOException {
this.mysqlDagActionStore.deleteDagAction(
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.KILL));
Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
new file mode 100644
index 0000000000..28687771f7
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not
Mysql-related components.
+ */
+public class MysqlDagStateStoreWithDagNodesTest {
+
+ private DagStateStore dagStateStore;
+
+ private static final String TEST_USER = "testUser";
+ private static ITestMetastoreDatabase testDb;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ testDb = TestMetastoreDatabaseFactory.get();
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY,
MySqlDagManagementStateStoreTest.TEST_PASSWORD);
+
+ // Constructing TopologySpecMap.
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+ String specExecInstance = "mySpecExecutor";
+ TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+ URI specExecURI = new URI(specExecInstance);
+ topologySpecMap.put(specExecURI, topologySpec);
+ this.dagStateStore = new
MysqlDagStateStoreWithDagNodes(configBuilder.build(), topologySpecMap);
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (testDb != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ testDb.close();
+ }
+ }
+
+ @Test
+ public void testAddGetAndDeleteDag() throws Exception{
+ Dag<JobExecutionPlan> originalDag1 = DagTestUtils.buildDag("random_1",
123L);
+ Dag<JobExecutionPlan> originalDag2 = DagTestUtils.buildDag("random_2",
456L);
+ DagManager.DagId dagId1 = DagManagerUtils.generateDagId(originalDag1);
+ DagManager.DagId dagId2 = DagManagerUtils.generateDagId(originalDag2);
+ this.dagStateStore.writeCheckpoint(originalDag1);
+ this.dagStateStore.writeCheckpoint(originalDag2);
+
+ // Verify get one dag
+ Dag<JobExecutionPlan> dag1 = this.dagStateStore.getDag(dagId1);
+ Dag<JobExecutionPlan> dag2 = this.dagStateStore.getDag(dagId2);
+
Assert.assertTrue(MySqlDagManagementStateStoreTest.compareLists(dag1.getNodes(),
originalDag1.getNodes()));
+
Assert.assertTrue(MySqlDagManagementStateStoreTest.compareLists(dag2.getNodes(),
originalDag2.getNodes()));
+
+ // Verify dag contents
+ Dag<JobExecutionPlan> dagDeserialized = dag1;
+ Assert.assertEquals(dagDeserialized.getNodes().size(), 2);
+ Assert.assertEquals(dagDeserialized.getStartNodes().size(), 1);
+ Assert.assertEquals(dagDeserialized.getEndNodes().size(), 1);
+ Dag.DagNode<JobExecutionPlan> child = dagDeserialized.getEndNodes().get(0);
+ Dag.DagNode<JobExecutionPlan> parent =
dagDeserialized.getStartNodes().get(0);
+ Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
+
Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
+ for (int i = 0; i < 2; i++) {
+ JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
+ Config jobConfig = plan.getJobSpec().getConfig();
+
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY),
"group" + "random_1");
+
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY),
"flow" + "random_1");
+
Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
123L);
+ Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+
Assert.assertTrue(Boolean.parseBoolean(plan.getJobFuture().get().get().toString()));
+
Assert.assertTrue(Boolean.parseBoolean(plan.getJobFuture().get().get().toString()));
+ }
+
+ dagDeserialized = dag2;
+ Assert.assertEquals(dagDeserialized.getNodes().size(), 2);
+ Assert.assertEquals(dagDeserialized.getStartNodes().size(), 1);
+ Assert.assertEquals(dagDeserialized.getEndNodes().size(), 1);
+ child = dagDeserialized.getEndNodes().get(0);
+ parent = dagDeserialized.getStartNodes().get(0);
+ Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
+
Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
+ for (int i = 0; i < 2; i++) {
+ JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
+ Config jobConfig = plan.getJobSpec().getConfig();
+
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY),
"group" + "random_2");
+
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY),
"flow" + "random_2");
+
Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
456L);
+ Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+ }
+
+ dagStateStore.cleanUp(dagId1);
+ dagStateStore.cleanUp(dagId2);
+
+ Assert.assertNull(this.dagStateStore.getDag(dagId1));
+ Assert.assertNull(this.dagStateStore.getDag(dagId2));
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 961eddaa2b..7c5880e824 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -136,8 +136,8 @@ public class OrchestratorTest {
this.mockDagManager = mock(DagManager.class);
Mockito.doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
- MostlyMySqlDagManagementStateStore dagManagementStateStore =
-
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ MySqlDagManagementStateStore dagManagementStateStore =
+
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 41bdf33341..ac4cd3c49f 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -17,13 +17,13 @@
package org.apache.gobblin.service.modules.orchestration.proc;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -34,14 +34,16 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -77,7 +79,7 @@ public class EnforceDeadlineDagProcsTest {
String flowGroup = "fg";
String flowName = "fn";
long flowExecutionId = System.currentTimeMillis();
- MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ MySqlDagManagementStateStore dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
@@ -88,12 +90,15 @@ public class EnforceDeadlineDagProcsTest {
.withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
.withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
-
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ List<SpecProducer<Spec>> specProducers =
ReevaluateDagProcTest.getDagSpecProducers(dag);
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
- dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
+
+ dagManagementStateStore.addDag(dag); // simulate having a dag that has
not yet started running
dagManagementStateStore.addDagAction(dagAction);
EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
@@ -102,9 +107,9 @@ public class EnforceDeadlineDagProcsTest {
enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
int expectedNumOfDeleteDagNodeStates = 1; // the one dag node
corresponding to the EnforceStartDeadlineDagProc
- Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
-
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
- .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
+ Mockito.verify(specProducers.get(0), Mockito.times(1)).cancelJob(any(),
any());
+ specProducers.stream().skip(expectedNumOfDeleteDagNodeStates) //
separately verified `specProducers.get(0)`
+ .forEach(sp -> Mockito.verify(sp, Mockito.never()).cancelJob(any(),
any()));
}
/*
@@ -117,7 +122,7 @@ public class EnforceDeadlineDagProcsTest {
String flowGroup = "fg";
String flowName = "fn";
long flowExecutionId = System.currentTimeMillis();
- MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ MySqlDagManagementStateStore dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
@@ -128,22 +133,23 @@ public class EnforceDeadlineDagProcsTest {
.withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
.withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
-
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ List<SpecProducer<Spec>> specProducers =
ReevaluateDagProcTest.getDagSpecProducers(dag);
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
- dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
+
+ dagManagementStateStore.addDag(dag); // simulate having a dag that has
not yet started running
EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
"job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, dagManagementStateStore));
enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
- int expectedNumOfDeleteDagNodeStates = 0; // no dag node because we
simulated (by not adding) missing dag action
- Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
-
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
- .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
+ // no job cancelled because we simulated (by not adding) missing dag action
+ specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).cancelJob(any(), any()));
}
/*
@@ -155,7 +161,7 @@ public class EnforceDeadlineDagProcsTest {
String flowGroup = "fg";
String flowName = "fn";
long flowExecutionId = System.currentTimeMillis();
- MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ MySqlDagManagementStateStore dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
@@ -167,12 +173,15 @@ public class EnforceDeadlineDagProcsTest {
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
-
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ List<SpecProducer<Spec>> specProducers =
ReevaluateDagProcTest.getDagSpecProducers(dag);
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
message("Test
message").eventName(ExecutionStatus.RUNNING.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
- dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
is in running state
+
+ dagManagementStateStore.addDag(dag); // simulate having a dag that is in
running state
dagManagementStateStore.addDagAction(dagAction);
EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new
EnforceFlowFinishDeadlineDagProc(
@@ -180,8 +189,6 @@ public class EnforceDeadlineDagProcsTest {
"job0",
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null,
dagManagementStateStore));
enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
- Assert.assertEquals(numOfDagNodes,
-
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
- .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
+ specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.times(1)).cancelJob(any(), any()));
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index 64efb2ed56..dbe17d3362 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -45,8 +45,8 @@ import
org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -61,13 +61,13 @@ import static org.mockito.Mockito.spy;
public class KillDagProcTest {
- private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+ private MySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testDb;
@BeforeClass
public void setUp() throws Exception {
this.testDb = TestMetastoreDatabaseFactory.get();
- this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
+ this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
}
@@ -86,7 +86,7 @@ public class KillDagProcTest {
5, "user5", ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg"))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
-
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
@@ -127,7 +127,7 @@ public class KillDagProcTest {
5, "user5", ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg"))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
-
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
JobStatus
jobStatus =
JobStatus.builder().flowName("job0").flowGroup("fg").jobGroup("fg").jobName("job0").flowExecutionId(flowExecutionId).
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 6b5e7bbdd2..1b8c7da4ca 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -51,8 +51,8 @@ import
org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -70,12 +70,12 @@ import static org.mockito.Mockito.spy;
public class LaunchDagProcTest {
private ITestMetastoreDatabase testMetastoreDatabase;
- private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+ private MySqlDagManagementStateStore dagManagementStateStore;
@BeforeClass
public void setUp() throws Exception {
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
- this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
mockDMSSCommonBehavior(this.dagManagementStateStore);
}
@@ -95,7 +95,7 @@ public class LaunchDagProcTest {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
List<SpecProducer<Spec>> specProducers =
ReevaluateDagProcTest.getDagSpecProducers(dag);
@@ -126,7 +126,7 @@ public class LaunchDagProcTest {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
-
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
LaunchDagProc launchDagProc = new LaunchDagProc(
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index bbf5874db4..93f31d1410 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -47,7 +47,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
@@ -74,7 +74,7 @@ public class ReevaluateDagProcTest {
@BeforeMethod
public void setUp() throws Exception {
- this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
}
@@ -95,13 +95,13 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0")
.flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.COMPLETE.name())
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
- dagManagementStateStore.checkpointDag(dag);
+ dagManagementStateStore.addDag(dag);
/*
We cannot check the spec producers if addSpec is called on them because
spec producer object changes in writing/reading
@@ -116,33 +116,33 @@ public class ReevaluateDagProcTest {
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null, dagManagementStateStore));
reEvaluateDagProc.process(dagManagementStateStore);
-
// next job is sent to spec producer
Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
// there are two invocations, one after setting status and other after
sending new job to specProducer
Mockito.verify(this.dagManagementStateStore,
Mockito.times(2)).addDagNodeState(any(), any());
- // current job's state is deleted
-
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
- .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+ // assert that the first job is completed
+ Assert.assertEquals(ExecutionStatus.COMPLETE,
+
this.dagManagementStateStore.getDag(dagId).get().getStartNodes().get(0).getValue().getExecutionStatus());
}
// test when there does not exist a next job in the dag when the current
job's reevaluate dag action is processed
@Test
public void testNoNextJobToRun() throws Exception {
String flowName = "fn2";
+ DagManager.DagId dagId = new DagManager.DagId(flowGroup, flowName,
flowExecutionId);
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("2", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
1, "user5", ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0")
.flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.COMPLETE.name())
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
- dagManagementStateStore.checkpointDag(dag);
+ dagManagementStateStore.addDag(dag);
Dag<JobExecutionPlan> mockedDag = DagManagerTest.buildDag("2",
flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
1, "user5", ConfigFactory.empty()
@@ -150,13 +150,15 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
// mock getDagNodeWithJobStatus() to return a dagNode with status completed
mockedDag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
doReturn(new ImmutablePair<>(Optional.of(mockedDag.getNodes().get(0)),
Optional.of(jobStatus)))
.when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId));
+
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
ReevaluateDagProc
@@ -167,31 +169,30 @@ public class ReevaluateDagProcTest {
// no new job to launch for this one job flow
specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).addSpec(any()));
- // current job's state is deleted
-
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
- .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
-
// dag is deleted because the only job in the dag is completed
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
.filter(a -> a.getMethod().getName().equals("deleteDag")).count(), 1);
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
+
+ Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId));
}
@Test
public void testCurrentJobToRun() throws Exception {
String flowName = "fn3";
+ DagManager.DagId dagId = new DagManager.DagId(flowGroup, flowName,
flowExecutionId);
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
2, "user5", ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
- dagManagementStateStore.checkpointDag(dag);
+ dagManagementStateStore.addDag(dag);
doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)),
Optional.empty()))
.when(dagManagementStateStore).getDagNodeWithJobStatus(any());
@@ -208,8 +209,6 @@ public class ReevaluateDagProcTest {
specProducers.stream().skip(numOfLaunchedJobs) // separately verified
`specProducers.get(0)`
.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
- // no job's state is deleted because that happens when the job finishes
triggered the reevaluate dag proc
- Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagNodeState(any(), any());
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(any());
Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
@@ -224,12 +223,12 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
.jobName("job3").flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.COMPLETE.name())
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
- dagManagementStateStore.checkpointDag(dag);
+ dagManagementStateStore.addDag(dag);
doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus)))
.when(dagManagementStateStore).getDagNodeWithJobStatus(any());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index c84132c8ad..e342b29e77 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.concurrent.ExecutionException;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
@@ -37,8 +36,9 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -48,13 +48,13 @@ import static org.mockito.Mockito.spy;
public class ResumeDagProcTest {
- private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+ private MySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testDb;
@BeforeClass
public void setUp() throws Exception {
testDb = TestMetastoreDatabaseFactory.get();
- this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
+ this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
}
@@ -69,7 +69,7 @@ public class ResumeDagProcTest {
This test creates a failed dag and launches a resume dag proc for it. It
then verifies that the next jobs are set to run.
*/
@Test
- public void resumeDag() throws IOException, URISyntaxException,
ExecutionException, InterruptedException {
+ public void resumeDag() throws IOException, URISyntaxException {
long flowExecutionId = 12345L;
String flowGroup = "fg";
String flowName = "fn";
@@ -78,15 +78,16 @@ public class ResumeDagProcTest {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
-
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
// simulate a failed dag in store
dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED);
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
- this.dagManagementStateStore.checkpointDag(dag);
+ this.dagManagementStateStore.addDag(dag);
// simulate it as a failed dag
- this.dagManagementStateStore.markDagFailed(dag);
+ this.dagManagementStateStore.markDagFailed(dagId);
ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.RESUME),