[ https://issues.apache.org/jira/browse/GOBBLIN-2002?focusedWorklogId=904948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-904948 ]
ASF GitHub Bot logged work on GOBBLIN-2002: ------------------------------------------- Author: ASF GitHub Bot Created on: 14/Feb/24 17:25 Start Date: 14/Feb/24 17:25 Worklog Time Spent: 10m Work Description: phet commented on code in PR #3878: URL: https://github.com/apache/gobblin/pull/3878#discussion_r1489799550 ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java: ########## @@ -154,13 +152,13 @@ public synchronized void addDagNodeState(String dagId, Dag.DagNode<JobExecutionP } @Override - public Dag<JobExecutionPlan> getDag(String dagId) { - return this.dags.get(dagId); + public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) { + return this.dags.get(dagId.toString()); Review Comment: `DagId` already defines `equals` and `hashCode`, so it could itself serve as the map key, without first serializing it to a string. ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -32,40 +32,44 @@ * and allows add/delete and other functions. */ public interface DagManagementStateStore { - Dag<JobExecutionPlan> getDag(String dagId) throws IOException; - Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; - void addDag(String dagId, Dag<JobExecutionPlan> dag); - boolean containsDag(String dagId); - Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); - Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); - List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; - List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; - boolean addCleanUpDag(String dagId); - void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); - void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); - + void addDag(Dag<JobExecutionPlan> dag); /** * Persist the {@link Dag} to the backing store.
* This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted * and be picked up again when leader transition happens. * @param dag The dag submitted to {@link DagManager} */ void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; - void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; - + boolean containsDag(String dagId); + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + /** + * Return a list of all dag IDs contained in the dag state store. + */ + Set<String> getDagIds() throws IOException; /** * Delete the {@link Dag} from the backing store, typically upon completion of execution. - * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. + * @param dag The dag completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. */ - void cleanUp(Dag<JobExecutionPlan> dag) throws IOException; - void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException; - + default void cleanUp(Dag<JobExecutionPlan> dag) throws IOException { + cleanUp(DagManagerUtils.generateDagId(dag).toString()); + } /** * Delete the {@link Dag} from the backing store, typically upon completion of execution. * @param dagId The ID of the dag to clean up. */ void cleanUp(String dagId) throws IOException; + void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + Set<String> getFailedDagIds() throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + default void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + cleanUpFailedDag(DagManagerUtils.generateDagId(dag).toString()); + } Review Comment: nit: suggest blank line before and after default impls. 
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -32,40 +32,44 @@ * and allows add/delete and other functions. */ public interface DagManagementStateStore { - Dag<JobExecutionPlan> getDag(String dagId) throws IOException; - Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; - void addDag(String dagId, Dag<JobExecutionPlan> dag); - boolean containsDag(String dagId); - Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); - Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); - List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; - List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; - boolean addCleanUpDag(String dagId); - void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); - void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); - + void addDag(Dag<JobExecutionPlan> dag); /** * Persist the {@link Dag} to the backing store. * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted * and be picked up again when leader transition happens. * @param dag The dag submitted to {@link DagManager} */ void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; - void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; - + boolean containsDag(String dagId); + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + /** + * Return a list of all dag IDs contained in the dag state store. + */ + Set<String> getDagIds() throws IOException; /** * Delete the {@link Dag} from the backing store, typically upon completion of execution. - * @param dag The dag completed/cancelled from execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. + * @param dag The dag completed/cancelled execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}. 
*/ - void cleanUp(Dag<JobExecutionPlan> dag) throws IOException; - void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException; - + default void cleanUp(Dag<JobExecutionPlan> dag) throws IOException { + cleanUp(DagManagerUtils.generateDagId(dag).toString()); + } /** * Delete the {@link Dag} from the backing store, typically upon completion of execution. * @param dagId The ID of the dag to clean up. */ void cleanUp(String dagId) throws IOException; + void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; + Set<String> getFailedDagIds() throws IOException; + Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; + default void cleanUpFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + cleanUpFailedDag(DagManagerUtils.generateDagId(dag).toString()); + } void cleanUpFailedDag(String dagId) throws IOException; + void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); + Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); Review Comment: let's create a `@Data class DagNodeId { ... }` as an analogue to `DagId` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -32,40 +32,44 @@ * and allows add/delete and other functions. 
*/ public interface DagManagementStateStore { - Dag<JobExecutionPlan> getDag(String dagId) throws IOException; - Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; - void addDag(String dagId, Dag<JobExecutionPlan> dag); - boolean containsDag(String dagId); - Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); - Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); - List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; - List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; - boolean addCleanUpDag(String dagId); - void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); - void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); - + void addDag(Dag<JobExecutionPlan> dag); /** * Persist the {@link Dag} to the backing store. * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted * and be picked up again when leader transition happens. * @param dag The dag submitted to {@link DagManager} */ void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; - void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; - + boolean containsDag(String dagId); + Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException; + /** + * Return a list of all dag IDs contained in the dag state store. + */ + Set<String> getDagIds() throws IOException; Review Comment: Another place that should use `DagId` instead of `String`, and there are more in the methods that follow. ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -74,16 +78,10 @@ public interface DagManagementStateStore { */ List<Dag<JobExecutionPlan>> getDags() throws IOException; - /** - * Return a list of all dag IDs contained in the dag state store.
- */ - Set<String> getDagIds() throws IOException; - Set<String> getFailedDagIds() throws IOException; - /** * Initialize with the provided set of dags. */ - void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) throws IOException; + void initQuota(Collection<Dag<JobExecutionPlan>> dags) throws IOException; Review Comment: Thinking more on this: I'm not against keeping a method like this, whether as `@VisibleForTesting` or in case the need arises later (e.g. as `@Experimental`). Still, suppose it is actually used the way I expect, which is: - at host startup - in conjunction with the list of `Dag`s retrieved from the same DMSS instance. In that case, let's explore encapsulating the entire interaction, e.g. in a new method (`selfInitialize` or `autoInitialize`): ``` void uponStartup() throws IOException { initQuota(getDags()); } ``` (This is still fine even if a DMSS impl makes `initQuota` a no-op.) What I'm pushing for is a clearer conceptual interface for callers of this interface. The `GobblinServiceManager` (or whoever initializes the DMSS singleton) would then simply invoke: ``` this.dmss.uponStartup(); // or just `start()`? ``` in the same way that it currently invokes: ``` this.dagManager.setActive(true); ``` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -32,40 +32,44 @@ * and allows add/delete and other functions.
*/ public interface DagManagementStateStore { - Dag<JobExecutionPlan> getDag(String dagId) throws IOException; - Dag<JobExecutionPlan> getFailedDag(String dagId) throws IOException; - void addDag(String dagId, Dag<JobExecutionPlan> dag); - boolean containsDag(String dagId); - Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId); - Dag<JobExecutionPlan> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode); - List<Dag.DagNode<JobExecutionPlan>> getDagNodes(String dagId) throws IOException; - List<Dag.DagNode<JobExecutionPlan>> getAllDagNodes() throws IOException; - boolean addCleanUpDag(String dagId); - void addDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); - void deleteDagNodeState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); - + void addDag(Dag<JobExecutionPlan> dag); /** * Persist the {@link Dag} to the backing store. * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted * and be picked up again when leader transition happens. * @param dag The dag submitted to {@link DagManager} */ void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; - void writeFailedDagCheckpoint(Dag<JobExecutionPlan> dag) throws IOException; - + boolean containsDag(String dagId); Review Comment: `DagId` Issue Time Tracking ------------------- Worklog Id: (was: 904948) Time Spent: 2h 20m (was: 2h 10m) > create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, > DagStateStore and in-memory dag maps used in DagManager > ------------------------------------------------------------------------------------------------------------------------------- > > Key: GOBBLIN-2002 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2002 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Arjun Singh Bora > Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > this will help in refactoring DagManager -- This message was sent by Atlassian Jira (v8.20.10#820010)