[ https://issues.apache.org/jira/browse/GOBBLIN-2002?focusedWorklogId=905396&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-905396 ]
ASF GitHub Bot logged work on GOBBLIN-2002: ------------------------------------------- Author: ASF GitHub Bot Created on: 16/Feb/24 18:37 Start Date: 16/Feb/24 18:37 Worklog Time Spent: 10m Work Description: phet commented on code in PR #3878: URL: https://github.com/apache/gobblin/pull/3878#discussion_r1492824270 ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/** + * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states + * and allows add/delete and other functions. 
+ */ +@Alpha +public interface DagManagementStateStore { + /** + * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s. + * e.g. on adding a failed dag in store to retry later, on submitting a dag node to spec producer because that changes + * dag node's state, on resuming a dag, on receiving a new dag from orchestrator. + * Opposite of this is {@link DagManagementStateStore#deleteDag} that removes the Dag from the store. + * @param dag The dag to checkpoint + */ + void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Returns true if the dag is present in the store. + * @param dagId DagId of the dag + */ + boolean containsDag(DagManager.DagId dagId) throws IOException; + + /** + * Returns a dag if present, null otherwise. + * @param dagId DagId of the dag + */ + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + + /** + * Delete the {@link Dag} from the backing store, typically called upon completion of execution. + * @param dag The dag completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. + */ + default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteDag(DagManagerUtils.generateDagId(dag)); + } + + /** + * Delete the {@link Dag} from the backing store, typically upon completion of execution. + * @param dagId The ID of the dag to clean up. + */ + void deleteDag(DagManager.DagId dagId) throws IOException; + + /** + * This marks the dag as a failed one. + * Failed dags are queried using {@link DagManagementStateStore#getFailedDagIds()} later to be retried. + * @param dag failing dag + * @throws IOException + */ + void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException; + + /** + * Return a list of all failed dags' IDs contained in the dag state store. + */ + Set<String> getFailedDagIds() throws IOException; + + /** + * Returns the failed dag. 
+ * If the dag is not found or is not marked as failed through {@link DagManagementStateStore#markDagFailed(Dag)}, it returns null. + * @param dagId dag id of the failed dag + */ + Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Deletes the failed dag. No-op if dag is not found or is not marked as failed. + * @param dag + * @throws IOException + */ + default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + deleteFailedDag(DagManagerUtils.generateDagId(dag)); + } + + void deleteFailedDag(DagManager.DagId dagId) throws IOException; + + /** + * Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store. + * Note that a DagNode is a part of a Dag and may already be present in the store through Review Comment: Actually, I was asking about this in the unit test: why "may already be present" vs. "must already be present"? Please weigh the pros/cons so we're clear we've made the right choice. Issue Time Tracking ------------------- Worklog Id: (was: 905396) Time Spent: 5h 50m (was: 5h 40m) > create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, > DagStateStore and in-memory dag maps used in DagManager > ------------------------------------------------------------------------------------------------------------------------------- > > Key: GOBBLIN-2002 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2002 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Arjun Singh Bora > Priority: Major > Time Spent: 5h 50m > Remaining Estimate: 0h > > this will help in refactoring DagManager -- This message was sent by Atlassian Jira (v8.20.10#820010)