This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 492fea22f [GOBBLIN-2002] create MostlyInMemoryDagManagementStateStore 
to merge UserQuotaManage… (#3878)
492fea22f is described below

commit 492fea22f68536f4ad9fdf6f14cefbade6b1b189
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Feb 16 15:21:04 2024 -0800

    [GOBBLIN-2002] create MostlyInMemoryDagManagementStateStore to merge 
UserQuotaManage… (#3878)
    
    * create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, 
DagStateStore and in-memory dag maps used in DagManager
    * address review comments
    * add javadoc
---
 .../apache/gobblin/metastore/MysqlStateStore.java  |   7 +-
 .../gobblin/service/modules/flowgraph/Dag.java     |   7 +-
 .../service/modules/flowgraph/DagNodeId.java       |  41 +++++
 .../orchestration/DagManagementStateStore.java     | 168 +++++++++++++++++
 .../modules/orchestration/DagManagerUtils.java     |  10 +-
 .../modules/orchestration/DagStateStore.java       |   4 +
 .../MostlyMySqlDagManagementStateStore.java        | 200 +++++++++++++++++++++
 .../modules/orchestration/MysqlDagStateStore.java  |  27 +--
 .../orchestration/MysqlUserQuotaManager.java       |   9 +-
 .../service/modules/spec/JobExecutionPlan.java     |   3 +-
 .../MostlyMySqlDagManagementStateStoreTest.java    | 147 +++++++++++++++
 11 files changed, 596 insertions(+), 27 deletions(-)

diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 956a42b83..36ff51a56 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -42,13 +42,11 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
-import javax.sql.DataSource;
 
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.io.Text;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
@@ -56,6 +54,8 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.zaxxer.hikari.HikariDataSource;
 
+import javax.sql.DataSource;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
@@ -528,6 +528,7 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
     }
   }
 
