[ https://issues.apache.org/jira/browse/GOBBLIN-2002?focusedWorklogId=905391&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-905391 ]
ASF GitHub Bot logged work on GOBBLIN-2002: ------------------------------------------- Author: ASF GitHub Bot Created on: 16/Feb/24 18:35 Start Date: 16/Feb/24 18:35 Worklog Time Spent: 10m Work Description: phet commented on code in PR #3878: URL: https://github.com/apache/gobblin/pull/3878#discussion_r1492805565 ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly testing functionalities related to DagStateStore but not Mysql-related components. 
+ */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE); + config = configBuilder.build(); + + // Constructing TopologySpecMap. 
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + String specExecInstance = "mySpecExecutor"; + TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance); + URI specExecURI = new URI(specExecInstance); + topologySpecMap.put(specExecURI, topologySpec); + this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap); + } + + @Test + public void testAddDag() throws Exception { + Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L); + Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L); + Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0); + Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1); + Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2); + DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); Review Comment: either now or as a fast-follow, let's add an overload of `calcJobId` that takes a `DagNode<JEP>`, similar to the overload of `generateDagId(Dag<JEP>)` ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly testing functionalities related to DagStateStore but not Mysql-related components. 
+ */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE); + config = configBuilder.build(); + + // Constructing TopologySpecMap. 
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + String specExecInstance = "mySpecExecutor"; + TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance); + URI specExecURI = new URI(specExecInstance); + topologySpecMap.put(specExecURI, topologySpec); + this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap); + } + + @Test + public void testAddDag() throws Exception { + Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L); + Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L); + Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0); + Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1); + Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2); + DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); + + this.dagManagementStateStore.checkpointDag(dag); + this.dagManagementStateStore.addDagNodeState(dagNode, dagId); + this.dagManagementStateStore.addDagNodeState(dagNode2, dagId); + this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2); Review Comment: we did a `checkpointDag` prior to `addDagNodeState` for `dag`, but not for `dag2`. we should perhaps even throw an exception if asked to `addDagNodeState` to an unknown/uncheckpointed `Dag`. what's your take? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly testing functionalities related to DagStateStore but not Mysql-related components. 
+ */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE); + config = configBuilder.build(); + + // Constructing TopologySpecMap. 
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + String specExecInstance = "mySpecExecutor"; + TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance); + URI specExecURI = new URI(specExecInstance); + topologySpecMap.put(specExecURI, topologySpec); + this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap); + } + + @Test + public void testAddDag() throws Exception { + Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L); + Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L); + Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0); + Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1); + Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2); + DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); + + this.dagManagementStateStore.checkpointDag(dag); + this.dagManagementStateStore.addDagNodeState(dagNode, dagId); + this.dagManagementStateStore.addDagNodeState(dagNode2, dagId); + this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2); + + Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId)); + Assert.assertEquals(dag.toString(), this.dagManagementStateStore.getDag(dagId).toString()); + Assert.assertEquals(dagNode, this.dagManagementStateStore.getDagNode(dagNodeId)); + Assert.assertEquals(dag.toString(), this.dagManagementStateStore.getParentDag(dagNode).toString()); + + List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagManagementStateStore.getDagNodes(dagId); + Assert.assertEquals(2, dagNodes.size()); + Assert.assertEquals(dagNode, dagNodes.get(0)); + Assert.assertEquals(dagNode2, dagNodes.get(1)); + + dagNodes = this.dagManagementStateStore.getDagNodes(dagId); Review Comment: should be same result as line 102, no? 
########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly 
testing functionalities related to DagStateStore but not Mysql-related components. + */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE); + config = configBuilder.build(); + + // Constructing TopologySpecMap. 
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + String specExecInstance = "mySpecExecutor"; + TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance); + URI specExecURI = new URI(specExecInstance); + topologySpecMap.put(specExecURI, topologySpec); + this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap); + } + + @Test + public void testAddDag() throws Exception { + Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L); + Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L); + Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0); + Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1); + Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2); + DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); + + this.dagManagementStateStore.checkpointDag(dag); + this.dagManagementStateStore.addDagNodeState(dagNode, dagId); + this.dagManagementStateStore.addDagNodeState(dagNode2, dagId); + this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2); + + Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId)); + Assert.assertEquals(dag.toString(), this.dagManagementStateStore.getDag(dagId).toString()); + Assert.assertEquals(dagNode, this.dagManagementStateStore.getDagNode(dagNodeId)); + Assert.assertEquals(dag.toString(), this.dagManagementStateStore.getParentDag(dagNode).toString()); + + List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagManagementStateStore.getDagNodes(dagId); + Assert.assertEquals(2, dagNodes.size()); + Assert.assertEquals(dagNode, dagNodes.get(0)); + Assert.assertEquals(dagNode2, dagNodes.get(1)); + + dagNodes = this.dagManagementStateStore.getDagNodes(dagId); + Assert.assertEquals(2, dagNodes.size()); + 
Assert.assertTrue(dagNodes.contains(dagNode)); + Assert.assertTrue(dagNodes.contains(dagNode2)); Review Comment: I do prefer this form to the assertions on 104+105... but aren't they fundamentally the same? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states. + * This store maintains and utilizes in-memory references about dags and their job states and is used + * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be + * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion. + * This also encapsulates mysql based tables, i) dagStateStore, ii) failedDagStore, iii) userQuotaManager. + * They are used here to provide complete access to dag related information at one place. 
+ */ +@Slf4j +public class MostlyInMemoryDagManagementStateStore implements DagManagementStateStore { + private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); + private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new HashMap<>(); + // dagToJobs holds a map of dagId to running jobs of that dag + final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>(); + final Map<String, Long> dagToDeadline = new HashMap<>(); + private final DagStateStore dagStateStore; + private final DagStateStore failedDagStateStore; + private final UserQuotaManager quotaManager; + private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore"; + public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass"; + + public MostlyInMemoryDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException { + this.dagStateStore = createDagStateStore(config, topologySpecMap); + this.failedDagStateStore = createDagStateStore( + ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap); + this.quotaManager = new MysqlUserQuotaManager(config); + this.quotaManager.init(getDags()); + } + + DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) { + try { + Class<?> dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, MysqlDagStateStore.class.getName())); + return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + @Override + public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException { + this.dagStateStore.writeCheckpoint(dag); + } + + @Override + public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException { + 
this.failedDagStateStore.writeCheckpoint(dag); + } + + @Override + public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + this.dagStateStore.cleanUp(dag); + } + + @Override + public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + this.failedDagStateStore.cleanUp(dag); + } + + @Override + public void deleteDag(DagManager.DagId dagId) throws IOException { + this.dagStateStore.cleanUp(dagId.toString()); + } + + @Override + public void deleteFailedDag(DagManager.DagId dagId) throws IOException { + this.failedDagStateStore.cleanUp(dagId.toString()); + } + + @Override + public List<Dag<JobExecutionPlan>> getDags() throws IOException { + return this.dagStateStore.getDags(); + } + + @Override + public Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws IOException { + return this.failedDagStateStore.getDag(dagId.toString()); + } + + @Override + public Set<String> getFailedDagIds() throws IOException { + return this.failedDagStateStore.getDagIds(); + } + + @Override + public synchronized void deleteDagNodeState(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) { + String dagIdStr = dagId.toString(); + this.jobToDag.remove(dagNode); + this.dagNodes.remove(dagNode.getValue().getId()); + this.dagToDeadline.remove(dagIdStr); + this.dagToJobs.get(dagIdStr).remove(dagNode); + if (this.dagToJobs.get(dagIdStr).isEmpty()) { + this.dagToJobs.remove(dagIdStr); + } + } + + @Override + public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId) + throws IOException { + String dagIdStr = dagId.toString(); + Dag<JobExecutionPlan> dag = getDag(dagId); + this.jobToDag.put(dagNode, dag); + this.dagNodes.put(dagNode.getValue().getId(), dagNode); + if (!this.dagToJobs.containsKey(dagIdStr)) { + this.dagToJobs.put(dagIdStr, Lists.newLinkedList()); + } + this.dagToJobs.get(dagIdStr).add(dagNode); + } + + @Override + public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws 
IOException { + return this.dagStateStore.getDag(dagId.toString()); + } + + @Override + public boolean containsDag(DagManager.DagId dagId) throws IOException { + return this.dagStateStore.existsDag(dagId); + } + + @Override + public Dag.DagNode<JobExecutionPlan> getDagNode(DagNodeId dagNodeId) { + return this.dagNodes.get(dagNodeId); + } + + + @Override + public Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) { + return this.jobToDag.get(dagNode); + } + + @Override + public LinkedList<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) { + if (this.dagToJobs.containsKey(dagId.toString())) { + return this.dagToJobs.get(dagId.toString()); Review Comment: this check-then-get looks like a race condition, because the method's not `synchronized`. but anyway, easiest and safest would just be to call `get` and then check whether it returned `null` or not ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. + * e.g. on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that changes + * dag node's state, on resuming a dag, on receiving a new dag from orchestrator. + * Opposite of this is {@link DagManagementStateStore#deleteDag} that removes the Dag from the store. + * @param dag The dag to checkpoint + */ + void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Returns true if the dag is present in the store. + * @param dagId DagId of the dag + */ + boolean containsDag(DagManager.DagId dagId) throws IOException; + + /** + * Returns a dag if present, null otherwise. + * @param dagId DagId of the dag + */ + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically called upon completion of execution. + * @param dag The dag completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. 
+ */ + default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteDag(DagManagerUtils.generateDagId(dag)); + } + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void deleteDag(DagManager.DagId dagId) throws IOException; + + /** + * This marks the dag as a failed one. + * Failed dags are queried using {@link DagManagementStateStore#getFailedDagIds()} later to be retried. + * @param dag failing dag + * @throws IOException + */ + void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Return a list of all failed dags' IDs contained in the dag state store. + */ + Set<String> getFailedDagIds() throws IOException; + + /** + * Returns the failed dag. + * If the dag is not found or is not marked as failed through {@link DagManagementStateStore#markDagFailed(Dag)}, it returns null. + * @param dagId dag id of the failed dag + */ + Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Deletes the failed dag. No-op if dag is not found or is not marked as failed. + * @param dag + * @throws IOException + */ + default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteFailedDag(DagManagerUtils.generateDagId(dag)); + } + + void deleteFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store. + * Note that a DagNode is a part of a Dag and may already be present in the store through + * {@link DagManagementStateStore#checkpointDag}. This call is just an additional identifier which may be used + * for DagNode level operations. In the future, it may be merged with checkpointDag. 
+ * @param dagNode dag node to be added + * @param dagId dag id of the dag this dag node belongs to + * @throws IOException + */ + void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId) + throws IOException; + + /** + * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} + * @param dagNodeId of the dag node + */ + Dag.DagNode<JobExecutionPlan> getDagNode(DagNodeId dagNodeId); + + /** + * Returns a list of {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag} + * @param dagId DagId of the dag for which all DagNodes are requested + */ + List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) throws IOException; + + /** + * Returns the {@link Dag} the provided {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to, + * or null if the dag node is not found. Review Comment: recommend Optional ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states. + * This store maintains and utilizes in-memory references about dags and their job states and is used + * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be + * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion. + * This also encapsulates mysql based tables, i) dagStateStore, ii) failedDagStore, iii) userQuotaManager. + * They are used here to provide complete access to dag related information at one place. 
+ */ +@Slf4j +public class MostlyInMemoryDagManagementStateStore implements DagManagementStateStore { + private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); + private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new HashMap<>(); + // dagToJobs holds a map of dagId to running jobs of that dag + final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>(); + final Map<String, Long> dagToDeadline = new HashMap<>(); + private final DagStateStore dagStateStore; + private final DagStateStore failedDagStateStore; + private final UserQuotaManager quotaManager; + private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore"; + public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass"; + + public MostlyInMemoryDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException { + this.dagStateStore = createDagStateStore(config, topologySpecMap); + this.failedDagStateStore = createDagStateStore( + ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap); + this.quotaManager = new MysqlUserQuotaManager(config); + this.quotaManager.init(getDags()); + } + + DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) { + try { + Class<?> dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, MysqlDagStateStore.class.getName())); + return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + @Override + public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException { + this.dagStateStore.writeCheckpoint(dag); + } + + @Override + public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException { + 
this.failedDagStateStore.writeCheckpoint(dag); Review Comment: should marking the dag failed also remove it from the `dagStateStore`? relatedly, shouldn't we verify that the Dag was previously known (via the DSS)? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. + * e.g. 
on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that changes + * dag node's state, on resuming a dag, on receiving a new dag from orchestrator. + * Opposite of this is {@link DagManagementStateStore#deleteDag} that removes the Dag from the store. + * @param dag The dag to checkpoint + */ + void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Returns true if the dag is present in the store. + * @param dagId DagId of the dag + */ + boolean containsDag(DagManager.DagId dagId) throws IOException; + + /** + * Returns a dag if present, null otherwise. + * @param dagId DagId of the dag + */ + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically called upon completion of execution. + * @param dag The dag completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. + */ + default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteDag(DagManagerUtils.generateDagId(dag)); + } + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void deleteDag(DagManager.DagId dagId) throws IOException; + + /** + * This marks the dag as a failed one. + * Failed dags are queried using {@link DagManagementStateStore#getFailedDagIds()} later to be retried. Review Comment: technically it looks to be queried by `getFailedDagId(DagId)` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. + * e.g. on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that changes + * dag node's state, on resuming a dag, on receiving a new dag from orchestrator. + * Opposite of this is {@link DagManagementStateStore#deleteDag} that removes the Dag from the store. + * @param dag The dag to checkpoint + */ + void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Returns true if the dag is present in the store. 
+ * @param dagId DagId of the dag + */ + boolean containsDag(DagManager.DagId dagId) throws IOException; + + /** + * Returns a dag if present, null otherwise. + * @param dagId DagId of the dag + */ + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically called upon completion of execution. + * @param dag The dag completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. + */ + default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteDag(DagManagerUtils.generateDagId(dag)); + } + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void deleteDag(DagManager.DagId dagId) throws IOException; + + /** + * This marks the dag as a failed one. + * Failed dags are queried using {@link DagManagementStateStore#getFailedDagIds()} later to be retried. + * @param dag failing dag + * @throws IOException + */ + void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Return a list of all failed dags' IDs contained in the dag state store. + */ + Set<String> getFailedDagIds() throws IOException; + + /** + * Returns the failed dag. + * If the dag is not found or is not marked as failed through {@link DagManagementStateStore#markDagFailed(Dag)}, it returns null. + * @param dagId dag id of the failed dag + */ + Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Deletes the failed dag. No-op if dag is not found or is not marked as failed. + * @param dag + * @throws IOException + */ + default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteFailedDag(DagManagerUtils.generateDagId(dag)); + } + + void deleteFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store. 
+ * Note that a DagNode is a part of a Dag and may already be present in the store through + * {@link DagManagementStateStore#checkpointDag}. This call is just an additional identifier which may be used + * for DagNode level operations. In the future, it may be merged with checkpointDag. + * @param dagNode dag node to be added + * @param dagId dag id of the dag this dag node belongs to + * @throws IOException + */ + void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId) + throws IOException; + + /** + * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} + * @param dagNodeId of the dag node + */ + Dag.DagNode<JobExecutionPlan> getDagNode(DagNodeId dagNodeId); Review Comment: let's clarify contract for when not found ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly testing functionalities related to DagStateStore but not Mysql-related components. 
+ */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE); + config = configBuilder.build(); + + // Constructing TopologySpecMap. 
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + String specExecInstance = "mySpecExecutor"; + TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance); + URI specExecURI = new URI(specExecInstance); + topologySpecMap.put(specExecURI, topologySpec); + this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap); + } + + @Test + public void testAddDag() throws Exception { + Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L); + Dag<JobExecutionPlan> dag2 = DagTestUtils.buildDag("test2", 123456L); + Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0); + Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1); + Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2); + DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); + + this.dagManagementStateStore.checkpointDag(dag); + this.dagManagementStateStore.addDagNodeState(dagNode, dagId); + this.dagManagementStateStore.addDagNodeState(dagNode2, dagId); + this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2); + + Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId)); + Assert.assertEquals(dag.toString(), this.dagManagementStateStore.getDag(dagId).toString()); + Assert.assertEquals(dagNode, this.dagManagementStateStore.getDagNode(dagNodeId)); + Assert.assertEquals(dag.toString(), this.dagManagementStateStore.getParentDag(dagNode).toString()); + + List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagManagementStateStore.getDagNodes(dagId); + Assert.assertEquals(2, dagNodes.size()); + Assert.assertEquals(dagNode, dagNodes.get(0)); + Assert.assertEquals(dagNode2, dagNodes.get(1)); + + dagNodes = this.dagManagementStateStore.getDagNodes(dagId); + Assert.assertEquals(2, dagNodes.size()); + 
Assert.assertTrue(dagNodes.contains(dagNode)); + Assert.assertTrue(dagNodes.contains(dagNode2)); + + this.dagManagementStateStore.deleteDagNodeState(dagId, dagNode); + Assert.assertFalse(this.dagManagementStateStore.getDagNodes(dagId).contains(dagNode)); Review Comment: let's do an assertion of what it actually is, not merely what's not in it. e.g. it's not supposed to be empty, but should have now a single element, which is `dagNode2`, correct? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly testing functionalities related to DagStateStore but not Mysql-related components. 
+ */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl()) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD) + .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE); + config = configBuilder.build(); + + // Constructing TopologySpecMap. + Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + String specExecInstance = "mySpecExecutor"; + TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance); + URI specExecURI = new URI(specExecInstance); + topologySpecMap.put(specExecURI, topologySpec); + this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap); + } + + @Test + public void testAddDag() throws Exception { + Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L); Review Comment: nit: for clarity in the assertions below, number this `dag1` (or `dagA`/`dagB`) then number the dag nodes accordingly by parent dag, then index. 
e.g.: ``` dag1Node1, dag1Node2, dag2Node1 // OR dagANode1, dagANode2, dagBNode1 ``` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. 
Review Comment: "calling on a previously checkpointed Dag updates it" ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. + * e.g. 
on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that changes + * dag node's state, on resuming a dag, on receiving a new dag from orchestrator. + * Opposite of this is {@link DagManagementStateStore#deleteDag} that removes the Dag from the store. + * @param dag The dag to checkpoint + */ + void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Returns true if the dag is present in the store. + * @param dagId DagId of the dag + */ + boolean containsDag(DagManager.DagId dagId) throws IOException; + + /** + * Returns a dag if present, null otherwise. Review Comment: (thanks for documenting! now with behavior specified, it's easier to consider which semantics are desirable.) I'd really pull for `Optional`, rather than `null` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. + * e.g. on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that changes + * dag node's state, on resuming a dag, on receiving a new dag from orchestrator. + * Opposite of this is {@link DagManagementStateStore#deleteDag} that removes the Dag from the store. + * @param dag The dag to checkpoint + */ + void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Returns true if the dag is present in the store. + * @param dagId DagId of the dag + */ + boolean containsDag(DagManager.DagId dagId) throws IOException; + + /** + * Returns a dag if present, null otherwise. + * @param dagId DagId of the dag + */ + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically called upon completion of execution. + * @param dag The dag completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. 
+ */ + default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteDag(DagManagerUtils.generateDagId(dag)); + } + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void deleteDag(DagManager.DagId dagId) throws IOException; + + /** + * This marks the dag as a failed one. + * Failed dags are queried using {@link DagManagementStateStore#getFailedDagIds()} later to be retried. + * @param dag failing dag + * @throws IOException + */ + void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Return a list of all failed dags' IDs contained in the dag state store. + */ + Set<String> getFailedDagIds() throws IOException; + + /** + * Returns the failed dag. + * If the dag is not found or is not marked as failed through {@link DagManagementStateStore#markDagFailed(Dag)}, it returns null. Review Comment: nit: "...not found *because it was never* marked..." that said, let's use `Optional` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. + * e.g. on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that changes + * dag node's state, on resuming a dag, on receiving a new dag from orchestrator. + * Opposite of this is {@link DagManagementStateStore#deleteDag} that removes the Dag from the store. + * @param dag The dag to checkpoint + */ + void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Returns true if the dag is present in the store. + * @param dagId DagId of the dag Review Comment: take it or leave it, but this is how I write brief javadoc (e.g. by presuming `dagId` to be self-explanatory): ``` @return whether `dagId` is currently known from `checkpointDag` but not yet `deleteDag` ``` or for the one below: ``` @return the {@link Dag}, if present ``` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states. + * This store maintains and utilizes in-memory references about dags and their job states and is used + * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be + * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion. + * This also encapsulates mysql based tables, i) dagStateStore, ii) failedDagStore, iii) userQuotaManager. + * They are used here to provide complete access to dag related information at one place. 
+ */ +@Slf4j +public class MostlyInMemoryDagManagementStateStore implements DagManagementStateStore { + private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); + private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new HashMap<>(); + // dagToJobs holds a map of dagId to running jobs of that dag + final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>(); + final Map<String, Long> dagToDeadline = new HashMap<>(); + private final DagStateStore dagStateStore; + private final DagStateStore failedDagStateStore; + private final UserQuotaManager quotaManager; + private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore"; + public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass"; + + public MostlyInMemoryDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException { + this.dagStateStore = createDagStateStore(config, topologySpecMap); + this.failedDagStateStore = createDagStateStore( + ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap); + this.quotaManager = new MysqlUserQuotaManager(config); + this.quotaManager.init(getDags()); + } + + DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) { + try { + Class<?> dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, MysqlDagStateStore.class.getName())); + return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + @Override + public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException { + this.dagStateStore.writeCheckpoint(dag); + } + + @Override + public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException { + 
this.failedDagStateStore.writeCheckpoint(dag); + } + + @Override + public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + this.dagStateStore.cleanUp(dag); + } + + @Override + public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + this.failedDagStateStore.cleanUp(dag); + } + + @Override + public void deleteDag(DagManager.DagId dagId) throws IOException { + this.dagStateStore.cleanUp(dagId.toString()); + } + + @Override + public void deleteFailedDag(DagManager.DagId dagId) throws IOException { + this.failedDagStateStore.cleanUp(dagId.toString()); + } + + @Override + public List<Dag<JobExecutionPlan>> getDags() throws IOException { + return this.dagStateStore.getDags(); + } + + @Override + public Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws IOException { + return this.failedDagStateStore.getDag(dagId.toString()); + } + + @Override + public Set<String> getFailedDagIds() throws IOException { + return this.failedDagStateStore.getDagIds(); + } + + @Override + public synchronized void deleteDagNodeState(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) { Review Comment: instance-level synchronization is probably not what we want, but rather `dagId`-level synchronization. totally reasonable to make this a TODO ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. + * e.g. on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that changes + * dag node's state, on resuming a dag, on receiving a new dag from orchestrator. + * Opposite of this is {@link DagManagementStateStore#deleteDag} that removes the Dag from the store. + * @param dag The dag to checkpoint + */ + void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Returns true if the dag is present in the store. + * @param dagId DagId of the dag + */ + boolean containsDag(DagManager.DagId dagId) throws IOException; + + /** + * Returns a dag if present, null otherwise. 
+ * @param dagId DagId of the dag + */ + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically called upon completion of execution. + * @param dag The dag completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. + */ + default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteDag(DagManagerUtils.generateDagId(dag)); + } + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void deleteDag(DagManager.DagId dagId) throws IOException; + + /** + * This marks the dag as a failed one. + * Failed dags are queried using {@link DagManagementStateStore#getFailedDagIds()} later to be retried. + * @param dag failing dag + * @throws IOException + */ + void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Return a list of all failed dags' IDs contained in the dag state store. + */ + Set<String> getFailedDagIds() throws IOException; + + /** + * Returns the failed dag. + * If the dag is not found or is not marked as failed through {@link DagManagementStateStore#markDagFailed(Dag)}, it returns null. + * @param dagId dag id of the failed dag + */ + Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Deletes the failed dag. No-op if dag is not found or is not marked as failed. + * @param dag + * @throws IOException + */ + default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteFailedDag(DagManagerUtils.generateDagId(dag)); + } + + void deleteFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store. 
+ * Note that a DagNode is a part of a Dag and may already be present in the store thorugh Review Comment: actually I was asking about this in the unit test: why "may already be present" vs. "must already be present"? please weigh the pros/cons so we're clear we've made the right choice ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states. + * This store maintains and utilizes in-memory references about dags and their job states and is used + * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be + * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion. + * This also encapsulates mysql based tables, i) dagStateStore, ii) failedDagStore, iii) userQuotaManager. + * They are used here to provide complete access to dag related information at one place. + */ +@Slf4j +public class MostlyInMemoryDagManagementStateStore implements DagManagementStateStore { Review Comment: this PR has come a long way! and I know I originally suggested the name `MostlyInMemoryDagMSS`, but looking at it now, it feels more like `MostlyMysqlDagMSS`! the core "SoT" data are all in mysql. the in-memory parts are mostly just reverse indices / lookup tables for the DB state. 
up to you if you want to rename ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -74,16 +78,10 @@ public interface DagManagementStateStore { */ List<Dag<JobExecutionPlan>> getDags() throws IOException; - /** - * Return a list of all dag IDs contained in the dag state store. - */ - Set<String> getDagIds() throws IOException; - Set<String> getFailedDagIds() throws IOException; - /** * Initialize with the provided set of dags. */ - void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) throws IOException; + void initQuota(Collection<Dag<JobExecutionPlan>> dags) throws IOException; Review Comment: If you prefer, we can punt on this to a later PR... but I do anticipate a startup process like: 1. load all dags 2. adjust quota based on those dags 3. addDagNodes based on those dags ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. + * e.g. on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that changes + * dag node's state, on resuming a dag, on receiving a new dag from orchestrator. + * Opposite of this is {@link DagManagementStateStore#deleteDag} that removes the Dag from the store. + * @param dag The dag to checkpoint + */ + void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Returns true if the dag is present in the store. + * @param dagId DagId of the dag + */ + boolean containsDag(DagManager.DagId dagId) throws IOException; + + /** + * Returns a dag if present, null otherwise. + * @param dagId DagId of the dag + */ + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically called upon completion of execution. + * @param dag The dag completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. 
+ */ + default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteDag(DagManagerUtils.generateDagId(dag)); + } + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void deleteDag(DagManager.DagId dagId) throws IOException; + + /** + * This marks the dag as a failed one. + * Failed dags are queried using {@link DagManagementStateStore#getFailedDagIds()} later to be retried. + * @param dag failing dag + * @throws IOException + */ + void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Return a list of all failed dags' IDs contained in the dag state store. + */ + Set<String> getFailedDagIds() throws IOException; + + /** + * Returns the failed dag. + * If the dag is not found or is not marked as failed through {@link DagManagementStateStore#markDagFailed(Dag)}, it returns null. + * @param dagId dag id of the failed dag + */ + Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Deletes the failed dag. No-op if dag is not found or is not marked as failed. + * @param dag + * @throws IOException + */ + default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteFailedDag(DagManagerUtils.generateDagId(dag)); + } + + void deleteFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store. + * Note that a DagNode is a part of a Dag and may already be present in the store thorugh + * {@link DagManagementStateStore#checkpointDag}. This call is just an additional identifier which may be used + * for DagNode level operations. In the future, it may be merged with checkpointDag. 
+ * @param dagNode dag node to be added + * @param dagId dag id of the dag this dag node belongs to + * @throws IOException + */ + void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId) + throws IOException; + + /** + * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} + * @param dagNodeId of the dag ndoe + */ + Dag.DagNode<JobExecutionPlan> getDagNode(DagNodeId dagNodeId); + + /** + * Returns a list of {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag} Review Comment: not documented here, but the impl seems to return an empty list when not found. I don't believe a real Dag could ever have no DagNodes, so, yes, that could potentially be a sentinel value for "not found"... but overall, we probably want to more explicitly communicate an unknown `DagId` to handle that specifically Issue Time Tracking ------------------- Worklog Id: (was: 905391) Time Spent: 5h 10m (was: 5h) > create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, > DagStateStore and in-memory dag maps used in DagManager > ------------------------------------------------------------------------------------------------------------------------------- > > Key: GOBBLIN-2002 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2002 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Arjun Singh Bora > Priority: Major > Time Spent: 5h 10m > Remaining Estimate: 0h > > this will help in refactoring DagManager -- This message was sent by Atlassian Jira (v8.20.10#820010)