phet commented on code in PR #3837:
URL: https://github.com/apache/gobblin/pull/3837#discussion_r1409753695


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java:
##########
@@ -47,6 +47,14 @@ static Map<String, String> getFlowMetadata(Config flowConfig) {
     return metadata;
   }
 
+  /**
+   * Retrieves a flowExecutionId from flowMetadata map. Throws NPE if flowExecutionId is missing to prevent proceeding

Review Comment:
   `Map.get` returns `null` when the key is not found; it doesn't throw an NPE. That said, I
strongly advise that whenever we might return null, we make the return
type `Optional`, so the type checker can warn callers and hopefully
**prevent** runtime NPEs.
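
   A minimal sketch of what that could look like, assuming `java.util.Optional` (the codebase may prefer Guava's `Optional`) and a hypothetical key name `flowExecutionId`; the class name is illustrative only and this is not the PR's actual code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class FlowExecutionIdLookup {
  // Hypothetical key name, used only for this sketch.
  static final String FLOW_EXECUTION_ID_KEY = "flowExecutionId";

  /** Returns the flowExecutionId, or Optional.empty() when the key is missing or mapped to null. */
  static Optional<String> getFlowExecutionId(Map<String, String> flowMetadata) {
    // Map.get returns null for a missing key; ofNullable turns that into an empty Optional.
    return Optional.ofNullable(flowMetadata.get(FLOW_EXECUTION_ID_KEY));
  }

  public static void main(String[] args) {
    Map<String, String> metadata = new HashMap<>();
    System.out.println(getFlowExecutionId(metadata).isPresent());

    metadata.put(FLOW_EXECUTION_ID_KEY, "12345");
    System.out.println(getFlowExecutionId(metadata).get());
  }
}
```

   With this shape, callers have to decide explicitly what a missing id means (for example via `orElseThrow` with a descriptive message) instead of hitting an NPE later.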



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -312,17 +315,25 @@ public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, bool
       log.warn("Skipping add dag because this instance of DagManager is not active for dag: {}", dag);
       return;
     }
+
+    DagId dagId = DagManagerUtils.generateDagId(dag);
     if (persist) {
-      //Persist the dag
+      // Persist the dag
       this.dagStateStore.writeCheckpoint(dag);
+      // After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted
+      // to avoid duplicate executions upon leadership change
+      if (this.dagActionStore.isPresent()) {
+        this.dagActionStore.get().deleteDagAction(new DagActionStore.DagAction(dagId.getFlowGroup(),
+            dagId.getFlowName(), dagId.getFlowExecutionId(), DagActionStore.FlowActionType.LAUNCH));

Review Comment:
   Suggest adding a method to `DagId` (which should also be simplified with
`@Data`):
   ```
   DagAction toDagAction(FlowActionType t) {
     ...
   }
   ```
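
   To make the suggestion concrete, here is a rough sketch under stated assumptions: `DagId`, `DagAction`, and `FlowActionType` below are simplified stand-ins nested in a hypothetical `DagIdSketch` class, not the real Gobblin types, and the boilerplate is written by hand where Lombok's `@Data` would generate it:

```java
import java.util.Objects;

final class DagIdSketch {
  // Simplified stand-in for DagActionStore.FlowActionType (assumed values).
  enum FlowActionType { LAUNCH, KILL, RESUME }

  // Simplified stand-in for DagActionStore.DagAction.
  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;
    final FlowActionType flowActionType;

    DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.flowActionType = flowActionType;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof DagAction)) {
        return false;
      }
      DagAction other = (DagAction) o;
      return flowGroup.equals(other.flowGroup) && flowName.equals(other.flowName)
          && flowExecutionId.equals(other.flowExecutionId) && flowActionType == other.flowActionType;
    }

    @Override
    public int hashCode() {
      return Objects.hash(flowGroup, flowName, flowExecutionId, flowActionType);
    }
  }

  // Simplified stand-in for DagId; @Data would generate the getters, equals, and hashCode.
  static final class DagId {
    private final String flowGroup;
    private final String flowName;
    private final String flowExecutionId;

    DagId(String flowGroup, String flowName, String flowExecutionId) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
    }

    String getFlowGroup() { return flowGroup; }
    String getFlowName() { return flowName; }
    String getFlowExecutionId() { return flowExecutionId; }

    // Builds the action for this dag so call sites don't unpack the three fields themselves.
    DagAction toDagAction(FlowActionType type) {
      return new DagAction(flowGroup, flowName, flowExecutionId, type);
    }
  }
}
```

   The call site in `addDag` could then shrink to something like `deleteDagAction(dagId.toDagAction(FlowActionType.LAUNCH))`.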



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java:
##########
@@ -346,6 +349,9 @@ class MockedDagManager extends DagManager {
   public MockedDagManager(Config config, boolean instrumentationEnabled) {
     super(config, createJobStatusRetriever(), Mockito.mock(SharedFlowMetricsSingleton.class),
         Mockito.mock(FlowStatusGenerator.class), Mockito.mock(FlowCatalog.class), instrumentationEnabled);
+//    this.dagActionStore = Optional.of(Mockito.mock(MysqlDagActionStore.class)); //Mockito.mock(DagActionStore.class));

Review Comment:
   Remove this commented-out line?


