[ 
https://issues.apache.org/jira/browse/GOBBLIN-2143?focusedWorklogId=931851&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-931851
 ]

ASF GitHub Bot logged work on GOBBLIN-2143:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Aug/24 06:34
            Start Date: 27/Aug/24 06:34
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4038:
URL: https://github.com/apache/gobblin/pull/4038#discussion_r1732108773


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java:
##########
@@ -35,12 +35,10 @@
 public interface DagStateStoreWithDagNodes extends DagStateStore {
 
   /**
-   * Updates a dag node identified by the provided {@link DagManager.DagId}
-   * with the given {@link Dag.DagNode}.
-   * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 
0 if new dag node is same as the existing one
-   * <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
+   * Updates the {@link Dag.DagNode} with the provided value.
+   * Returns 1 if the dag node is updated successfully, 0 otherwise

Review Comment:
   Why return a 0/1 int rather than a boolean? Are you expecting the set of 
possible return values to grow eventually?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config, 
Map<URI, TopologySpec> topo
   }
 
   @Override
-  public void writeCheckpoint(Dag<JobExecutionPlan> dag)
-      throws IOException {
-    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
-    boolean newDag = false;
-    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
-      if (updateDagNode(dagId, dagNode) == 1) {
-        newDag = true;
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
 tableName), insertStatement -> {
+      int dagSize = dag.getNodes().size();
+      Object[][] data = new Object[dagSize][3];
+
+      for (int i=0; i<dagSize; i++) {
+        Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(i);
+        data[i][0] = dagNode.getValue().getId().toString();
+        data[i][1] = dagId;
+        data[i][2] 
=this.serDe.serialize(Collections.singletonList(dagNode.getValue()));
       }
-    }
-    if (newDag) {
-      this.totalDagCount.inc();
-    }
+
+      for (Object[] row : data) {
+        insertStatement.setObject(1, row[0]);
+        insertStatement.setObject(2, row[1]);
+        insertStatement.setObject(3, row[2]);
+        insertStatement.addBatch();
+      }
+      try {
+        return insertStatement.executeBatch();
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure adding dag for %s", 
dagId), e);
+      }}, true);
+
+    this.totalDagCount.inc();

Review Comment:
   Is this an in-memory counter? What is its purpose?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config, 
Map<URI, TopologySpec> topo
   }
 
   @Override
-  public void writeCheckpoint(Dag<JobExecutionPlan> dag)
-      throws IOException {
-    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
-    boolean newDag = false;
-    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
-      if (updateDagNode(dagId, dagNode) == 1) {
-        newDag = true;
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
 tableName), insertStatement -> {
+      int dagSize = dag.getNodes().size();
+      Object[][] data = new Object[dagSize][3];
+
+      for (int i=0; i<dagSize; i++) {
+        Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(i);
+        data[i][0] = dagNode.getValue().getId().toString();
+        data[i][1] = dagId;
+        data[i][2] 
=this.serDe.serialize(Collections.singletonList(dagNode.getValue()));

Review Comment:
   A few of these need a space on either side of the operator (e.g. `i=0`, 
`i<dagSize`, and `=this.serDe...`).



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config, 
Map<URI, TopologySpec> topo
   }
 
   @Override
-  public void writeCheckpoint(Dag<JobExecutionPlan> dag)
-      throws IOException {
-    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
-    boolean newDag = false;
-    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
-      if (updateDagNode(dagId, dagNode) == 1) {
-        newDag = true;
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
 tableName), insertStatement -> {
+      int dagSize = dag.getNodes().size();
+      Object[][] data = new Object[dagSize][3];
+
+      for (int i=0; i<dagSize; i++) {
+        Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(i);
+        data[i][0] = dagNode.getValue().getId().toString();
+        data[i][1] = dagId;
+        data[i][2] 
=this.serDe.serialize(Collections.singletonList(dagNode.getValue()));
       }
-    }
-    if (newDag) {
-      this.totalDagCount.inc();
-    }
+
+      for (Object[] row : data) {

Review Comment:
   Why iterate through the nodes to populate `data` when the only thing done 
with it is to iterate over it here? Instead, just iterate through the 
`DagNode`s and populate `insertStatement` directly.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -142,8 +142,11 @@ public void markDagFailed(DagManager.DagId dagId) throws 
IOException {
 
   @Override
   public void deleteDag(DagManager.DagId dagId) throws IOException {
-    this.dagStateStore.cleanUp(dagId);
-    log.info("Deleted dag {}", dagId);
+    if (this.dagStateStore.cleanUp(dagId)) {
+      log.info("Deleted dag {}", dagId);
+    } else {
+      log.info("Dag deletion was tried but did not happen {}", dagId);

Review Comment:
   Only `.info` level, not `.warn` or even `.error`? Alternatively, this could 
arguably throw an exception.





Issue Time Tracking
-------------------

            Worklog Id:     (was: 931851)
    Remaining Estimate: 0h
            Time Spent: 10m

> handle concurrent ReevaluateDagProc for cancelled dag nodes correctly
> ---------------------------------------------------------------------
>
>                 Key: GOBBLIN-2143
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2143
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to