[ https://issues.apache.org/jira/browse/GOBBLIN-2002?focusedWorklogId=904813&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-904813 ]
ASF GitHub Bot logged work on GOBBLIN-2002: ------------------------------------------- Author: ASF GitHub Bot Created on: 14/Feb/24 07:53 Start Date: 14/Feb/24 07:53 Worklog Time Spent: 10m Work Description: phet commented on code in PR #3878: URL: https://github.com/apache/gobblin/pull/3878#discussion_r1488990478 ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. 
+ */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + void addDag(String dagId, Dag<JobExecutionPlan> dag); + boolean containsDag(String dagId); + Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); + Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); + List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; + List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; + boolean addCleanUpDag(String dagId); Review Comment: what does it mean to "add a clean-up DAG"? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + void addDag(String dagId, Dag<JobExecutionPlan> dag); + boolean containsDag(String dagId); + Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); + Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); + List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; + List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; + boolean addCleanUpDag(String dagId); + void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + + /** + * Persist the {@link Dag} to the backing store. + * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted + * and be picked up again when leader transition happens. + * @param dag The dag submitted to {@link DagManager} + */ + void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. 
+ */ + void cleanUp(Dag<JobExecutionPlan> dag) throws IOException; + void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void cleanUp(String dagId) throws IOException; + void cleanUpFailedDag(String dagId) throws IOException; + + /** + * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager} + * takes over or on restart of service. + * @return a {@link List} of currently running {@link Dag}s. + */ + List<Dag<JobExecutionPlan>> getDags() throws IOException; Review Comment: consider `Iterator` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + void addDag(String dagId, Dag<JobExecutionPlan> dag); + boolean containsDag(String dagId); + Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); + Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); + List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; + List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; Review Comment: might be clearest: `getDagNodesForAllDags` (which is what I believe this does...) BTW, this may be expensive and could ultimately prove a memory scaling bottleneck. what's the scenario in which we need it? I don't wish to prematurely optimize, but would returning an `Iterator` give us the opportunity to implement more efficiently? ... or is the `List` crucial, e.g., to capture an atomically-consistent snapshot of the DagNodes from a particular moment in time? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + void addDag(String dagId, Dag<JobExecutionPlan> dag); + boolean containsDag(String dagId); + Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); + Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); + List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; + List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; + boolean addCleanUpDag(String dagId); + void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + + /** + * Persist the {@link Dag} to the backing store. 
+ * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted + * and be picked up again when leader transition happens. + * @param dag The dag submitted to {@link DagManager} + */ + void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. + */ + void cleanUp(Dag<JobExecutionPlan> dag) throws IOException; + void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void cleanUp(String dagId) throws IOException; + void cleanUpFailedDag(String dagId) throws IOException; + + /** + * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager} + * takes over or on restart of service. + * @return a {@link List} of currently running {@link Dag}s. + */ + List<Dag<JobExecutionPlan>> getDags() throws IOException; + + /** + * Return a list of all dag IDs contained in the dag state store. + */ + Set<String> getDagIds() throws IOException; + Set<String> getFailedDagIds() throws IOException; + + /** + * Initialize with the provided set of dags. + */ + void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) throws IOException; Review Comment: `initQuota`? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly testing functionalities related to DagStateStore but not Mysql-related components. 
+ */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl()) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE); + config = configBuilder.build(); + + // Constructing TopologySpecMap. 
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + String specExecInstance = "mySpecExecutor"; + TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance); + URI specExecURI = new URI(specExecInstance); + topologySpecMap.put(specExecURI, topologySpec); + this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap); + } + + @Test + public void testAddDag() throws Exception { + Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("test", 12345L, "FINISH_RUNNING", true); + Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("test2", 123456L, "FINISH_RUNNING", true); + Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0); + Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1); + Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0); + String dagId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); + String dagId2 = DagManagerUtils.calcJobId(dagNode3.getValue().getJobSpec().getConfig()); + + this.dagManagementStateStore.addDag(dagId, dag); + this.dagManagementStateStore.addDag(dagId, dag); + this.dagManagementStateStore.addDagNodeState(dagId, dagNode); + this.dagManagementStateStore.addDagNodeState(dagId, dagNode2); Review Comment: suggest renaming, since typical behavior of `add` is to replace any value held by the same key, but this seems more like append. alternatively would it make sense to have the signature ``` addDNState(String dagId, List<Dag.DagNode<JEP>> dagNodes) ``` and that would be called only once? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly testing functionalities related to DagStateStore but not Mysql-related components. 
+ */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl()) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE); + config = configBuilder.build(); + + // Constructing TopologySpecMap. 
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + String specExecInstance = "mySpecExecutor"; + TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance); + URI specExecURI = new URI(specExecInstance); + topologySpecMap.put(specExecURI, topologySpec); + this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap); + } + + @Test + public void testAddDag() throws Exception { + Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("test", 12345L, "FINISH_RUNNING", true); + Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("test2", 123456L, "FINISH_RUNNING", true); + Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0); + Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1); + Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0); + String dagId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); + String dagId2 = DagManagerUtils.calcJobId(dagNode3.getValue().getJobSpec().getConfig()); + + this.dagManagementStateStore.addDag(dagId, dag); + this.dagManagementStateStore.addDag(dagId, dag); + this.dagManagementStateStore.addDagNodeState(dagId, dagNode); + this.dagManagementStateStore.addDagNodeState(dagId, dagNode2); + this.dagManagementStateStore.addDagNodeState(dagId2, dagNode3); + List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagManagementStateStore.getDagNodes(dagId); Review Comment: nit: wait to init until using (between line 100, 101) ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; Review Comment: is the `String dagId` equivalent to: ``` DagManager.DagId myDagId = ... myDagId.toString() ``` ? if so, I suggest using `DagId` for type-safety ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java: ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states. + * This store maintains and utilizes in-memory references about dags and their job states and is used + * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be + * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion. + * This also encapsulates mysql based tables, i) dagStateStore, ii) failedDagStore, iii) userQuotaManager. + * They are used here to provide complete access to dag related information at one place. 
+ */ +@Slf4j +public class MostlyInMemoryDagManagementStateStore implements DagManagementStateStore { + @Getter private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); Review Comment: is the getter only used internally, or do you want, in effect, to expand the API beyond the DMSS interface? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java: ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states. + * This store maintains and utilizes in-memory references about dags and their job states and is used + * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be + * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion. + * This also encapsulates mysql based tables, i) dagStateStore, ii) failedDagStore, iii) userQuotaManager. + * They are used here to provide complete access to dag related information at one place. 
+ */ +@Slf4j +public class MostlyInMemoryDagManagementStateStore implements DagManagementStateStore { + @Getter private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); + private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>(); + private final Map<String, Dag.DagNode<JobExecutionPlan>> dagNodes = new HashMap<>(); + // dagToJobs holds a map of dagId to running jobs of that dag + final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>(); + final Map<String, Long> dagToDeadline = new HashMap<>(); + private final Set<String> dagIdstoClean = new HashSet<>(); + + private final DagStateStore dagStateStore; + private final DagStateStore failedDagStateStore; + private final UserQuotaManager quotaManager; + private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore"; + public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass"; + + public MostlyInMemoryDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException { + this.dagStateStore = createDagStateStore(config, topologySpecMap); + this.failedDagStateStore = createDagStateStore( + ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap); + this.quotaManager = new MysqlUserQuotaManager(config); + this.quotaManager.init(getDags()); + } + + DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) { + try { + Class<?> dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, MysqlDagStateStore.class.getName())); + return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + @Override + public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException { + 
this.dagStateStore.writeCheckpoint(dag); + } + + @Override + public void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException { + this.failedDagStateStore.writeCheckpoint(dag); + } + + @Override + public void cleanUp(Dag<JobExecutionPlan> dag) throws IOException { + this.dagStateStore.cleanUp(dag); + } + + @Override + public void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + this.failedDagStateStore.cleanUp(dag); + } + + @Override + public void cleanUp(String dagId) throws IOException { + this.dagStateStore.cleanUp(dagId); + } + + @Override + public void cleanUpFailedDag(String dagId) throws IOException { + this.failedDagStateStore.cleanUp(dagId); + } + + @Override + public List<Dag<JobExecutionPlan>> getDags() throws IOException { + return this.dagStateStore.getDags(); + } + + @Override + public Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException { + return this.failedDagStateStore.getDag(dagId); + } + + @Override + public Set<String> getDagIds() throws IOException { + return this.dagStateStore.getDagIds(); + } + + @Override + public Set<String> getFailedDagIds() throws IOException { + return this.failedDagStateStore.getDagIds(); + } + + @Override + public synchronized void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) { + this.jobToDag.remove(dagNode); + this.dagNodes.remove(dagNode.getValue().getId()); + this.dagToDeadline.remove(dagId); + this.dagToJobs.get(dagId).remove(dagNode); + if (this.dagToJobs.get(dagId).isEmpty()) { + this.dagToJobs.remove(dagId); + } + } + + @Override + public synchronized void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) { + Dag<JobExecutionPlan> dag = this.dags.get(dagId); + this.jobToDag.put(dagNode, dag); + this.dagNodes.put(dagNode.getValue().getId(), dagNode); + if (!this.dagToJobs.containsKey(dagId)) { + this.dagToJobs.put(dagId, Lists.newLinkedList()); + } + this.dagToJobs.get(dagId).add(dagNode); + } + + @Override + public 
Dag<JobExecutionPlan> getDag(String dagId) { + return this.dags.get(dagId); + } + + @Override + public void addDag(String dagId, Dag<JobExecutionPlan> dag) { + this.dags.put(dagId, dag); + } + + @Override + public boolean containsDag(String dagId) { + return this.dags.containsKey(dagId); + } + + @Override + public Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId) { + return this.dagNodes.get(dagNodeId); + } + + + @Override + public Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) { + return this.jobToDag.get(dagNode); + } + + @Override + public LinkedList<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) { + if (this.dagToJobs.containsKey(dagId)) { + return this.dagToJobs.get(dagId); + } else { + return Lists.newLinkedList(); + } + } + + public List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() { + List<Dag.DagNode<JobExecutionPlan>> allJobs = new ArrayList<>(); + for (Collection<Dag.DagNode<JobExecutionPlan>> collection : this.dagToJobs.values()) { + allJobs.addAll(collection); + } + return allJobs; + } + + @Override + public boolean addCleanUpDag(String dagId) { + return this.dagIdstoClean.add(dagId); + } + + public void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) { + // This implementation does not need to update quota usage when the service restarts or when its leadership status changes Review Comment: please document why not ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + void addDag(String dagId, Dag<JobExecutionPlan> dag); + boolean containsDag(String dagId); + Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); + Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); + List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; + List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; + boolean addCleanUpDag(String dagId); + void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + + /** + * Persist the {@link Dag} to the backing store. + * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted + * and be picked up again when leader transition happens. 
+ * @param dag The dag submitted to {@link DagManager} + */ + void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; Review Comment: should this usually happen in tandem w/ `cleanUp()` or is that not always the case? if it is, combine into a singular op (even if we're not truly able to guarantee all-or-nothing atomic success) ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly testing functionalities related to DagStateStore but not Mysql-related components. + */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' 
+ ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl()) Review Comment: suggest the method: ``` public static String MysqlUserQuotaManager::qualify(String configKey) { return MysqlUserQuotaManager.CONFIG_PREFIX + configKey } ``` ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStoreTest.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * Mainly testing functionalities related to DagStateStore but not Mysql-related components. + */ +public class MostlyInMemoryDagManagementStateStoreTest { + + private DagManagementStateStore dagManagementStateStore; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + private static final String TEST_DAG_STATE_STORE = "TestDagStateStore"; + private static final String TEST_TABLE = "quotas"; + static ITestMetastoreDatabase testMetastoreDatabase; + + + @BeforeClass + public void setUp() throws Exception { + // Setting up mock DB + testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + + Config config; + ConfigBuilder configBuilder = ConfigBuilder.create(); + configBuilder.addPrimitive(MostlyInMemoryDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, TestMysqlDagStateStore.class.getName()) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' 
+ ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl()) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD) + .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE); + config = configBuilder.build(); + + // Constructing TopologySpecMap. + Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + String specExecInstance = "mySpecExecutor"; + TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance); + URI specExecURI = new URI(specExecInstance); + topologySpecMap.put(specExecURI, topologySpec); + this.dagManagementStateStore = new MostlyInMemoryDagManagementStateStore(config, topologySpecMap); + } + + @Test + public void testAddDag() throws Exception { + Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("test", 12345L, "FINISH_RUNNING", true); + Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("test2", 123456L, "FINISH_RUNNING", true); + Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0); + Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1); + Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0); + String dagId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); + String dagId2 = DagManagerUtils.calcJobId(dagNode3.getValue().getJobSpec().getConfig()); + + this.dagManagementStateStore.addDag(dagId, dag); + this.dagManagementStateStore.addDag(dagId, dag); Review Comment: why twice? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. 
+ */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + void addDag(String dagId, Dag<JobExecutionPlan> dag); + boolean containsDag(String dagId); + Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); + Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); + List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; + List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; + boolean addCleanUpDag(String dagId); + void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + + /** + * Persist the {@link Dag} to the backing store. + * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted + * and be picked up again when leader transition happens. + * @param dag The dag submitted to {@link DagManager} Review Comment: javadoc feels unclear, esp. since it provides so little guidance on when and where to use this in the course of managing a Dag. if that's challenging to state as an abstract rule, include an example. also specify inter-method guarantees--e.g. if we just called `addDag`, does it call `writeCheckpoint` behind the scenes for us? rather than 'checkpoint', how about 'persist' or 'track'? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +public interface DagManagementStateStore { Review Comment: suggestion: API might be easiest to imagine if you start w/ create+update(+delete), then follow w/ the retrieval. this shows first what data may be inserted and how manipulated, so we can later judge whether any retrieval methods may be missing or should perhaps be reworked to be atomic ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + void addDag(String dagId, Dag<JobExecutionPlan> dag); + boolean containsDag(String dagId); + Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); + Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); + List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; + List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; + boolean addCleanUpDag(String dagId); + void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + + /** + * Persist the {@link Dag} to the backing store. + * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted + * and be picked up again when leader transition happens. 
+ * @param dag The dag submitted to {@link DagManager} + */ + void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. + */ + void cleanUp(Dag<JobExecutionPlan> dag) throws IOException; + void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException; Review Comment: if these two are merely convenience methods for the two that follow, please provide default impls (to derive the dagId from the `dag`) ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. + */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + void addDag(String dagId, Dag<JobExecutionPlan> dag); + boolean containsDag(String dagId); + Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); + Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); + List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; + List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; + boolean addCleanUpDag(String dagId); + void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + + /** + * Persist the {@link Dag} to the backing store. + * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted + * and be picked up again when leader transition happens. + * @param dag The dag submitted to {@link DagManager} + */ + void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. 
+ */ + void cleanUp(Dag<JobExecutionPlan> dag) throws IOException; + void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void cleanUp(String dagId) throws IOException; + void cleanUpFailedDag(String dagId) throws IOException; + + /** + * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager} + * takes over or on restart of service. + * @return a {@link List} of currently running {@link Dag}s. + */ + List<Dag<JobExecutionPlan>> getDags() throws IOException; + + /** + * Return a list of all dag IDs contained in the dag state store. + */ + Set<String> getDagIds() throws IOException; + Set<String> getFailedDagIds() throws IOException; + + /** + * Initialize with the provided set of dags. + */ + void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) throws IOException; + + /** + * Checks if the dagNode exceeds the statically configured user quota for the proxy user, requester user and flowGroup. + * It also increases the quota usage for proxy user, requester and the flowGroup of the given DagNode by one. + * @throws QuotaExceededException if the quota is exceeded + */ + void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNode) throws IOException; + + /** + * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}. 
+ * Returns true if successfully reduces the quota usage + */ + boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException; Review Comment: specify whether there is any relationship to `deleteDagNodeState` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions.
+ */ +public interface DagManagementStateStore { + Dag<JobExecutionPlan> getDag(String dagId) throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + void addDag(String dagId, Dag<JobExecutionPlan> dag); Review Comment: as the `dagId` can be derived from `dag`, either specify validation and behavior for a mismatch or just change this method signature to take simply `Dag<JEP>` (and derive dagID internally). Issue Time Tracking ------------------- Worklog Id: (was: 904813) Time Spent: 20m (was: 10m) > create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, > DagStateStore and in-memory dag maps used in DagManager > ------------------------------------------------------------------------------------------------------------------------------- > > Key: GOBBLIN-2002 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2002 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Arjun Singh Bora > Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > this will help in refactoring DagManager -- This message was sent by Atlassian Jira (v8.20.10#820010)