[ 
https://issues.apache.org/jira/browse/GOBBLIN-2002?focusedWorklogId=905391&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-905391
 ]

ASF GitHub Bot logged work on GOBBLIN-2002:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Feb/24 18:35
            Start Date: 16/Feb/24 18:35
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3878:
URL: https://github.com/apache/gobblin/pull/3878#discussion_r1492805565


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not 
Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new 
MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
+    Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
+    DagNodeId dagNodeId = 
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());

Review Comment:
   Either now or as a fast-follow, let's add an overload of `calcJobId` that 
takes a `DagNode<JobExecutionPlan>`, similar to the existing `generateDagId(Dag<JobExecutionPlan>)` overload.



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not 
Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new 
MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
+    Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
+    DagNodeId dagNodeId = 
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.checkpointDag(dag);
+    this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);

Review Comment:
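   We call `checkpointDag` before `addDagNodeState` for `dag`, but not for 
`dag2`.
   
   Perhaps we should even throw an exception when asked to `addDagNodeState` to 
an unknown (i.e. never-checkpointed) `Dag`: `addDagNodeState` resolves the parent via `getDag(dagId)`, so for an unknown id it would silently map the node to whatever that lookup returns (presumably `null`). What's your take?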



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not 
Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new 
MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
+    Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
+    DagNodeId dagNodeId = 
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.checkpointDag(dag);
+    this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
+
+    Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
+    Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getDag(dagId).toString());
+    Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNode(dagNodeId));
+    Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getParentDag(dagNode).toString());
+
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = 
this.dagManagementStateStore.getDagNodes(dagId);
+    Assert.assertEquals(2, dagNodes.size());
+    Assert.assertEquals(dagNode, dagNodes.get(0));
+    Assert.assertEquals(dagNode2, dagNodes.get(1));
+
+    dagNodes = this.dagManagementStateStore.getDagNodes(dagId);

Review Comment:
   Shouldn't this return the same result as line 102 (the first `getDagNodes(dagId)` call), since nothing in between modifies the store?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not 
Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new 
MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
+    Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
+    DagNodeId dagNodeId = 
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.checkpointDag(dag);
+    this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
+
+    Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
+    Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getDag(dagId).toString());
+    Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNode(dagNodeId));
+    Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getParentDag(dagNode).toString());
+
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = 
this.dagManagementStateStore.getDagNodes(dagId);
+    Assert.assertEquals(2, dagNodes.size());
+    Assert.assertEquals(dagNode, dagNodes.get(0));
+    Assert.assertEquals(dagNode2, dagNodes.get(1));
+
+    dagNodes = this.dagManagementStateStore.getDagNodes(dagId);
+    Assert.assertEquals(2, dagNodes.size());
+    Assert.assertTrue(dagNodes.contains(dagNode));
+    Assert.assertTrue(dagNodes.contains(dagNode2));

Review Comment:
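   I do prefer this form to the assertions on lines 104-105 (`dagNodes.get(0)` / `dagNodes.get(1)`)... but aren't they 
fundamentally the same? This form is just the order-insensitive version of those checks.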



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) 
failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at 
one place.
+ */
+@Slf4j
+public class MostlyInMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToDeadline = new HashMap<>();
+  private final DagStateStore dagStateStore;
+  private final DagStateStore failedDagStateStore;
+  private final UserQuotaManager quotaManager;
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+
+  public MostlyInMemoryDagManagementStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    this.failedDagStateStore = createDagStateStore(
+        ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+    this.quotaManager = new MysqlUserQuotaManager(config);
+    this.quotaManager.init(getDags());
+  }
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+    try {
+      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStore.class.getName()));
+      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteDag(DagManager.DagId dagId) throws IOException {
+    this.dagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
+    this.failedDagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    return this.dagStateStore.getDags();
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws 
IOException {
+    return this.failedDagStateStore.getDag(dagId.toString());
+  }
+
+  @Override
+  public Set<String> getFailedDagIds() throws IOException {
+    return this.failedDagStateStore.getDagIds();
+  }
+
+  @Override
+  public synchronized void deleteDagNodeState(DagManager.DagId dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
+    String dagIdStr = dagId.toString();
+    this.jobToDag.remove(dagNode);
+    this.dagNodes.remove(dagNode.getValue().getId());
+    this.dagToDeadline.remove(dagIdStr);
+    this.dagToJobs.get(dagIdStr).remove(dagNode);
+    if (this.dagToJobs.get(dagIdStr).isEmpty()) {
+      this.dagToJobs.remove(dagIdStr);
+    }
+  }
+
+  @Override
+  public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> 
dagNode, DagManager.DagId dagId)
+      throws IOException {
+    String dagIdStr = dagId.toString();
+    Dag<JobExecutionPlan> dag = getDag(dagId);
+    this.jobToDag.put(dagNode, dag);
+    this.dagNodes.put(dagNode.getValue().getId(), dagNode);
+    if (!this.dagToJobs.containsKey(dagIdStr)) {
+      this.dagToJobs.put(dagIdStr, Lists.newLinkedList());
+    }
+    this.dagToJobs.get(dagIdStr).add(dagNode);
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws 
IOException {
+    return this.dagStateStore.getDag(dagId.toString());
+  }
+
+  @Override
+  public boolean containsDag(DagManager.DagId dagId) throws IOException {
+    return this.dagStateStore.existsDag(dagId);
+  }
+
+  @Override
+  public Dag.DagNode<JobExecutionPlan> getDagNode(DagNodeId dagNodeId) {
+    return this.dagNodes.get(dagNodeId);
+  }
+
+
+  @Override
+  public Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> 
dagNode) {
+    return this.jobToDag.get(dagNode);
+  }
+
+  @Override
+  public LinkedList<Dag.DagNode<JobExecutionPlan>> 
getDagNodes(DagManager.DagId dagId) {
+    if (this.dagToJobs.containsKey(dagId.toString())) {
+      return this.dagToJobs.get(dagId.toString());

Review Comment:
   This check-then-get looks like a race condition: this method isn't 
`synchronized`, while `addDagNodeState` and `deleteDagNodeState` (which are) mutate `dagToJobs`.
   
   Either way, the easiest and safest approach is to call `get` once and then check 
whether it returned `null`.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Returns true if the dag is present in the store.
+   * @param dagId DagId of the dag
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns a dag if present, null otherwise.
+   * @param dagId DagId of the dag
+   */
+  Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically called upon 
completion of execution.
+   * @param dag The dag completed/cancelled execution on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void deleteDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * This marks the dag as a failed one.
+   * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDagIds()} later to be retried.
+   * @param dag failing dag
+   * @throws IOException
+   */
+  void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Return a list of all failed dags' IDs contained in the dag state store.
+   */
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Returns the failed dag.
+   * If the dag is not found or is not marked as failed through {@link 
DagManagementStateStore#markDagFailed(Dag)}, it returns null.
+   * @param dagId dag id of the failed dag
+   */
+  Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws 
IOException;
+
+  /**
+   * Deletes the failed dag. No-op if dag is not found or is not marked as 
failed.
+   * @param dag
+   * @throws IOException
+   */
+  default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteFailedDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  void deleteFailedDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Adds state of a {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
+   * Note that a DagNode is a part of a Dag and may already be present in the 
store thorugh
+   * {@link DagManagementStateStore#checkpointDag}. This call is just an 
additional identifier which may be used
+   * for DagNode level operations. In the future, it may be merged with 
checkpointDag.
+   * @param dagNode dag node to be added
+   * @param dagId dag id of the dag this dag node belongs to
+   * @throws IOException
+   */
+  void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId 
dagId)
+      throws IOException;
+
+  /**
+   * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}
+   * @param dagNodeId of the dag ndoe
+   */
+  Dag.DagNode<JobExecutionPlan> getDagNode(DagNodeId dagNodeId);
+
+  /**
+   * Returns a list of {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}
+   * @param dagId DagId of the dag for which all DagNodes are requested
+   */
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) 
throws IOException;
+
+  /**
+   * Returns the {@link Dag} the provided {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to,
+   * or null if the dag node is not found.

Review Comment:
   Recommend returning an `Optional` here rather than `null`.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) 
failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at 
one place.
+ */
+@Slf4j
+public class MostlyInMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToDeadline = new HashMap<>();
+  private final DagStateStore dagStateStore;
+  private final DagStateStore failedDagStateStore;
+  private final UserQuotaManager quotaManager;
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+
+  public MostlyInMemoryDagManagementStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    this.failedDagStateStore = createDagStateStore(
+        ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+    this.quotaManager = new MysqlUserQuotaManager(config);
+    this.quotaManager.init(getDags());
+  }
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+    try {
+      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStore.class.getName()));
+      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.writeCheckpoint(dag);

Review Comment:
   Should marking the dag failed also remove it from the `dagStateStore`? 
Relatedly, shouldn't we verify that the Dag was previously known (via the DSS)?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Returns true if the dag is present in the store.
+   * @param dagId DagId of the dag
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns a dag if present, null otherwise.
+   * @param dagId DagId of the dag
+   */
+  Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically called upon 
completion of execution.
+   * @param dag The dag completed/cancelled execution on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void deleteDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * This marks the dag as a failed one.
+   * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDagIds()} later to be retried.

Review Comment:
   Technically, it looks to be queried by `getFailedDag(DagId)`.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Returns true if the dag is present in the store.
+   * @param dagId DagId of the dag
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns a dag if present, null otherwise.
+   * @param dagId DagId of the dag
+   */
+  Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically called upon 
completion of execution.
+   * @param dag The dag completed/cancelled execution on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void deleteDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * This marks the dag as a failed one.
+   * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDagIds()} later to be retried.
+   * @param dag failing dag
+   * @throws IOException
+   */
+  void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Return a list of all failed dags' IDs contained in the dag state store.
+   */
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Returns the failed dag.
+   * If the dag is not found or is not marked as failed through {@link 
DagManagementStateStore#markDagFailed(Dag)}, it returns null.
+   * @param dagId dag id of the failed dag
+   */
+  Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws 
IOException;
+
+  /**
+   * Deletes the failed dag. No-op if dag is not found or is not marked as 
failed.
+   * @param dag
+   * @throws IOException
+   */
+  default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteFailedDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  void deleteFailedDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Adds state of a {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
+   * Note that a DagNode is a part of a Dag and may already be present in the 
store thorugh
+   * {@link DagManagementStateStore#checkpointDag}. This call is just an 
additional identifier which may be used
+   * for DagNode level operations. In the future, it may be merged with 
checkpointDag.
+   * @param dagNode dag node to be added
+   * @param dagId dag id of the dag this dag node belongs to
+   * @throws IOException
+   */
+  void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId 
dagId)
+      throws IOException;
+
+  /**
+   * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}
+   * @param dagNodeId of the dag ndoe
+   */
+  Dag.DagNode<JobExecutionPlan> getDagNode(DagNodeId dagNodeId);

Review Comment:
   Let's clarify the contract for when the node is not found.



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not 
Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new 
MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
+    Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
+    Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
+    DagNodeId dagNodeId = 
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
+
+    this.dagManagementStateStore.checkpointDag(dag);
+    this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
+    this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
+
+    Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
+    Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getDag(dagId).toString());
+    Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNode(dagNodeId));
+    Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getParentDag(dagNode).toString());
+
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = 
this.dagManagementStateStore.getDagNodes(dagId);
+    Assert.assertEquals(2, dagNodes.size());
+    Assert.assertEquals(dagNode, dagNodes.get(0));
+    Assert.assertEquals(dagNode2, dagNodes.get(1));
+
+    dagNodes = this.dagManagementStateStore.getDagNodes(dagId);
+    Assert.assertEquals(2, dagNodes.size());
+    Assert.assertTrue(dagNodes.contains(dagNode));
+    Assert.assertTrue(dagNodes.contains(dagNode2));
+
+    this.dagManagementStateStore.deleteDagNodeState(dagId, dagNode);
+    
Assert.assertFalse(this.dagManagementStateStore.getDagNodes(dagId).contains(dagNode));

Review Comment:
   Let's assert what it actually contains, not merely what is absent from it. 
E.g., it's not supposed to be empty; it should now have a single element, which 
is `dagNode2`, correct?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not 
Mysql-related components.
+ */
+public class MostlyInMemoryDagManagementStateStoreTest {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new 
MostlyInMemoryDagManagementStateStore(config, topologySpecMap);
+  }
+
+  @Test
+  public void testAddDag() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);

Review Comment:
   nit: for clarity in the assertions below, name this `dag1` (or 
`dagA`/`dagB`).
   
   Then name the dag nodes accordingly, by parent dag and then by index, e.g.:
   ```
   dag1Node1, dag1Node2, dag2Node1
   // OR
   dagANode1, dagANode2, dagBNode1
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.

Review Comment:
   Suggest adding to this Javadoc: "Calling this on a previously checkpointed Dag updates it."



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Returns true if the dag is present in the store.
+   * @param dagId DagId of the dag
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns a dag if present, null otherwise.

Review Comment:
   (thanks for documenting!  now with behavior specified, it's easier to 
consider which semantics are desirable.)
   
   I'd really pull for `Optional`, rather than `null`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Returns true if the dag is present in the store.
+   * @param dagId DagId of the dag
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns a dag if present, null otherwise.
+   * @param dagId DagId of the dag
+   */
+  Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically called upon 
completion of execution.
+   * @param dag The dag completed/cancelled execution on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void deleteDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * This marks the dag as a failed one.
+   * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDagIds()} later to be retried.
+   * @param dag failing dag
+   * @throws IOException
+   */
+  void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Return a list of all failed dags' IDs contained in the dag state store.
+   */
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Returns the failed dag.
+   * If the dag is not found or is not marked as failed through {@link 
DagManagementStateStore#markDagFailed(Dag)}, it returns null.

Review Comment:
   nit: "...not found *because it was never* marked..."
   
   that said, let's use `Optional`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Returns true if the dag is present in the store.
+   * @param dagId DagId of the dag

Review Comment:
   take it or leave it, but this is how I write brief javadoc (e.g. by 
presuming `dagId` to be self-explanatory):
   ```
   @return whether `dagId` is currently known from `checkpointDag` but not yet 
`deleteDag`
   ```
   or for the one below:
   ```
   @return the {@link Dag}, if present
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) 
failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at 
one place.
+ */
+@Slf4j
+public class MostlyInMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToDeadline = new HashMap<>();
+  private final DagStateStore dagStateStore;
+  private final DagStateStore failedDagStateStore;
+  private final UserQuotaManager quotaManager;
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+
+  public MostlyInMemoryDagManagementStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    this.failedDagStateStore = createDagStateStore(
+        ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+    this.quotaManager = new MysqlUserQuotaManager(config);
+    this.quotaManager.init(getDags());
+  }
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+    try {
+      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStore.class.getName()));
+      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteDag(DagManager.DagId dagId) throws IOException {
+    this.dagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
+    this.failedDagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    return this.dagStateStore.getDags();
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws 
IOException {
+    return this.failedDagStateStore.getDag(dagId.toString());
+  }
+
+  @Override
+  public Set<String> getFailedDagIds() throws IOException {
+    return this.failedDagStateStore.getDagIds();
+  }
+
+  @Override
+  public synchronized void deleteDagNodeState(DagManager.DagId dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {

Review Comment:
   instance-level synchronization is probably not what we want; rather, we want 
`dagId`-level synchronization.  Totally reasonable to make this a TODO.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Returns true if the dag is present in the store.
+   * @param dagId DagId of the dag
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns a dag if present, null otherwise.
+   * @param dagId DagId of the dag
+   */
+  Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically called upon 
completion of execution.
+   * @param dag The dag completed/cancelled execution on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void deleteDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * This marks the dag as a failed one.
+   * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDagIds()} later to be retried.
+   * @param dag failing dag
+   * @throws IOException
+   */
+  void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Return a list of all failed dags' IDs contained in the dag state store.
+   */
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Returns the failed dag.
+   * If the dag is not found or is not marked as failed through {@link 
DagManagementStateStore#markDagFailed(Dag)}, it returns null.
+   * @param dagId dag id of the failed dag
+   */
+  Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws 
IOException;
+
+  /**
+   * Deletes the failed dag. No-op if dag is not found or is not marked as 
failed.
+   * @param dag
+   * @throws IOException
+   */
+  default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteFailedDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  void deleteFailedDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Adds state of a {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
+   * Note that a DagNode is a part of a Dag and may already be present in the 
store thorugh

Review Comment:
   actually I was asking about this in the unit test: why "may already be 
present" vs. "must already be present"?  please weigh the pros/cons so we're 
clear we've made the right choice



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) 
failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at 
one place.
+ */
+@Slf4j
+public class MostlyInMemoryDagManagementStateStore implements 
DagManagementStateStore {

Review Comment:
   this PR has come a long way!  and I know I originally suggested the name 
`MostlyInMemoryDagMSS`, but looking at it now, it feels more like 
`MostlyMysqlDagMSS`!  the core source-of-truth ("SoT") data all live in MySQL.  the in-memory 
parts are mostly just reverse indices / lookup tables for the DB state.  up to 
you whether you want to rename it



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -74,16 +78,10 @@ public interface DagManagementStateStore {
    */
   List<Dag<JobExecutionPlan>> getDags() throws IOException;
 
-  /**
-   * Return a list of all dag IDs contained in the dag state store.
-   */
-  Set<String> getDagIds() throws IOException;
-  Set<String> getFailedDagIds() throws IOException;
-
   /**
    * Initialize with the provided set of dags.
    */
-  void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) throws 
IOException;
+  void initQuota(Collection<Dag<JobExecutionPlan>> dags) throws IOException;

Review Comment:
   If you prefer, we can punt on this to a later PR... but I do anticipate a 
startup process like:
   1. load all dags
   2. adjust quota based on those dags
   3. addDagNodes based on those dags



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Returns true if the dag is present in the store.
+   * @param dagId DagId of the dag
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns a dag if present, null otherwise.
+   * @param dagId DagId of the dag
+   */
+  Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically called upon 
completion of execution.
+   * @param dag The dag completed/cancelled execution on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void deleteDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * This marks the dag as a failed one.
+   * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDagIds()} later to be retried.
+   * @param dag failing dag
+   * @throws IOException
+   */
+  void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Return a list of all failed dags' IDs contained in the dag state store.
+   */
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Returns the failed dag.
+   * If the dag is not found or is not marked as failed through {@link 
DagManagementStateStore#markDagFailed(Dag)}, it returns null.
+   * @param dagId dag id of the failed dag
+   */
+  Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws 
IOException;
+
+  /**
+   * Deletes the failed dag. No-op if dag is not found or is not marked as 
failed.
+   * @param dag
+   * @throws IOException
+   */
+  default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteFailedDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  void deleteFailedDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Adds state of a {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
+   * Note that a DagNode is a part of a Dag and may already be present in the 
store thorugh
+   * {@link DagManagementStateStore#checkpointDag}. This call is just an 
additional identifier which may be used
+   * for DagNode level operations. In the future, it may be merged with 
checkpointDag.
+   * @param dagNode dag node to be added
+   * @param dagId dag id of the dag this dag node belongs to
+   * @throws IOException
+   */
+  void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId 
dagId)
+      throws IOException;
+
+  /**
+   * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}
+   * @param dagNodeId of the dag ndoe
+   */
+  Dag.DagNode<JobExecutionPlan> getDagNode(DagNodeId dagNodeId);
+
+  /**
+   * Returns a list of {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}

Review Comment:
   not documented here, but the impl seems to return an empty list when not 
found.  I don't believe a real Dag could ever have no DagNodes, so, yes, that 
could potentially be a sentinel value for "not found"... but overall, we 
probably want to communicate an unknown `DagId` more explicitly, so that callers 
can handle that case specifically





Issue Time Tracking
-------------------

    Worklog Id:     (was: 905391)
    Time Spent: 5h 10m  (was: 5h)

> create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, 
> DagStateStore and in-memory dag maps used in DagManager
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2002
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2002
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> this will help in refactoring DagManager



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Reply via email to