This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 492fea22f [GOBBLIN-2002] create MostlyInMemoryDagManagementStateStore
to merge UserQuotaManage… (#3878)
492fea22f is described below
commit 492fea22f68536f4ad9fdf6f14cefbade6b1b189
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Feb 16 15:21:04 2024 -0800
[GOBBLIN-2002] create MostlyInMemoryDagManagementStateStore to merge
UserQuotaManage… (#3878)
* create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager,
DagStateStore and in-memory dag maps used in DagManager
* address review comments
* add javadoc
---
.../apache/gobblin/metastore/MysqlStateStore.java | 7 +-
.../gobblin/service/modules/flowgraph/Dag.java | 7 +-
.../service/modules/flowgraph/DagNodeId.java | 41 +++++
.../orchestration/DagManagementStateStore.java | 168 +++++++++++++++++
.../modules/orchestration/DagManagerUtils.java | 10 +-
.../modules/orchestration/DagStateStore.java | 4 +
.../MostlyMySqlDagManagementStateStore.java | 200 +++++++++++++++++++++
.../modules/orchestration/MysqlDagStateStore.java | 27 +--
.../orchestration/MysqlUserQuotaManager.java | 9 +-
.../service/modules/spec/JobExecutionPlan.java | 3 +-
.../MostlyMySqlDagManagementStateStoreTest.java | 147 +++++++++++++++
11 files changed, 596 insertions(+), 27 deletions(-)
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 956a42b83..36ff51a56 100644
---
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -42,13 +42,11 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import javax.sql.DataSource;
+import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.io.Text;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
@@ -56,6 +54,8 @@ import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
+import javax.sql.DataSource;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
@@ -528,6 +528,7 @@ public class MysqlStateStore<T extends State> implements
StateStore<T> {
}
}
+ // todo - delete should return the deleted row counts
@Override
public void delete(String storeName, String tableName) throws IOException {
try (Connection connection = dataSource.getConnection();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 388d943aa..50a4dcb5c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -17,8 +17,6 @@
package org.apache.gobblin.service.modules.flowgraph;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,8 +26,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
import lombok.Getter;
import lombok.Setter;
+
import org.apache.gobblin.annotation.Alpha;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DagNodeId.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DagNodeId.java
new file mode 100644
index 000000000..3594034f6
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DagNodeId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Joiner;
+
+import lombok.Data;
+
+
+/**
+ * This class provides a unique identifier for a {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}.
+ * It can be used for various DagNode/Job related operations, e.g. launch job,
retry job, kill job
+ */
+@Data
+public class DagNodeId {
+ private final String flowGroup;
+ private final String flowName;
+ private final long flowExecutionId;
+ private final String jobGroup;
+ private final String jobName;
+
+ @Override
+ public String toString() {
+ return Joiner.on("_").join(flowGroup, flowName, flowExecutionId, jobGroup,
jobName);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
new file mode 100644
index 000000000..4903247d2
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s,
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that
+   * changes the dag node's state, on resuming a dag, or on receiving a new dag from the orchestrator.
+   * Calling this on a previously checkpointed Dag updates it.
+   * The opposite of this is {@link DagManagementStateStore#deleteDag}, which removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * @return whether {@code dagId} is currently known due to {@link DagManagementStateStore#checkpointDag} but not yet
+   * removed through {@link DagManagementStateStore#deleteDag}
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * @param dagId id of the requested dag
+   * @return the {@link Dag}, or {@link Optional#empty()} if it is not present in the store
+   */
+  Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically called upon completion of execution.
+   * @param dag The dag that completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void deleteDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Marks the dag as a failed one.
+   * Failed dags are later queried using {@link DagManagementStateStore#getFailedDag(DagManager.DagId)} to be retried.
+   * @param dag failing dag
+   */
+  void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * @return the IDs of all failed dags contained in the dag state store
+   */
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Returns the failed dag.
+   * If the dag is not found because it was never marked as failed through
+   * {@link DagManagementStateStore#markDagFailed(Dag)}, it returns {@link Optional#empty()}.
+   * @param dagId dag id of the failed dag
+   */
+  Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Deletes the failed dag. No-op if the dag is not found or is not marked as failed.
+   * @param dag the failed dag to delete
+   * @throws IOException if the backing store cannot be updated
+   */
+  default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteFailedDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Deletes the failed dag with the given id. No-op if the dag is not found or is not marked as failed.
+   * @param dagId id of the failed dag to delete
+   * @throws IOException if the backing store cannot be updated
+   */
+  void deleteFailedDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
+   * Note that a DagNode is a part of a Dag and must already be present in the store through
+   * {@link DagManagementStateStore#checkpointDag}. This call is just an additional identifier which may be used
+   * for DagNode level operations. In the future, it may be merged with checkpointDag.
+   * @param dagNode dag node to be added
+   * @param dagId dag id of the dag this dag node belongs to
+   */
+  void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}, or
+   * {@link Optional#empty()} if it is not found.
+   * @param dagNodeId id of the dag node
+   */
+  Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId);
+
+  /**
+   * Returns a list of {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}.
+   * The returned list will be empty if the dag is not found in the store.
+   * @param dagId DagId of the dag for which all DagNodes are requested
+   */
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns the {@link Dag} the provided {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to,
+   * or {@link Optional#empty()} if it is not found.
+   */
+  Optional<Dag<JobExecutionPlan>> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * Deletes the dag node state that was added through
+   * {@link DagManagementStateStore#addDagNodeState(Dag.DagNode, DagManager.DagId)}.
+   * No-op if the dag node is not found in the store.
+   * @param dagId dag id of the dag this dag node belongs to
+   * @param dagNode dag node to be deleted
+   */
+  void deleteDagNodeState(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * Loads all currently running {@link Dag}s from the underlying store. Typically invoked when a new
+   * {@link DagManager} takes over or on restart of the service.
+   * @return a {@link List} of currently running {@link Dag}s.
+   */
+  List<Dag<JobExecutionPlan>> getDags() throws IOException;
+
+  /**
+   * Initialize the dag quotas with the provided set of dags.
+   */
+  void initQuota(Collection<Dag<JobExecutionPlan>> dags) throws IOException;
+
+  /**
+   * Checks if the given dag nodes exceed the statically configured user quota for the proxy user, requester user
+   * and flowGroup. It also increases the quota usage for the proxy user, requester and flowGroup of the given dag
+   * nodes by one.
+   * @param dagNodes dag nodes to acquire quota for
+   * @throws QuotaExceededException if the quota is exceeded
+   */
+  void tryAcquireQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) throws IOException;
+
+  /**
+   * Decrements the quota usage by one for the proxy user and requesters corresponding to the provided
+   * {@link Dag.DagNode}. It is usually used with {@link #deleteDagNodeState}, but can also be used independently.
+   * @param dagNode dag node whose quota usage should be released
+   * @return true if the quota usage was successfully reduced
+   */
+  boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 06ac24aa7..e894a0b16 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -16,7 +16,6 @@
*/
package org.apache.gobblin.service.modules.orchestration;
-import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
@@ -51,6 +50,7 @@ import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
@@ -456,17 +456,13 @@ public class DagManagerUtils {
return dagNodes != null && !dagNodes.isEmpty();
}
- public static String calcJobId(Config jobConfig) {
+ /**
+  * Builds the {@link DagNodeId} of the job described by {@code jobConfig}.
+  * Flow group and flow name are read with {@code Config#getString} and no default; the flow execution id defaults
+  * to 0 and the job group / job name default to the empty string when the corresponding keys are absent.
+  * {@link DagNodeId#toString()} joins these five parts with "_".
+  */
+ public static DagNodeId calcJobId(Config jobConfig) {
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName =jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
long flowExecutionId = ConfigUtils.getLong(jobConfig,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
String jobGroup = ConfigUtils.getString(jobConfig,
ConfigurationKeys.JOB_GROUP_KEY, "");
String jobName = ConfigUtils.getString(jobConfig,
ConfigurationKeys.JOB_NAME_KEY, "");
- return calcJobId(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
- }
-
- public static String calcJobId(String flowGroup, String flowName, long
flowExecutionId, String jobGroup, String jobName) {
- return Joiner.on("_").join(flowGroup, flowName, flowExecutionId, jobGroup,
jobName);
+ return new DagNodeId(flowGroup, flowName, flowExecutionId, jobGroup,
jobName);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index 56249639e..09824ddcb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -70,4 +70,8 @@ public interface DagStateStore {
* Return a list of all dag IDs contained in the dag state store.
*/
Set<String> getDagIds() throws IOException;
+
+  /**
+   * Checks whether a dag with the given id is present in this store.
+   * The default implementation does not support the check and always throws; stores that can answer it
+   * (e.g. {@link MysqlDagStateStore}) override this method.
+   * @param dagId id of the dag to look up
+   * @return true if the dag is present in the store
+   * @throws UnsupportedOperationException if the implementation does not support this check
+   */
+  default boolean existsDag(DagManager.DagId dagId) throws IOException {
+    throw new UnsupportedOperationException("existsDag not implemented");
+  }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
new file mode 100644
index 000000000..d484ff8f8
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply
clean up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii)
failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at
one place.
+ */
+@Slf4j
+public class MostlyMySqlDagManagementStateStore implements
DagManagementStateStore {
+ private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>>
jobToDag = new ConcurrentHashMap<>();
+ private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new
ConcurrentHashMap<>();
+ // dagToJobs holds a map of dagId to running jobs of that dag
+ private final Map<DagManager.DagId,
LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new
ConcurrentHashMap<>();
+ private final Map<DagManager.DagId, Long> dagToDeadline = new
ConcurrentHashMap<>();
+ private final DagStateStore dagStateStore;
+ private final DagStateStore failedDagStateStore;
+ private final UserQuotaManager quotaManager;
+ private static final String FAILED_DAG_STATESTORE_PREFIX =
"failedDagStateStore";
+ public static final String DAG_STATESTORE_CLASS_KEY =
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+
+ public MostlyMySqlDagManagementStateStore(Config config, Map<URI,
TopologySpec> topologySpecMap) throws IOException {
+ this.dagStateStore = createDagStateStore(config, topologySpecMap);
+ this.failedDagStateStore = createDagStateStore(
+ ConfigUtils.getConfigOrEmpty(config,
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+ this.quotaManager = new MysqlUserQuotaManager(config);
+ this.quotaManager.init(getDags());
+ }
+
+ DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) {
+ try {
+ Class<?> dagStateStoreClass =
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStore.class.getName()));
+ return (DagStateStore)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config,
topologySpecMap);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+ this.dagStateStore.writeCheckpoint(dag);
+ }
+
+ @Override
+ public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
+ this.dagStateStore.cleanUp(dag);
+ // todo - updated failedDagStateStore iff cleanup returned 1
+ this.failedDagStateStore.writeCheckpoint(dag);
+ }
+
+ @Override
+ public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+ this.dagStateStore.cleanUp(dag);
+ }
+
+ @Override
+ public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+ this.failedDagStateStore.cleanUp(dag);
+ }
+
+ @Override
+ public void deleteDag(DagManager.DagId dagId) throws IOException {
+ this.dagStateStore.cleanUp(dagId.toString());
+ }
+
+ @Override
+ public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
+ this.failedDagStateStore.cleanUp(dagId.toString());
+ }
+
+ @Override
+ public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+ return this.dagStateStore.getDags();
+ }
+
+ @Override
+ public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId)
throws IOException {
+ return Optional.of(this.failedDagStateStore.getDag(dagId.toString()));
+ }
+
+ @Override
+ public Set<String> getFailedDagIds() throws IOException {
+ return this.failedDagStateStore.getDagIds();
+ }
+
+ @Override
+ // todo - updating different mapps here and in addDagNodeState can result in
inconsistency between the maps
+ public synchronized void deleteDagNodeState(DagManager.DagId dagId,
Dag.DagNode<JobExecutionPlan> dagNode) {
+ this.jobToDag.remove(dagNode);
+ this.dagNodes.remove(dagNode.getValue().getId());
+ this.dagToDeadline.remove(dagId);
+ this.dagToJobs.get(dagId).remove(dagNode);
+ if (this.dagToJobs.get(dagId).isEmpty()) {
+ this.dagToJobs.remove(dagId);
+ }
+ }
+
+ // todo - updating different mapps here and in deleteDagNodeState can result
in inconsistency between the maps
+ @Override
+ public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan>
dagNode, DagManager.DagId dagId)
+ throws IOException {
+ Optional<Dag<JobExecutionPlan>> dag = getDag(dagId);
+ if (!dag.isPresent()) {
+ throw new RuntimeException("Dag " + dagId + " not found");
+ }
+ this.jobToDag.put(dagNode, dag.get());
+ this.dagNodes.put(dagNode.getValue().getId(), dagNode);
+ if (!this.dagToJobs.containsKey(dagId)) {
+ this.dagToJobs.put(dagId, Lists.newLinkedList());
+ }
+ this.dagToJobs.get(dagId).add(dagNode);
+ }
+
+ @Override
+ public Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws
IOException {
+ return Optional.of(this.dagStateStore.getDag(dagId.toString()));
+ }
+
+ @Override
+ public boolean containsDag(DagManager.DagId dagId) throws IOException {
+ return this.dagStateStore.existsDag(dagId);
+ }
+
+ @Override
+ public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId
dagNodeId) {
+ return Optional.of(this.dagNodes.get(dagNodeId));
+ }
+
+
+ @Override
+ public Optional<Dag<JobExecutionPlan>>
getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) {
+ return Optional.of(this.jobToDag.get(dagNode));
+ }
+
+ @Override
+ public List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId
dagId) {
+ List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
+ if (dagNodes != null) {
+ return dagNodes;
+ } else {
+ return Lists.newLinkedList();
+ }
+ }
+
+ public void initQuota(Collection<Dag<JobExecutionPlan>> dags) {
+ // This implementation does not need to update quota usage when the
service restarts or when its leadership status changes
+ // because quota usage are persisted in mysql table
+ }
+
+ @Override
+ public void tryAcquireQuota(Collection<Dag.DagNode<JobExecutionPlan>>
dagNodes) throws IOException {
+ this.quotaManager.checkQuota(dagNodes);
+ }
+
+ @Override
+ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException {
+ return this.quotaManager.releaseQuota(dagNode);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index c578c699b..087970446 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -25,6 +25,13 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metastore.MysqlDagStateStoreFactory;
@@ -45,13 +52,6 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
import org.apache.gobblin.util.ConfigUtils;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicates;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonSerializer;
-import com.google.gson.reflect.TypeToken;
-import com.typesafe.config.Config;
-
import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
import static
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateDagId;
@@ -131,9 +131,15 @@ public class MysqlDagStateStore implements DagStateStore {
public void cleanUp(String dagId)
throws IOException {
mysqlStateStore.delete(getStoreNameFromDagId(dagId),
getTableNameFromDagId(dagId));
+ // todo - decrease the count only if delete actually removed a row. MysqlStateStore#delete currently returns void,
+ // so totalDagCount is decremented unconditionally, presumably even when no dag was stored under this id.
this.totalDagCount.dec();
}
+ @Override
+ public boolean existsDag(DagManager.DagId dagId) throws IOException {
+ return mysqlStateStore.exists(getStoreNameFromDagId(dagId.toString()),
getTableNameFromDagId(dagId.toString()));
+ }
+
@Override
public List<Dag<JobExecutionPlan>> getDags()
throws IOException {
@@ -160,21 +166,21 @@ public class MysqlDagStateStore implements DagStateStore {
* Convert a state store entry into a dag ID
* e.g. storeName = group1_name1, tableName = 1234 gives dagId
group1_name1_1234
*/
- private String entryToDagId(String storeName, String tableName) {
+ private static String entryToDagId(String storeName, String tableName) {
+ // Joins storeName and tableName with the separator; this is the inverse of getStoreNameFromDagId and
+ // getTableNameFromDagId as long as tableName itself contains no separator.
return
Joiner.on(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER).join(storeName,
tableName);
}
/**
* Return a storeName given a dagId. Store name is defined as
flowGroup_flowName.
+ * The store name is everything before the last separator; for a dagId without any separator lastIndexOf
+ * returns -1 and substring throws an IndexOutOfBoundsException.
*/
- private String getStoreNameFromDagId(String dagId) {
+ private static String getStoreNameFromDagId(String dagId) {
return dagId.substring(0,
dagId.lastIndexOf(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER));
}
/**
* Return a tableName given a dagId. Table name is defined as the
flowExecutionId.
+ * The table name is everything after the last separator; a dagId without any separator is returned unchanged
+ * because lastIndexOf returns -1.
*/
- private String getTableNameFromDagId(String dagId) {
+ private static String getTableNameFromDagId(String dagId) {
return
dagId.substring(dagId.lastIndexOf(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER)
+ 1);
}
@@ -182,7 +188,6 @@ public class MysqlDagStateStore implements DagStateStore {
* For {@link Dag} to work with {@link MysqlStateStore}, it needs to be
packaged into a {@link State} object.
* The way that it does is simply serialize the {@link Dag} first and use
the key {@link #DAG_KEY_IN_STATE}
* to be pair with it.
- *
* The serialization step is required for readability and portability of
serde lib.
* @param dag The dag to be converted.
* @return An {@link State} object that contains a single k-v pair for
{@link Dag}.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index cdbce8e5e..c4e3b8713 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -17,17 +17,16 @@
package org.apache.gobblin.service.modules.orchestration;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
-
import java.util.List;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.typesafe.config.Config;
@@ -283,6 +282,10 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
return new RunningDagIdsStore(dataSource, quotaStoreTableName);
}
+  /**
+   * Prefixes {@code configKey} with this quota manager's {@code CONFIG_PREFIX}, separated by a dot.
+   * @param configKey key relative to the quota manager's config namespace
+   * @return the fully qualified config key
+   */
+  public static String qualify(String configKey) {
+    return String.join(".", MysqlUserQuotaManager.CONFIG_PREFIX, configKey);
+  }
+
static class MysqlQuotaStore {
protected final DataSource dataSource;
final String tableName;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 535179ee8..97ca4fc45 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -73,7 +74,7 @@ public class JobExecutionPlan {
private int currentAttempts = 0;
private Optional<Future> jobFuture = Optional.absent();
private long flowStartTime = 0L;
- private final String id;
+ private final DagNodeId id;
public static class Factory {
public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
new file mode 100644
index 000000000..4e1b914d1
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Tests the dag / dag-node bookkeeping of {@link MostlyMySqlDagManagementStateStore} through the
+ * {@link DagManagementStateStore} interface. The dag state store is swapped for
+ * {@link TestMysqlDagStateStore}, which talks to a database obtained from
+ * {@link TestMetastoreDatabaseFactory}, so production MySQL wiring is not exercised here.
+ */
+public class MostlyMySqlDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  // Static so the nested TestMysqlDagStateStore (configured below by class name via
+  // DAG_STATESTORE_CLASS_KEY) can reach the same test database.
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Test database shared by the quota manager settings and TestMysqlDagStateStore.
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName())
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl())
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER)
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD)
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE)
+        .build();
+
+    // Single-entry topology spec map for the store under test.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new MostlyMySqlDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
+    Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
+    DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.checkpointDag(dag);
+    this.dagManagementStateStore.checkpointDag(dag2);
+    this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
+
+    // Note: TestNG's Assert.assertEquals takes (actual, expected).
+    Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
+    Assert.assertEquals(this.dagManagementStateStore.getDag(dagId).get().toString(), dag.toString());
+    Assert.assertEquals(this.dagManagementStateStore.getDagNode(dagNodeId).get(), dagNode);
+    Assert.assertEquals(this.dagManagementStateStore.getParentDag(dagNode).get().toString(), dag.toString());
+
+    // Exactly the two nodes registered under dagId; element order is deliberately not asserted.
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagManagementStateStore.getDagNodes(dagId);
+    Assert.assertEquals(dagNodes.size(), 2);
+    Assert.assertTrue(dagNodes.contains(dagNode));
+    Assert.assertTrue(dagNodes.contains(dagNode2));
+
+    this.dagManagementStateStore.deleteDagNodeState(dagId, dagNode);
+    List<Dag.DagNode<JobExecutionPlan>> remainingNodes = this.dagManagementStateStore.getDagNodes(dagId);
+    Assert.assertFalse(remainingNodes.contains(dagNode));
+    Assert.assertTrue(remainingNodes.contains(dagNode2));
+    // Deleting a node of dagId must leave nodes of other dags untouched.
+    Assert.assertTrue(this.dagManagementStateStore.getDagNodes(dagId2).contains(dagNode3));
+  }
+
+  /**
+   * {@link MysqlDagStateStore} that only overrides {@link #createStateStore(Config)}: the supplied
+   * config is ignored and a {@link MysqlStateStore} backed by the test metastore database is returned.
+   */
+  public static class TestMysqlDagStateStore extends MysqlDagStateStore {
+    public TestMysqlDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
+      super(config, topologySpecMap);
+    }
+
+    @Override
+    protected StateStore<State> createStateStore(Config config) {
+      try {
+        String jdbcUrl = MostlyMySqlDagManagementStateStoreTest.testMetastoreDatabase.getJdbcUrl();
+        HikariDataSource dataSource = new HikariDataSource();
+
+        dataSource.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
+        dataSource.setAutoCommit(false);
+        dataSource.setJdbcUrl(jdbcUrl);
+        dataSource.setUsername(TEST_USER);
+        dataSource.setPassword(TEST_PASSWORD);
+
+        return new MysqlStateStore<>(dataSource, TEST_DAG_STATE_STORE, false, State.class);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
\ No newline at end of file