[ 
https://issues.apache.org/jira/browse/GOBBLIN-2002?focusedWorklogId=904813&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-904813
 ]

ASF GitHub Bot logged work on GOBBLIN-2002:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Feb/24 07:53
            Start Date: 14/Feb/24 07:53
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3878:
URL: https://github.com/apache/gobblin/pull/3878#discussion_r1488990478


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+  Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException;
+  boolean addCleanUpDag(String dagId);

Review Comment:
   what does it mean to "add a clean-up DAG"?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+  Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException;
+  boolean addCleanUpDag(String dagId);
+  void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * Persist the {@link Dag} to the backing store.
+   * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted
+   * and be picked up again when leader transition happens.
+   * @param dag The dag submitted to {@link DagManager}
+   */
+  void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+  void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion of execution.
+   * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
+  void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void cleanUp(String dagId) throws IOException;
+  void cleanUpFailedDag(String dagId) throws IOException;
+
+  /**
+   * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager}
+   * takes over or on restart of service.
+   * @return a {@link List} of currently running {@link Dag}s.
+   */
+  List<Dag<JobExecutionPlan>> getDags() throws IOException;

Review Comment:
   consider `Iterator`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+  Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException;

Review Comment:
   might be clearest: `getDagNodesForAllDags` (which is what I believe this 
does...)
   
   BTW, this may be expensive and could ultimately prove a memory scaling 
bottleneck.  what's the scenario in which we need it?  I don't wish to 
prematurely optimize, but would returning an `Iterator` give us the opportunity 
to implement more efficiently?  ... or is the `List` crucial, e.g., to capture 
an atomically-consistent snapshot of the DagNodes from a particular moment in 
time?
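
To make the `Iterator` trade-off above concrete, here is a minimal sketch, not taken from the PR. It uses plain `String` IDs as stand-ins for `Dag` and `DagNode`, and both the class name `LazyDagNodeIteration` and the `loadNodes` function are hypothetical. The point is only that an iterator can defer loading each DAG's nodes until they are consumed, at the cost of no longer being a consistent point-in-time snapshot.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical sketch: stream node IDs one DAG at a time instead of building one large List. */
class LazyDagNodeIteration {

  /** Returns an iterator that calls {@code loadNodes} for a DAG only when its nodes are first needed. */
  public static Iterator<String> nodesForAllDags(List<String> dagIds,
      Function<String, List<String>> loadNodes) {
    Iterator<String> dagIdIter = dagIds.iterator();
    return new Iterator<String>() {
      // Nodes of the DAG currently being consumed; starts out empty.
      private Iterator<String> current = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        // Skip ahead to the next DAG that has at least one node, loading it on demand.
        while (!current.hasNext() && dagIdIter.hasNext()) {
          current = loadNodes.apply(dagIdIter.next()).iterator();
        }
        return current.hasNext();
      }

      @Override
      public String next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }
}
```

Only one DAG's nodes are held at a time here. If callers do need the atomically consistent snapshot mentioned above, returning a `List` (or copying under a lock) remains the simpler choice.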



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+  Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException;
+  boolean addCleanUpDag(String dagId);
+  void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * Persist the {@link Dag} to the backing store.
+   * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted
+   * and be picked up again when leader transition happens.
+   * @param dag The dag submitted to {@link DagManager}
+   */
+  void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+  void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion of execution.
+   * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
+  void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void cleanUp(String dagId) throws IOException;
+  void cleanUpFailedDag(String dagId) throws IOException;
+
+  /**
+   * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager}
+   * takes over or on restart of service.
+   * @return a {@link List} of currently running {@link Dag}s.
+   */
+  List<Dag<JobExecutionPlan>> getDags() throws IOException;
+
+  /**
+   * Return a list of all dag IDs contained in the dag state store.
+   */
+  Set<String> getDagIds() throws IOException;
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Initialize with the provided set of dags.
+   */
+  void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) throws IOException;

Review Comment:
   `initQuota`?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,140 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName())
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl())
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER)
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD)
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("test", 12345L, "FINISH_RUNNING", true);
+    Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("test2", 123456L, "FINISH_RUNNING", true);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    String dagId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+    String dagId2 = DagManagerUtils.calcJobId(dagNode3.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.addDag(dagId, dag);
+    this.dagManagementStateStore.addDag(dagId, dag);
+    this.dagManagementStateStore.addDagNodeState(dagId, dagNode);
+    this.dagManagementStateStore.addDagNodeState(dagId, dagNode2);

Review Comment:
   suggest renaming, since typical behavior of `add` is to replace any value 
held by the same key, but this seems more like append.
   
   alternatively would it make sense to have the signature
   ```
   addDNState(String dagId, List<Dag.DagNode<JEP>> dagNodes)
   ```
   and that would be called only once?
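
The naming concern above can be illustrated with a small sketch, assuming a simplified in-memory store that tracks node IDs as plain strings. The class `DagNodeStateSketch` and both method names are hypothetical and are not part of the PR. It contrasts a per-node call that accumulates with a single batch call that replaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory store contrasting append-per-node with a single batch call. */
class DagNodeStateSketch {
  private final Map<String, List<String>> dagToNodes = new HashMap<>();

  /** Appends one node to the DAG's list; repeated calls accumulate rather than replace. */
  public void appendDagNodeState(String dagId, String nodeId) {
    dagToNodes.computeIfAbsent(dagId, k -> new ArrayList<>()).add(nodeId);
  }

  /** Sets all nodes of a DAG in one call; a second call for the same DAG replaces the first. */
  public void setDagNodeStates(String dagId, List<String> nodeIds) {
    dagToNodes.put(dagId, new ArrayList<>(nodeIds));
  }

  /** Returns the nodes recorded for the DAG, or an empty list if none were recorded. */
  public List<String> getDagNodes(String dagId) {
    return dagToNodes.getOrDefault(dagId, new ArrayList<>());
  }
}
```

With the batch form the caller states the whole node set once, so the name can keep the usual replace-by-key meaning. The per-node form is better named for what it does, namely append.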



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,140 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName())
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl())
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER)
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD)
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("test", 12345L, "FINISH_RUNNING", true);
+    Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("test2", 123456L, "FINISH_RUNNING", true);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    String dagId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+    String dagId2 = DagManagerUtils.calcJobId(dagNode3.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.addDag(dagId, dag);
+    this.dagManagementStateStore.addDag(dagId, dag);
+    this.dagManagementStateStore.addDagNodeState(dagId, dagNode);
+    this.dagManagementStateStore.addDagNodeState(dagId, dagNode2);
+    this.dagManagementStateStore.addDagNodeState(dagId2, dagNode3);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagManagementStateStore.getDagNodes(dagId);

Review Comment:
   nit: wait to initialize until first use (between lines 100 and 101)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;

Review Comment:
   is the `String dagId` equivalent to:
   ```
   DagManager.DagId myDagId = ...
   myDagId.toString()
   ```
   ?
   
   if so, I suggest using `DagId` for type-safety
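
As a rough illustration of the type-safety argument above, the sketch below defines a small value class for a DAG identifier. `TypedDagId`, its three fields, and the underscore-joined `toString()` format are assumptions made for this example. It is not Gobblin's actual `DagManager.DagId`.

```java
import java.util.Objects;

/** Hypothetical stand-in for a typed DAG identifier; not Gobblin's actual DagManager.DagId. */
final class TypedDagId {
  private final String flowGroup;
  private final String flowName;
  private final long flowExecutionId;

  public TypedDagId(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = Objects.requireNonNull(flowGroup);
    this.flowName = Objects.requireNonNull(flowName);
    this.flowExecutionId = flowExecutionId;
  }

  // Value semantics so the ID can serve as a Map key.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TypedDagId)) {
      return false;
    }
    TypedDagId other = (TypedDagId) o;
    return flowExecutionId == other.flowExecutionId
        && flowGroup.equals(other.flowGroup)
        && flowName.equals(other.flowName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId);
  }

  /** Assumed string form, used only where a plain String is still required. */
  @Override
  public String toString() {
    return flowGroup + "_" + flowName + "_" + flowExecutionId;
  }
}
```

A method declared as `getDag(TypedDagId dagId)` cannot be called with a DAG node ID or any other unrelated `String` by mistake, which is the kind of error the compiler cannot catch when every identifier is a `String`.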



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,217 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at one place.
+ */
+@Slf4j
+public class MostlyInMemoryDagManagementStateStore implements DagManagementStateStore {
+  @Getter private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();

Review Comment:
   is the getter only used internally, or do you want, in effect, to expand the 
API beyond the DMSS interface?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,217 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at one place.
+ */
+@Slf4j
+public class MostlyInMemoryDagManagementStateStore implements DagManagementStateStore {
+  @Getter private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
+  private final Map<String, Dag.DagNode<JobExecutionPlan>> dagNodes = new HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
+  final Map<String, Long> dagToDeadline = new HashMap<>();
+  private final Set<String> dagIdstoClean = new HashSet<>();
+
+  private final DagStateStore dagStateStore;
+  private final DagStateStore failedDagStateStore;
+  private final UserQuotaManager quotaManager;
+  private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
+  public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+
+  public MostlyInMemoryDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    this.failedDagStateStore = createDagStateStore(
+        ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+    this.quotaManager = new MysqlUserQuotaManager(config);
+    this.quotaManager.init(getDags());
+  }
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
+    try {
+      Class<?> dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, MysqlDagStateStore.class.getName()));
+      return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void cleanUp(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void cleanUp(String dagId) throws IOException {
+    this.dagStateStore.cleanUp(dagId);
+  }
+
+  @Override
+  public void cleanUpFailedDag(String dagId) throws IOException {
+    this.failedDagStateStore.cleanUp(dagId);
+  }
+
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    return this.dagStateStore.getDags();
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException {
+    return this.failedDagStateStore.getDag(dagId);
+  }
+
+  @Override
+  public Set<String> getDagIds() throws IOException {
+    return this.dagStateStore.getDagIds();
+  }
+
+  @Override
+  public Set<String> getFailedDagIds() throws IOException {
+    return this.failedDagStateStore.getDagIds();
+  }
+
+  @Override
+  public synchronized void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
+    this.jobToDag.remove(dagNode);
+    this.dagNodes.remove(dagNode.getValue().getId());
+    this.dagToDeadline.remove(dagId);
+    this.dagToJobs.get(dagId).remove(dagNode);
+    if (this.dagToJobs.get(dagId).isEmpty()) {
+      this.dagToJobs.remove(dagId);
+    }
+  }
+
+  @Override
+  public synchronized void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
+    Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+    this.jobToDag.put(dagNode, dag);
+    this.dagNodes.put(dagNode.getValue().getId(), dagNode);
+    if (!this.dagToJobs.containsKey(dagId)) {
+      this.dagToJobs.put(dagId, Lists.newLinkedList());
+    }
+    this.dagToJobs.get(dagId).add(dagNode);
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> getDag(String dagId) {
+    return this.dags.get(dagId);
+  }
+
+  @Override
+  public void addDag(String dagId, Dag<JobExecutionPlan> dag) {
+    this.dags.put(dagId, dag);
+  }
+
+  @Override
+  public boolean containsDag(String dagId) {
+    return this.dags.containsKey(dagId);
+  }
+
+  @Override
+  public Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId) {
+    return this.dagNodes.get(dagNodeId);
+  }
+
+
+  @Override
+  public Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return this.jobToDag.get(dagNode);
+  }
+
+  @Override
+  public LinkedList<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) {
+    if (this.dagToJobs.containsKey(dagId)) {
+      return this.dagToJobs.get(dagId);
+    } else {
+      return Lists.newLinkedList();
+    }
+  }
+
+  public List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() {
+    List<Dag.DagNode<JobExecutionPlan>> allJobs = new ArrayList<>();
+    for (Collection<Dag.DagNode<JobExecutionPlan>> collection : this.dagToJobs.values()) {
+      allJobs.addAll(collection);
+    }
+    return allJobs;
+  }
+
+  @Override
+  public boolean addCleanUpDag(String dagId) {
+    return this.dagIdstoClean.add(dagId);
+  }
+
+  public void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) {
+    // This implementation does not need to update quota usage when the service restarts or when its leadership status changes

Review Comment:
   please document why not



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+  Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException;
+  boolean addCleanUpDag(String dagId);
+  void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * Persist the {@link Dag} to the backing store.
+   * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted
+   * and be picked up again when leader transition happens.
+   * @param dag The dag submitted to {@link DagManager}
+   */
+  void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+  void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;

Review Comment:
   should this usually happen in tandem w/ `cleanUp()`, or is that not always
   the case?  if it is, combine the two into a single op (even if we're not
   truly able to guarantee all-or-nothing atomic success)
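
To make the "single op" idea concrete, here is a minimal sketch, assuming the failed-dag write and the running-dag cleanup do always happen together. `SketchStateStore`, `DagLifecycle`, and `markDagFailed` are hypothetical names, the dag is reduced to a string payload, and the `IOException` declared by the real methods is left out for brevity; none of this is the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a persistent DagStateStore; a map replaces the MySQL table.
class SketchStateStore {
  private final Map<String, String> rows = new HashMap<>();

  void writeCheckpoint(String dagId, String dag) { rows.put(dagId, dag); }
  void cleanUp(String dagId) { rows.remove(dagId); }
  boolean contains(String dagId) { return rows.containsKey(dagId); }
}

// Hypothetical holder of the two stores, mirroring dagStateStore and failedDagStateStore.
class DagLifecycle {
  private final SketchStateStore dagStateStore = new SketchStateStore();
  private final SketchStateStore failedDagStateStore = new SketchStateStore();

  void track(String dagId, String dag) { dagStateStore.writeCheckpoint(dagId, dag); }

  // Combined operation: record the dag as failed first, then drop it from the running store.
  // With this ordering, a crash between the two steps leaves the dag in both stores
  // instead of in neither.
  void markDagFailed(String dagId, String dag) {
    failedDagStateStore.writeCheckpoint(dagId, dag);
    dagStateStore.cleanUp(dagId);
  }

  boolean isRunning(String dagId) { return dagStateStore.contains(dagId); }
  boolean isFailed(String dagId) { return failedDagStateStore.contains(dagId); }
}
```

The write-then-delete ordering is a design choice of the sketch: it is not atomic, but a partial failure then errs toward a duplicate record rather than a lost dag.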



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName())
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl())

Review Comment:
   suggest the method:
   ```
   // in MysqlUserQuotaManager
   public static String qualify(String configKey) {
     return CONFIG_PREFIX + '.' + configKey;
   }
   ```
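
A self-contained sketch of how such a helper could shorten the test setup, with the `'.'` separator used in the test above included. `QuotaConfigKeys`, the prefix value, and the four key names are placeholders chosen for illustration; the real `MysqlUserQuotaManager.CONFIG_PREFIX` and `ConfigurationKeys` values are not shown in this diff, and a plain map stands in for `ConfigBuilder`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder class; in the PR the helper would live on MysqlUserQuotaManager.
class QuotaConfigKeys {
  // Placeholder value, not the real CONFIG_PREFIX.
  static final String CONFIG_PREFIX = "quotaManager";

  // Prefixes a bare key with CONFIG_PREFIX and a '.' separator.
  static String qualify(String configKey) {
    return CONFIG_PREFIX + '.' + configKey;
  }

  // Builds the four quota-manager settings the test needs.
  // The bare key names are placeholders for the ConfigurationKeys constants.
  static Map<String, String> testSettings(String jdbcUrl, String user, String password, String table) {
    Map<String, String> settings = new LinkedHashMap<>();
    settings.put(qualify("db.url"), jdbcUrl);
    settings.put(qualify("db.user"), user);
    settings.put(qualify("db.password"), password);
    settings.put(qualify("db.table"), table);
    return settings;
  }
}
```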



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName())
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl())
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER)
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD)
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("test", 12345L, "FINISH_RUNNING", true);
+    Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("test2", 123456L, "FINISH_RUNNING", true);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    String dagId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+    String dagId2 = DagManagerUtils.calcJobId(dagNode3.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.addDag(dagId, dag);
+    this.dagManagementStateStore.addDag(dagId, dag);

Review Comment:
   why twice?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+  Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException;
+  boolean addCleanUpDag(String dagId);
+  void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * Persist the {@link Dag} to the backing store.
+   * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted
+   * and be picked up again when leader transition happens.
+   * @param dag The dag submitted to {@link DagManager}

Review Comment:
   javadoc feels unclear, esp. since it provides so little guidance on when and
   where to use this in the course of managing a Dag.  if that's challenging to
   state as an abstract rule, include an example.

   also specify inter-method guarantees, e.g. if we just called `addDag`, does
   it call `writeCheckpoint` behind the scenes for us?

   rather than 'checkpoint', how about 'persist' or 'track'?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {

Review Comment:
   suggestion: API might be easiest to imagine if you start w/
   create+update(+delete), then follow w/ the retrieval.  this shows first what
   data may be inserted and how it may be manipulated, so we can later judge
   whether any retrieval methods are missing or should perhaps be reworked to
   be atomic



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+  Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException;
+  boolean addCleanUpDag(String dagId);
+  void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * Persist the {@link Dag} to the backing store.
+   * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted
+   * and be picked up again when leader transition happens.
+   * @param dag The dag submitted to {@link DagManager}
+   */
+  void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+  void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion of execution.
+   * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
+  void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException;

Review Comment:
   if these two are merely convenience methods for the two that follow, please
   provide a default impl (to derive the dagId from the `dag`)
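
A minimal sketch of what such default implementations could look like, assuming the `Dag`-taking overloads really are pure conveniences. `SketchDag`, `SketchCleanUpStore`, `InMemoryCleanUpStore`, and the id format in `idOf` are hypothetical stand-ins; the real code would derive the id however the project already does, and would keep the `throws IOException` that the sketch omits.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for Dag<JobExecutionPlan>, carrying just enough to derive an id.
class SketchDag {
  final String flowGroup;
  final String flowName;
  final long flowExecutionId;

  SketchDag(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }
}

interface SketchCleanUpStore {
  // The id-based methods are the only ones an implementation must provide.
  void cleanUp(String dagId);
  void cleanUpFailedDag(String dagId);

  // Illustrative id derivation; the format is an assumption of this sketch.
  static String idOf(SketchDag dag) {
    return dag.flowGroup + "_" + dag.flowName + "_" + dag.flowExecutionId;
  }

  // Convenience overloads: derive the id and delegate.
  default void cleanUp(SketchDag dag) { cleanUp(idOf(dag)); }
  default void cleanUpFailedDag(SketchDag dag) { cleanUpFailedDag(idOf(dag)); }
}

// Implementations then override only the id-based pair.
class InMemoryCleanUpStore implements SketchCleanUpStore {
  final Set<String> running = new HashSet<>();
  final Set<String> failed = new HashSet<>();

  @Override public void cleanUp(String dagId) { running.remove(dagId); }
  @Override public void cleanUpFailedDag(String dagId) { failed.remove(dagId); }
}
```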



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+  Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException;
+  boolean addCleanUpDag(String dagId);
+  void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * Persist the {@link Dag} to the backing store.
+   * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted
+   * and be picked up again when leader transition happens.
+   * @param dag The dag submitted to {@link DagManager}
+   */
+  void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+  void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion of execution.
+   * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
+  void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void cleanUp(String dagId) throws IOException;
+  void cleanUpFailedDag(String dagId) throws IOException;
+
+  /**
+   * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager}
+   * takes over or on restart of service.
+   * @return a {@link List} of currently running {@link Dag}s.
+   */
+  List<Dag<JobExecutionPlan>> getDags() throws IOException;
+
+  /**
+   * Return a list of all dag IDs contained in the dag state store.
+   */
+  Set<String> getDagIds() throws IOException;
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Initialize with the provided set of dags.
+   */
+  void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) throws IOException;
+
+  /**
+   * Checks if the dagNode exceeds the statically configured user quota for the proxy user, requester user and flowGroup.
+   * It also increases the quota usage for proxy user, requester and the flowGroup of the given DagNode by one.
+   * @throws QuotaExceededException if the quota is exceeded
+   */
+  void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNode) throws IOException;
+
+  /**
+   * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
+   * Returns true if successfully reduces the quota usage
+   */
+  boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

Review Comment:
   specify whether there is any relationship to `deleteDagNodeState`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+  Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);

Review Comment:
   as the `dagId` can be derived from `dag`, either specify validation and
   behavior for a mismatch or just change this method signature to take simply
   `Dag<JEP>` (and derive the dagId internally).
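
The two alternatives can be sketched side by side. This is an illustration under stated assumptions, not the PR's code: `IdentifiedDag` is a hypothetical stand-in that exposes its own id via `getId()`, and `SketchDagRegistry` replaces the in-memory `dags` map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Dag<JobExecutionPlan>; the real id would be derived, not stored.
class IdentifiedDag {
  private final String id;

  IdentifiedDag(String id) { this.id = id; }
  String getId() { return id; }
}

class SketchDagRegistry {
  private final Map<String, IdentifiedDag> dags = new HashMap<>();

  // Option 1: take only the dag and derive the id internally, so a mismatch cannot occur.
  void addDag(IdentifiedDag dag) {
    dags.put(dag.getId(), dag);
  }

  // Option 2: keep the two-argument form, but validate and reject a mismatched id.
  void addDag(String dagId, IdentifiedDag dag) {
    if (!dagId.equals(dag.getId())) {
      throw new IllegalArgumentException(
          "dagId '" + dagId + "' does not match id derived from dag '" + dag.getId() + "'");
    }
    dags.put(dagId, dag);
  }

  boolean containsDag(String dagId) { return dags.containsKey(dagId); }
}
```

Option 1 removes the failure mode entirely; option 2 keeps the existing call sites compiling while making the mismatch behavior explicit.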





Issue Time Tracking
-------------------

    Worklog Id:     (was: 904813)
    Time Spent: 20m  (was: 10m)

> create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, 
> DagStateStore and in-memory dag maps used in DagManager
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2002
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2002
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> this will help in refactoring DagManager



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