+  // todo - delete should return the deleted row counts
   @Override
   public void delete(String storeName, String tableName) throws IOException {
     try (Connection connection = dataSource.getConnection();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 388d943aa..50a4dcb5c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,8 +26,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import lombok.Getter;
 import lombok.Setter;
+
 import org.apache.gobblin.annotation.Alpha;
 
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DagNodeId.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DagNodeId.java
new file mode 100644
index 000000000..3594034f6
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DagNodeId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Joiner;
+
+import lombok.Data;
+
+
+/**
+ * This class provides a unique identifier for a {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}.
+ * It can be used for various DagNode/Job related operations, e.g. launch job, 
retry job, kill job
+ */
+@Data
+public class DagNodeId {
+  private final String flowGroup;
+  private final String flowName;
+  private final long flowExecutionId;
+  private final String jobGroup;
+  private final String jobName;
+
+  @Override
+  public String toString() {
+    return Joiner.on("_").join(flowGroup, flowName, flowExecutionId, jobGroup, 
jobName);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
new file mode 100644
index 000000000..4903247d2
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Calling on a previously checkpointed Dag updates it.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   @return whether `dagId` is currently known due to {@link 
DagManagementStateStore#checkpointDag} but not yet
+   {@link DagManagementStateStore#deleteDag}
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   @return the {@link Dag}, if present
+   */
+  Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws 
IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically called upon 
completion of execution.
+   * @param dag The dag completed/cancelled execution on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void deleteDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * This marks the dag as a failed one.
+   * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDag(DagManager.DagId)} later to be retried.
+   * @param dag failing dag
+   */
+  void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Return the set of all failed dags' IDs contained in the dag state store.
+   */
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Returns the failed dag.
+   * If the dag is not found because it was never marked as failed through 
{@link DagManagementStateStore#markDagFailed(Dag)},
+   * it returns an empty Optional.
+   * @param dagId dag id of the failed dag
+   */
+  Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws 
IOException;
+
+  /**
+   * Deletes the failed dag. No-op if dag is not found or is not marked as 
failed.
+   * @param dag
+   * @throws IOException
+   */
+  default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteFailedDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  void deleteFailedDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Adds state of a {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
+   * Note that a DagNode is a part of a Dag and must already be present in the 
store through
+   * {@link DagManagementStateStore#checkpointDag}. This call is just an 
additional identifier which may be used
+   * for DagNode level operations. In the future, it may be merged with 
checkpointDag.
+   * @param dagNode dag node to be added
+   * @param dagId dag id of the dag this dag node belongs to
+   */
+  void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId 
dagId) throws IOException;
+
+  /**
+   * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}, or an empty Optional 
if it
+   * is not found.
+   * @param dagNodeId of the dag node
+   */
+  Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId);
+
+  /**
+   * Returns a list of {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}.
+   * Returned list will be empty if the dag is not found in the store.
+   * @param dagId DagId of the dag for which all DagNodes are requested
+   */
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) 
throws IOException;
+
+  /**
+   * Returns the {@link Dag} the provided {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to
+   * or an empty Optional if it is not found.
+   */
+  Optional<Dag<JobExecutionPlan>> getParentDag(Dag.DagNode<JobExecutionPlan> 
dagNode);
+
+  /**
+   * Deletes the dag node state that was added through {@link 
DagManagementStateStore#addDagNodeState(Dag.DagNode, DagManager.DagId)}
+   * No-op if the dag node is not found in the store.
+   * @param dagNode dag node to be deleted
+   * @param dagId dag id of the dag this dag node belongs to
+   */
+  void deleteDagNodeState(DagManager.DagId dagId, 
Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * Loads all currently running {@link Dag}s from the underlying store. 
Typically, invoked when a new {@link DagManager}
+   * takes over or on restart of service.
+   * @return a {@link List} of currently running {@link Dag}s.
+   */
+  List<Dag<JobExecutionPlan>> getDags() throws IOException;
+
+  /**
+   * Initialize the dag quotas with the provided set of dags.
+   */
+  void initQuota(Collection<Dag<JobExecutionPlan>> dags) throws IOException;
+
+  /**
+   * Checks if the dagNode exceeds the statically configured user quota for 
the proxy user, requester user and flowGroup.
+   * It also increases the quota usage for proxy user, requester and the 
flowGroup of the given DagNode by one.
+   * @throws QuotaExceededException if the quota is exceeded
+   */
+  void tryAcquireQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNode) 
throws IOException;
+
+  /**
+   * Decrement the quota by one for the proxy user and requesters 
corresponding to the provided {@link Dag.DagNode}.
+   * It is usually used with `deleteDagNodeState`, but can also be used 
independently sometimes.
+   * Returns true if successfully reduces the quota usage
+   */
+  boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException;
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 06ac24aa7..e894a0b16 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -16,7 +16,6 @@
  */
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.google.common.base.Joiner;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -51,6 +50,7 @@ import org.apache.gobblin.service.RequesterService;
 import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import 
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.JobStatus;
@@ -456,17 +456,13 @@ public class DagManagerUtils {
     return dagNodes != null && !dagNodes.isEmpty();
   }
 
-  public static String calcJobId(Config jobConfig) {
+  public static DagNodeId calcJobId(Config jobConfig) {
     String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
     String flowName =jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
     long flowExecutionId = ConfigUtils.getLong(jobConfig, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
     String jobGroup = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_GROUP_KEY, "");
     String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
 
-    return calcJobId(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
-  }
-
-  public static String calcJobId(String flowGroup, String flowName, long 
flowExecutionId, String jobGroup, String jobName) {
-    return Joiner.on("_").join(flowGroup, flowName, flowExecutionId, jobGroup, 
jobName);
+    return new DagNodeId(flowGroup, flowName, flowExecutionId, jobGroup, 
jobName);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index 56249639e..09824ddcb 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -70,4 +70,8 @@ public interface DagStateStore {
    * Return a list of all dag IDs contained in the dag state store.
    */
   Set<String> getDagIds() throws IOException;
+
+  default boolean existsDag(DagManager.DagId dagId) throws IOException {
+    throw new UnsupportedOperationException("containsDag not implemented");
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
new file mode 100644
index 000000000..d484ff8f8
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions need to be
+ * taken next, e.g. mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) 
failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at 
one place.
+ */
+@Slf4j
+public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new ConcurrentHashMap<>();
+  private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
ConcurrentHashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  private final Map<DagManager.DagId, 
LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
ConcurrentHashMap<>();
+  private final Map<DagManager.DagId, Long> dagToDeadline = new 
ConcurrentHashMap<>();
+  private final DagStateStore dagStateStore;
+  private final DagStateStore failedDagStateStore;
+  private final UserQuotaManager quotaManager;
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+
+  public MostlyMySqlDagManagementStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    this.failedDagStateStore = createDagStateStore(
+        ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+    this.quotaManager = new MysqlUserQuotaManager(config);
+    this.quotaManager.init(getDags());
+  }
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+    try {
+      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStore.class.getName()));
+      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+    // todo - update failedDagStateStore iff cleanup returned 1
+    this.failedDagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteDag(DagManager.DagId dagId) throws IOException {
+    this.dagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
+    this.failedDagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    return this.dagStateStore.getDags();
+  }
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) 
throws IOException {
+    return Optional.of(this.failedDagStateStore.getDag(dagId.toString()));
+  }
+
+  @Override
+  public Set<String> getFailedDagIds() throws IOException {
+    return this.failedDagStateStore.getDagIds();
+  }
+
+  @Override
+  // todo - updating different maps here and in addDagNodeState can result in 
inconsistency between the maps
+  public synchronized void deleteDagNodeState(DagManager.DagId dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
+    this.jobToDag.remove(dagNode);
+    this.dagNodes.remove(dagNode.getValue().getId());
+    this.dagToDeadline.remove(dagId);
+    this.dagToJobs.get(dagId).remove(dagNode);
+    if (this.dagToJobs.get(dagId).isEmpty()) {
+      this.dagToJobs.remove(dagId);
+    }
+  }
+
+  // todo - updating different maps here and in deleteDagNodeState can result 
in inconsistency between the maps
+  @Override
+  public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> 
dagNode, DagManager.DagId dagId)
+      throws IOException {
+    Optional<Dag<JobExecutionPlan>> dag = getDag(dagId);
+    if (!dag.isPresent()) {
+      throw new RuntimeException("Dag " + dagId + " not found");
+    }
+    this.jobToDag.put(dagNode, dag.get());
+    this.dagNodes.put(dagNode.getValue().getId(), dagNode);
+    if (!this.dagToJobs.containsKey(dagId)) {
+      this.dagToJobs.put(dagId, Lists.newLinkedList());
+    }
+    this.dagToJobs.get(dagId).add(dagNode);
+  }
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws 
IOException {
+    return Optional.of(this.dagStateStore.getDag(dagId.toString()));
+  }
+
+  @Override
+  public boolean containsDag(DagManager.DagId dagId) throws IOException {
+    return this.dagStateStore.existsDag(dagId);
+  }
+
+  @Override
+  public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId 
dagNodeId) {
+    return Optional.of(this.dagNodes.get(dagNodeId));
+  }
+
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> 
getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return Optional.of(this.jobToDag.get(dagNode));
+  }
+
+  @Override
+  public List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId 
dagId) {
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
+    if (dagNodes != null) {
+      return dagNodes;
+    } else {
+      return Lists.newLinkedList();
+    }
+  }
+
+  public void initQuota(Collection<Dag<JobExecutionPlan>> dags) {
+    // This implementation does not need to update quota usage when the 
service restarts or when its leadership status changes
+    // because quota usage are persisted in mysql table
+  }
+
+  @Override
+  public void tryAcquireQuota(Collection<Dag.DagNode<JobExecutionPlan>> 
dagNodes) throws IOException {
+    this.quotaManager.checkQuota(dagNodes);
+  }
+
+  @Override
+  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException {
+    return this.quotaManager.releaseQuota(dagNode);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index c578c699b..087970446 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -25,6 +25,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metastore.MysqlDagStateStoreFactory;
@@ -45,13 +52,6 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
 import org.apache.gobblin.util.ConfigUtils;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicates;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonSerializer;
-import com.google.gson.reflect.TypeToken;
-import com.typesafe.config.Config;
-
 import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
 import static 
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateDagId;
 
@@ -131,9 +131,15 @@ public class MysqlDagStateStore implements DagStateStore {
   public void cleanUp(String dagId)
       throws IOException {
     mysqlStateStore.delete(getStoreNameFromDagId(dagId), 
getTableNameFromDagId(dagId));
+    // todo - decrease the count only if delete returned 1
     this.totalDagCount.dec();
   }
 
+  @Override
+  public boolean existsDag(DagManager.DagId dagId) throws IOException {
+    return mysqlStateStore.exists(getStoreNameFromDagId(dagId.toString()), 
getTableNameFromDagId(dagId.toString()));
+  }
+
   @Override
   public List<Dag<JobExecutionPlan>> getDags()
       throws IOException {
@@ -160,21 +166,21 @@ public class MysqlDagStateStore implements DagStateStore {
    * Convert a state store entry into a dag ID
    * e.g. storeName = group1_name1, tableName = 1234 gives dagId 
group1_name1_1234
    */
-  private String entryToDagId(String storeName, String tableName) {
+  private static String entryToDagId(String storeName, String tableName) {
     return 
Joiner.on(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER).join(storeName, 
tableName);
   }
 
   /**
    * Return a storeName given a dagId. Store name is defined as 
flowGroup_flowName.
    */
-  private String getStoreNameFromDagId(String dagId) {
+  private static String getStoreNameFromDagId(String dagId) {
     return dagId.substring(0, 
dagId.lastIndexOf(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER));
   }
 
   /**
    * Return a tableName given a dagId. Table name is defined as the 
flowExecutionId.
    */
-  private String getTableNameFromDagId(String dagId) {
+  private static String getTableNameFromDagId(String dagId) {
     return 
dagId.substring(dagId.lastIndexOf(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER)
 + 1);
   }
 
@@ -182,7 +188,6 @@ public class MysqlDagStateStore implements DagStateStore {
    * For {@link Dag} to work with {@link MysqlStateStore}, it needs to be 
packaged into a {@link State} object.
    * The way that it does is simply serialize the {@link Dag} first and use 
the key {@link #DAG_KEY_IN_STATE}
    * to be pair with it.
-   *
    * The serialization step is required for readability and portability of 
serde lib.
    * @param dag The dag to be converted.
    * @return An {@link State} object that contains a single k-v pair for 
{@link Dag}.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index cdbce8e5e..c4e3b8713 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -17,17 +17,16 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
-
 import java.util.List;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
@@ -283,6 +282,10 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
     return new RunningDagIdsStore(dataSource, quotaStoreTableName);
   }
 
+  public static String qualify(String configKey) {
+    return MysqlUserQuotaManager.CONFIG_PREFIX + "." + configKey;
+  }
+
   static class MysqlQuotaStore {
     protected final DataSource dataSource;
     final String tableName;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 535179ee8..97ca4fc45 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -73,7 +74,7 @@ public class JobExecutionPlan {
   private int currentAttempts = 0;
   private Optional<Future> jobFuture = Optional.absent();
   private long flowStartTime = 0L;
-  private final String id;
+  private final DagNodeId id;
 
   public static class Factory {
     public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
new file mode 100644
index 000000000..4e1b914d1
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Tests for {@link MostlyMySqlDagManagementStateStore}. Mainly exercises the functionality related to
+ * {@link DagStateStore} rather than the MySQL-related components.
+ */
+public class MostlyMySqlDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  // Shared with the nested TestMysqlDagStateStore, which reads its JDBC url from here.
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  /**
+   * Creates the test database and builds the {@link MostlyMySqlDagManagementStateStore} under test,
+   * configured to use {@link TestMysqlDagStateStore} as its dag state store class.
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName())
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl())
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER)
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD)
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE)
+        .build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new MostlyMySqlDagManagementStateStore(config, topologySpecMap);
+  }
+
+  /**
+   * Closes the test database obtained in {@link #setUp()} so that it is released once this class
+   * finishes, even when {@code setUp} or a test method failed.
+   */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    if (testMetastoreDatabase != null) {
+      testMetastoreDatabase.close();
+    }
+  }
+
+  /**
+   * Checkpoints two dags, registers dag node state for three of their nodes, and verifies lookups by
+   * dag id, by dag node id and by parent dag, as well as removal of a single dag node's state.
+   */
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
+    Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
+    DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.checkpointDag(dag);
+    this.dagManagementStateStore.checkpointDag(dag2);
+    this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
+
+    // TestNG's Assert.assertEquals takes (actual, expected).
+    Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
+    Assert.assertEquals(this.dagManagementStateStore.getDag(dagId).get().toString(), dag.toString());
+    Assert.assertEquals(this.dagManagementStateStore.getDagNode(dagNodeId).get(), dagNode);
+    Assert.assertEquals(this.dagManagementStateStore.getParentDag(dagNode).get().toString(), dag.toString());
+
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagManagementStateStore.getDagNodes(dagId);
+    Assert.assertEquals(dagNodes.size(), 2);
+    Assert.assertEquals(dagNodes.get(0), dagNode);
+    Assert.assertEquals(dagNodes.get(1), dagNode2);
+
+    dagNodes = this.dagManagementStateStore.getDagNodes(dagId);
+    Assert.assertEquals(dagNodes.size(), 2);
+    Assert.assertTrue(dagNodes.contains(dagNode));
+    Assert.assertTrue(dagNodes.contains(dagNode2));
+
+    // Deleting one node's state must not affect the other node of the same dag, nor the other dag.
+    this.dagManagementStateStore.deleteDagNodeState(dagId, dagNode);
+    Assert.assertFalse(this.dagManagementStateStore.getDagNodes(dagId).contains(dagNode));
+    Assert.assertTrue(this.dagManagementStateStore.getDagNodes(dagId).contains(dagNode2));
+    Assert.assertTrue(this.dagManagementStateStore.getDagNodes(dagId2).contains(dagNode3));
+  }
+
+  /**
+   * Only overrides the {@link #createStateStore(Config)} method to directly return a
+   * {@link MysqlStateStore} backed by the test database.
+   */
+  public static class TestMysqlDagStateStore extends MysqlDagStateStore {
+    public TestMysqlDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
+      super(config, topologySpecMap);
+    }
+
+    @Override
+    protected StateStore<State> createStateStore(Config config) {
+      try {
+        // The passed-in config is ignored; the connection always targets the shared test database.
+        String jdbcUrl = MostlyMySqlDagManagementStateStoreTest.testMetastoreDatabase.getJdbcUrl();
+        HikariDataSource dataSource = new HikariDataSource();
+
+        dataSource.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
+        dataSource.setAutoCommit(false);
+        dataSource.setJdbcUrl(jdbcUrl);
+        dataSource.setUsername(TEST_USER);
+        dataSource.setPassword(TEST_PASSWORD);
+
+        return new MysqlStateStore<>(dataSource, TEST_DAG_STATE_STORE, false, State.class);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
\ No newline at end of file


Reply via email to