[ 
https://issues.apache.org/jira/browse/GOBBLIN-2137?focusedWorklogId=936489&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-936489
 ]

ASF GitHub Bot logged work on GOBBLIN-2137:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Sep/24 22:54
            Start Date: 25/Sep/24 22:54
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4032:
URL: https://github.com/apache/gobblin/pull/4032#discussion_r1776056489


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -77,22 +77,11 @@ public interface DagManagementStateStore {
 
   /**
    * This marks the dag as a failed one.
-   * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried.
+   * Failed dags are queried using {@link 
DagManagementStateStore#getDag(DagManager.DagId)} ()} later to be retried.

Review Comment:
   does it remain useful to retrieve the DAG while also asserting that it's 
failed?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -167,7 +167,7 @@ public void cleanUp(String dagId) throws IOException {
   @Override
   public List<Dag<JobExecutionPlan>> getDags() throws IOException {
     throw new NotSupportedException(getClass().getSimpleName() + " does not 
need this legacy API that originated with "
-        + "the DagManager that is replaced by DagProcessingEngine");  }
+        + "the DagManager that is replaced by DagProcessingEngine");}

Review Comment:
   actually, doesn't this need a newline before `}`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -147,21 +140,10 @@ public void deleteDag(DagManager.DagId dagId) throws 
IOException {
     log.info("Deleted dag {}", dagId);
   }
 
-  @Override
-  public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
-    this.failedDagStateStore.cleanUp(dagId);
-    log.info("Deleted failed dag {}", dagId);
-  }
-
-  @Override
-  public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) 
throws IOException {
-    return Optional.of(this.failedDagStateStore.getDag(dagId));
-  }
-
   @Override
   public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> 
dagNode, DagManager.DagId dagId)
       throws IOException {
-    this.dagStateStore.updateDagNode(dagId, dagNode);
+    this.dagStateStore.updateDagNode(dagId, dagNode, false);// isFailedDag is 
set as false because addDagNodeState adds a new DagNode, doesn't update an 
existing dagNode as failed.

Review Comment:
   nit: space before starting a comment.  also more brevity; e.g.:
   ```
   // create all DagNodes as isFailedDag == false
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -91,8 +88,6 @@ public MySqlDagManagementStateStore(Config config, 
FlowCatalog flowCatalog, User
   private synchronized void start() {
     if (!dagStoresInitialized) {
       this.dagStateStore = createDagStateStore(config, topologySpecMap);
-      this.failedDagStateStore = 
createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config),

Review Comment:
   any ideas on handling migration when we roll this out (presuming the failed 
DagStateStore was not empty)?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, 
Map<URI, TopologySpec> topo
     DataSource dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
 
     try (Connection connection = dataSource.getConnection();
-        PreparedStatement createStatement = 
connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+        PreparedStatement createStatement = connection.prepareStatement(
+            String.format(CREATE_TABLE_STATEMENT, tableName))) {

Review Comment:
   given arjun just wrote this class a month or two back, please ensure your 
auto-formatting is what it's supposed to be.  it is possible his was off, but 
let's check.  sure we might fix spelling errors, but there should be little 
reason to reformat files we've only just created



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java:
##########
@@ -137,4 +152,56 @@ public void testAddGetAndDeleteDag() throws Exception{
     Assert.assertNull(this.dagStateStore.getDag(dagId1));
     Assert.assertNull(this.dagStateStore.getDag(dagId2));
   }
+
+  @Test
+  public void testMarkDagAsFailed() throws Exception {
+    // Set up initial conditions
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test_dag", 789L);
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+
+    this.dagStateStore.writeCheckpoint(dag);
+
+    // Fetch all initial states into a list
+    List<Boolean> initialStates = fetchDagNodeStates(dagId.toString());
+
+    // Check Initial State
+    for (Boolean state : initialStates) {
+      Assert.assertFalse(state);
+    }
+    // Set the DAG as failed
+    dag.setFailedDag(true);
+    this.dagStateStore.writeCheckpoint(dag);
+
+    // Fetch all states after marking the DAG as failed
+    List<Boolean> failedStates = fetchDagNodeStates(dagId.toString());
+
+    // Check if all states are now true (indicating failure)
+    for (Boolean state : failedStates) {
+      Assert.assertTrue(state);
+    }
+    dagStateStore.cleanUp(dagId);
+    Assert.assertNull(this.dagStateStore.getDag(dagId));
+  }
+
+  private List<Boolean> fetchDagNodeStates(String dagId) throws IOException {
+    List<Boolean> states = new ArrayList<>();
+
+    
dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT,
 tableName), getStatement -> {

Review Comment:
   is behind-the-scenes DB access the only way to validate behavior here?  is 
there no way to access from the "official" DagStateStore, then mark failed and 
finally re-access from the DSS to verify all nodes have changed?
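
A rough sketch of the round-trip I have in mind, going only through the store's own read/write methods. `SimpleDag` and `InMemoryDagStore` are hypothetical stand-ins (not the real `Dag` / `DagStateStore`), and it assumes the failed flag is readable on whatever `getDag` returns:

```java
import java.util.HashMap;
import java.util.Map;

final class RoundTripCheck {
  /** Hypothetical stand-in for Dag<JobExecutionPlan>; only the failed flag is modeled. */
  static final class SimpleDag {
    final String id;
    private boolean failedDag;
    SimpleDag(String id) { this.id = id; }
    void setFailedDag(boolean failed) { this.failedDag = failed; }
    boolean isFailedDag() { return failedDag; }
    SimpleDag copy() { SimpleDag c = new SimpleDag(id); c.failedDag = failedDag; return c; }
  }

  /** Hypothetical store; copies on write and read so a re-read reflects what was persisted. */
  static final class InMemoryDagStore {
    private final Map<String, SimpleDag> rows = new HashMap<>();
    void writeCheckpoint(SimpleDag dag) { rows.put(dag.id, dag.copy()); }
    SimpleDag getDag(String id) { SimpleDag d = rows.get(id); return d == null ? null : d.copy(); }
    void cleanUp(String id) { rows.remove(id); }
  }

  /** Write, read back, mark failed, write again, read back again, then clean up. */
  static boolean failedFlagRoundTrips(InMemoryDagStore store) {
    SimpleDag dag = new SimpleDag("test_dag");
    store.writeCheckpoint(dag);
    boolean initiallyFailed = store.getDag(dag.id).isFailedDag();

    dag.setFailedDag(true);
    store.writeCheckpoint(dag);
    boolean failedAfterMark = store.getDag(dag.id).isFailedDag();

    store.cleanUp(dag.id);
    return !initiallyFailed && failedAfterMark && store.getDag(dag.id) == null;
  }
}
```

If the real store can't expose the flag this way, the direct DB query may still be needed, but it would be worth saying so in a comment.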



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java:
##########
@@ -54,7 +54,7 @@ public ResumeDagProc(ResumeDagTask resumeDagTask, Config 
config) {
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-      return dagManagementStateStore.getFailedDag(getDagId());
+      return dagManagementStateStore.getDag(getDagId());

Review Comment:
   verify it's actually failed?
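
For illustration only, a minimal sketch of keeping a single lookup while still asserting failure: filter the `Optional` on the failed flag, so resuming a DAG that isn't failed yields empty. `FakeDag`, `isFailedDag()` and `FailedDagLookup` are hypothetical stand-ins, not the real Gobblin types:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class FailedDagLookup {
  /** Hypothetical stand-in for Dag<JobExecutionPlan>; only the failed flag matters here. */
  static final class FakeDag {
    final String id;
    final boolean failedDag;
    FakeDag(String id, boolean failedDag) { this.id = id; this.failedDag = failedDag; }
    boolean isFailedDag() { return failedDag; }
  }

  private final Map<String, FakeDag> dags = new HashMap<>();

  void addDag(FakeDag dag) { dags.put(dag.id, dag); }

  /** Plain lookup: empty only if the DAG is unknown. */
  Optional<FakeDag> getDag(String dagId) {
    return Optional.ofNullable(dags.get(dagId));
  }

  /** Lookup that also asserts failure: empty if the DAG is unknown or not failed. */
  Optional<FakeDag> getFailedDag(String dagId) {
    return getDag(dagId).filter(FakeDag::isFailedDag);
  }
}
```

Whether an empty result or an exception is the better signal for "exists but not failed" is a separate choice; the sketch just picks empty.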



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java:
##########
@@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends 
DagStateStore {
    * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 
0 if new dag node is same as the existing one
    * <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
    */
-  int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> 
dagNode) throws IOException;
+  int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> 
dagNode, boolean isFailedDag) throws IOException;

Review Comment:
   wondering on semantics then.  here we have a method to update a `DagNode`, 
but it takes a flag about whether its parent, the DAG is failed.  does `true` 
here mean, in the course of updating the `DagNode`, to mark the entire DAG 
failed?  (I can imagine that failure in one of the nodes would render the 
entire DAG failed.)
   
   that said, what if there are multiple concurrent jobs, each corresponding to 
a `DagNode`, and they all complete about the same time, with several 
succeeding, but one failing?  is there a race condition where, unless the 
failed `DagNode` is processed last, a successful `DagNode` might get processed 
just after the failure and reset the DAG back to NOT failed?
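
To make the concern concrete, a small sketch contrasting a last-writer-wins flag with a "sticky" one that can only move from false to true. I don't know which of these the PR's SQL actually does; `StickyFailedFlag` and its in-memory map are hypothetical stand-ins for the table column:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StickyFailedFlag {
  // dagId -> whether the DAG is marked failed (hypothetical stand-in for the DB column)
  private final Map<String, Boolean> failedByDagId = new ConcurrentHashMap<>();

  /** Last-writer-wins: a later isFailedDag=false overwrites an earlier true. */
  void updateOverwrite(String dagId, boolean isFailedDag) {
    failedByDagId.put(dagId, isFailedDag);
  }

  /** Sticky: ORs the new value into the stored one, so true survives any update order. */
  void updateSticky(String dagId, boolean isFailedDag) {
    failedByDagId.merge(dagId, isFailedDag, Boolean::logicalOr);
  }

  boolean isFailed(String dagId) {
    return failedByDagId.getOrDefault(dagId, false);
  }
}
```

With the overwrite form, "failed node, then successful node" leaves the DAG not failed; with the sticky form the order doesn't matter. A sticky flag would then need an explicit reset path (e.g. on resume), which the sketch leaves out.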





Issue Time Tracking
-------------------

    Worklog Id:     (was: 936489)
    Time Spent: 2h 10m  (was: 2h)

> Merge Failed Node Dag State Store and Dag Node State Store
> ----------------------------------------------------------
>
>                 Key: GOBBLIN-2137
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2137
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Aditya Pratap Singh
>            Priority: Minor
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Merge Failed Node Dag State Store and Dag Node State Store



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
