[ https://issues.apache.org/jira/browse/GOBBLIN-2137?focusedWorklogId=936489&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-936489 ]
ASF GitHub Bot logged work on GOBBLIN-2137:
-------------------------------------------

Author: ASF GitHub Bot
Created on: 25/Sep/24 22:54
Start Date: 25/Sep/24 22:54
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4032:
URL: https://github.com/apache/gobblin/pull/4032#discussion_r1776056489


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -77,22 +77,11 @@ public interface DagManagementStateStore {
   /**
    * This marks the dag as a failed one.
-   * Failed dags are queried using {@link DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried.
+   * Failed dags are queried using {@link DagManagementStateStore#getDag(DagManager.DagId)} ()} later to be retried.

Review Comment:
does it remain useful to both retrieve the DAG while also asserting that it's failed?


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -167,7 +167,7 @@ public void cleanUp(String dagId) throws IOException {
   @Override
   public List<Dag<JobExecutionPlan>> getDags() throws IOException {
     throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
-        + "the DagManager that is replaced by DagProcessingEngine"); }
+        + "the DagManager that is replaced by DagProcessingEngine");}

Review Comment:
actually, doesn't this need a newline before `}`?
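On the review question about `DagManagementStateStore` (whether it is still useful to retrieve a DAG while also asserting that it has failed): once the two stores are merged, that guarantee can be kept as a thin filter over a plain lookup. Below is a minimal sketch in plain Java; `SimpleDag`, `MergedDagStore`, and its methods are hypothetical stand-ins, not Gobblin's `Dag<JobExecutionPlan>` or `DagManagementStateStore`. It also uses `Optional.ofNullable` rather than `Optional.of`: the removed `getFailedDag` wrapped `getDag(dagId)` in `Optional.of`, which throws `NullPointerException` if that lookup returns `null` (as the `assertNull` checks in `MysqlDagStateStoreWithDagNodesTest` suggest it can for that store).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a DAG; NOT Gobblin's Dag<JobExecutionPlan>. */
final class SimpleDag {
  final String id;
  final boolean failed;

  SimpleDag(String id, boolean failed) {
    this.id = id;
    this.failed = failed;
  }
}

/** Hypothetical merged store that keeps running and failed DAGs together, keyed by DAG id. */
final class MergedDagStore {
  private final Map<String, SimpleDag> dags = new HashMap<>();

  void put(SimpleDag dag) {
    dags.put(dag.id, dag);
  }

  /** Plain lookup: returns the DAG whether or not it is marked failed. */
  Optional<SimpleDag> getDag(String dagId) {
    // ofNullable, not of: Optional.of(null) would throw NullPointerException for an unknown id.
    return Optional.ofNullable(dags.get(dagId));
  }

  /**
   * Lookup that also asserts failure: empty unless the DAG exists AND is marked failed,
   * i.e. the guarantee callers previously got implicitly from a separate failed-DAG store.
   */
  Optional<SimpleDag> getFailedDag(String dagId) {
    return getDag(dagId).filter(dag -> dag.failed);
  }
}
```

With a helper like this, a resume step could proceed only when the failure-asserting lookup is non-empty, rather than resuming whatever the plain lookup returns.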
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -147,21 +140,10 @@ public void deleteDag(DagManager.DagId dagId) throws IOException {
     log.info("Deleted dag {}", dagId);
   }
-  @Override
-  public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
-    this.failedDagStateStore.cleanUp(dagId);
-    log.info("Deleted failed dag {}", dagId);
-  }
-
-  @Override
-  public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws IOException {
-    return Optional.of(this.failedDagStateStore.getDag(dagId));
-  }
-
   @Override
   public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId) throws IOException {
-    this.dagStateStore.updateDagNode(dagId, dagNode);
+    this.dagStateStore.updateDagNode(dagId, dagNode, false);// isFailedDag is set as false because addDagNodeState adds a new DagNode, doesn't update an existing dagNode as failed.

Review Comment:
nit: space before starting a comment. also more brevity; e.g.:
```
// create all DagNodes as isFailedDag == false
```


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -91,8 +88,6 @@ public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, User
   private synchronized void start() {
     if (!dagStoresInitialized) {
       this.dagStateStore = createDagStateStore(config, topologySpecMap);
-      this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config),

Review Comment:
any ideas on handling migration when we roll this out (presuming the failed DagStateStore was not empty)?
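On the migration question: one option is a one-time, idempotent copy of every DAG still in the old failed-DAG store into the merged store with the failure flag set, removing each legacy entry only after all of its nodes are written. Below is a minimal sketch under hypothetical assumptions: the legacy store is modeled as a map from DAG id to node ids, and `MergedNodeStore`, `FailedDagMigration`, `isNodeFailed`, and `isDagFailed` are illustrative names, not Gobblin APIs. The upsert ORs the incoming `isFailedDag` with the stored value (`ConcurrentHashMap.merge` performs this atomically per key), so a later write passing `false`, as `addDagNodeState` does in the diff above, cannot clear a failure recorded earlier, whether by the migration or by a concurrently finishing node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical merged store: one isFailedDag flag per (dagId, nodeId), loosely mirroring one row per DagNode. */
final class MergedNodeStore {
  private final Map<String, Map<String, Boolean>> rows = new ConcurrentHashMap<>();

  /**
   * Upsert modeled on updateDagNode(dagId, dagNode, isFailedDag), with a "sticky" flag:
   * merge ORs the incoming value into the stored one, so a concurrent or later call
   * passing false never resets a node that is already marked failed.
   */
  void updateDagNode(String dagId, String nodeId, boolean isFailedDag) {
    rows.computeIfAbsent(dagId, id -> new ConcurrentHashMap<>())
        .merge(nodeId, isFailedDag, Boolean::logicalOr);
  }

  boolean isNodeFailed(String dagId, String nodeId) {
    Map<String, Boolean> nodes = rows.get(dagId);
    return nodes != null && nodes.getOrDefault(nodeId, false);
  }

  /** A DAG counts as failed if any of its nodes carries the flag. */
  boolean isDagFailed(String dagId) {
    Map<String, Boolean> nodes = rows.get(dagId);
    return nodes != null && nodes.containsValue(Boolean.TRUE);
  }
}

/** Hypothetical one-time migration from a legacy failed-DAG store into MergedNodeStore. */
final class FailedDagMigration {
  private FailedDagMigration() {}

  /**
   * Copies every legacy failed DAG (dagId -> node ids) into the merged store with isFailedDag = true,
   * removing each legacy entry only after all of its nodes are written. Re-running after a partial
   * failure is safe because the sticky upsert is idempotent. Returns the number of DAGs migrated.
   */
  static int migrate(Map<String, List<String>> legacyFailedDags, MergedNodeStore target) {
    int migrated = 0;
    // Iterate over a snapshot of the keys so entries can be removed from the legacy map inside the loop.
    for (String dagId : new ArrayList<>(legacyFailedDags.keySet())) {
      for (String nodeId : legacyFailedDags.get(dagId)) {
        target.updateDagNode(dagId, nodeId, true);
      }
      legacyFailedDags.remove(dagId);
      migrated++;
    }
    return migrated;
  }
}
```

Because the flag is sticky, resuming a DAG would need an explicit reset path (not shown). In MySQL, the analogous upsert would be an `INSERT ... ON DUPLICATE KEY UPDATE` whose update clause ORs the existing flag column with the incoming value.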
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
     DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
     try (Connection connection = dataSource.getConnection();
-        PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+        PreparedStatement createStatement = connection.prepareStatement(
+            String.format(CREATE_TABLE_STATEMENT, tableName))) {

Review Comment:
given arjun just wrote this class a month or two back, please ensure your auto-formatting is what it's supposed to be. it is possible his was off, but let's check. sure, we might fix spelling errors, but there should be little reason to reformat files we've only just created.


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java:
##########
@@ -137,4 +152,56 @@ public void testAddGetAndDeleteDag() throws Exception{
     Assert.assertNull(this.dagStateStore.getDag(dagId1));
     Assert.assertNull(this.dagStateStore.getDag(dagId2));
   }
+
+  @Test
+  public void testMarkDagAsFailed() throws Exception {
+    // Set up initial conditions
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test_dag", 789L);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+
+    this.dagStateStore.writeCheckpoint(dag);
+
+    // Fetch all initial states into a list
+    List<Boolean> initialStates = fetchDagNodeStates(dagId.toString());
+
+    // Check Initial State
+    for (Boolean state : initialStates) {
+      Assert.assertFalse(state);
+    }
+    // Set the DAG as failed
+    dag.setFailedDag(true);
+    this.dagStateStore.writeCheckpoint(dag);
+
+    // Fetch all states after marking the DAG as failed
+    List<Boolean> failedStates = fetchDagNodeStates(dagId.toString());
+
+    // Check if all states are now true (indicating failure)
+    for (Boolean state : failedStates) {
+      Assert.assertTrue(state);
+    }
+    dagStateStore.cleanUp(dagId);
+    Assert.assertNull(this.dagStateStore.getDag(dagId));
+  }
+
+  private List<Boolean> fetchDagNodeStates(String dagId) throws IOException {
+    List<Boolean> states = new ArrayList<>();
+
+    dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {

Review Comment:
is behind-the-scenes DB access the only way to validate behavior here? is there no way to access from the "official" DagStateStore, then mark failed and finally re-access from the DSS to verify all nodes have changed?


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java:
##########
@@ -54,7 +54,7 @@ public ResumeDagProc(ResumeDagTask resumeDagTask, Config config) {
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
-    return dagManagementStateStore.getFailedDag(getDagId());
+    return dagManagementStateStore.getDag(getDagId());

Review Comment:
verify it's actually failed?


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java:
##########
@@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore {
    * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one
    * <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
    */
-  int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+  int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag) throws IOException;

Review Comment:
wondering on semantics then. here we have a method to update a `DagNode`, but it takes a flag about whether its parent, the DAG, is failed.
does `true` here mean, in the course of updating the `DagNode`, to mark the entire DAG failed? (I can imagine that failure in one of the nodes would render the entire DAG failed.) that said, what if there are multiple concurrent jobs, each corresponding to a `DagNode`, and they all complete at about the same time, with several succeeding but one failing? is there a race condition where, unless the failed `DagNode` is processed last, a successful `DagNode` might get processed just after the failure and reset the DAG back to NOT failed?


Issue Time Tracking
-------------------

Worklog Id: (was: 936489)
Time Spent: 2h 10m (was: 2h)

> Merge Failed Node Dag State Store and Dag Node State Store
> ----------------------------------------------------------
>
> Key: GOBBLIN-2137
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2137
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Aditya Pratap Singh
> Priority: Minor
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Merge Failed Node Dag State Store and Dag Node State Store


--
This message was sent by Atlassian Jira
(v8.20.10#820010)