phet commented on code in PR #3965:
URL: https://github.com/apache/gobblin/pull/3965#discussion_r1631651319


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -82,41 +86,66 @@ public void tearDown() throws Exception {
   }
 
   @Test
-  public void launchDag()
-      throws IOException, InterruptedException, URISyntaxException {
-    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", System.currentTimeMillis(), DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
-        5, "user5", ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("fg")));
+  public void launchDag() throws IOException, InterruptedException, URISyntaxException, ExecutionException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    String flowExecutionId = "12345";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", Long.parseLong(flowExecutionId),
+        DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = mock(FlowCompilationValidationHelper.class);
     doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
+    SpecProducer<Spec> specProducer = DagManagerUtils.getSpecProducer(dag.getNodes().get(0));

Review Comment:
   To dot the i's and cross the t's, I'd suggest first asserting that there's only one specProducer.
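   Here is a minimal sketch of that check. It assumes that nodes can be mapped to their spec producers; the hypothetical `Node` class below stands in for `DagManagerUtils.getSpecProducer(node)`. The check collects the distinct producers, asserts that there is exactly one, and returns it so that `addSpec` can be verified on it.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class SingleSpecProducerCheck {

  /** Hypothetical stand-in for a dag node; the real test would call DagManagerUtils.getSpecProducer(node). */
  static final class Node {
    final Object specProducer;

    Node(Object specProducer) {
      this.specProducer = specProducer;
    }
  }

  /** Returns the one spec producer shared by all nodes, failing if there is not exactly one. */
  static Object onlySpecProducer(List<Node> nodes) {
    // Distinctness uses equals()/hashCode(), i.e. identity for objects that do not override them.
    Set<Object> distinct = nodes.stream().map(n -> n.specProducer).collect(Collectors.toSet());
    if (distinct.size() != 1) {
      throw new AssertionError("expected exactly one specProducer but found " + distinct.size());
    }
    return distinct.iterator().next();
  }

  private SingleSpecProducerCheck() {
  }
}
```

   In `LaunchDagProcTest`, the returned producer would then be the one passed to `Mockito.verify(..., Mockito.times(1)).addSpec(any())`.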



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -55,6 +56,37 @@
  */
 @Slf4j
 public class DagProcUtils {
+
+  /**
+   * If there is a single job to run next, it runs it. If there are multiple jobs to run, it creates a
+   * {@link org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE} dag action for
+   * each of them and those jobs will be launched in respective Reevaluate dag proc.

Review Comment:
   `{@link ReevaluateDagProc}`



##########
gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java:
##########
@@ -97,7 +97,7 @@ class TestMetastoreDatabaseServer implements Closeable {
         .withUser(this.dbUserName, this.dbUserPassword)
         .withServerVariable("explicit_defaults_for_timestamp", "off")
         // default `max_connections` is apparently 151 - see: https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_max_connections
-        .withServerVariable("max_connections", "501")
+        .withServerVariable("max_connections", "2000")

Review Comment:
   Hopefully this helps! ... I didn't have much luck setting this myself (to avoid the "too many connections" unit-test failures elsewhere).



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -141,4 +170,14 @@ public static Dag<JobExecutionPlan> buildDagWithMultipleNodesAtDifferentLevels(S
     }
     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
+
+  public static List<SpecProducer<Spec>> getDagSpecProducers(Dag<JobExecutionPlan> dag) {

Review Comment:
   Interesting: this is only called from `ReevaluateDagProcTest`. Does it belong there?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -55,6 +56,37 @@
  */
 @Slf4j
 public class DagProcUtils {
+
+  /**
+   * If there is a single job to run next, it runs it. If there are multiple jobs to run, it creates a
+   * {@link org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE} dag action for
+   * each of them and those jobs will be launched in respective Reevaluate dag proc.
+   */
+  public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
+      DagManager.DagId dagId) throws IOException {
+    Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
+
+    if (nextNodes.size() > 1) {
+      handleMultipleJobs(dagManagementStateStore, nextNodes);
+      return;
+    }
+
+    //Submit jobs from the dag ready for execution.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+      DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId);
+      log.info("Submitted job {} for dagId {}", DagManagerUtils.getJobName(dagNode), dagId);
+    }

Review Comment:
   What's your take? IMO, this feels clear and direct:
   ```
   if (nextNodes.size() == 1) {
     submitJob(dmss, nextNodes.iterator().next(), dagId);
     log.info(...);
   } else {
     for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
       dmss.addJobDagAction(...);
     }
   }
   ```
   
   That way there's only one loop, used only when it's actually needed, and no separate `private static` method whose sole purpose is to loop and call a single method.
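   The following self-contained sketch shows the suggested single-branch shape. `StateStore`, `submitJobToExecutor`, and `addJobDagAction` are simplified, hypothetical stand-ins for `DagManagementStateStore`, `DagProcUtils.submitJobToExecutor`, and the store's job-level REEVALUATE action, and dag nodes are reduced to job names. Because `getNext` returns a `Set`, the sketch takes the single node with `iterator().next()` rather than `get(0)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

final class SubmitNextNodesSketch {
  private static final Logger LOG = Logger.getLogger(SubmitNextNodesSketch.class.getName());

  /** Hypothetical, much-simplified stand-in for DagManagementStateStore; nodes are reduced to job names. */
  interface StateStore {
    void submitJobToExecutor(String jobName, String dagId);

    void addJobDagAction(String jobName, String dagId);
  }

  /**
   * Submits the next job directly when exactly one is ready. Otherwise emits one reevaluate-style dag action
   * per job, so each is launched by its own reevaluate proc; those actions have no job status yet, which is
   * how the reevaluate proc tells a never-launched job apart from a finished one.
   */
  static void submitNextNodes(StateStore store, Set<String> nextNodes, String dagId) {
    if (nextNodes.size() == 1) {
      // Set has no get(0); take its only element via the iterator.
      String jobName = nextNodes.iterator().next();
      store.submitJobToExecutor(jobName, dagId);
      LOG.info("Submitted job " + jobName + " for dagId " + dagId);
    } else {
      // Covers both "several parallel jobs" and "nothing left to run" (the loop is then a no-op).
      for (String jobName : nextNodes) {
        store.addJobDagAction(jobName, dagId);
      }
    }
  }

  /** Records calls so the branching can be checked without a mocking library. */
  static final class RecordingStore implements StateStore {
    final List<String> submitted = new ArrayList<>();
    final List<String> dagActions = new ArrayList<>();

    @Override
    public void submitJobToExecutor(String jobName, String dagId) {
      submitted.add(jobName);
    }

    @Override
    public void addJobDagAction(String jobName, String dagId) {
      dagActions.add(jobName);
    }
  }

  private SubmitNextNodesSketch() {
  }
}
```

   The Javadoc also records the rationale for leaving parallel jobs without a job status, which is one candidate place for documenting it in source.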



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -90,9 +70,28 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
     }
 
     Dag.DagNode<JobExecutionPlan> dagNode = dagNodeWithJobStatus.getLeft().get();
-    JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
-    ExecutionStatus executionStatus = dagNode.getValue().getExecutionStatus();
+
+    if (!dagNodeWithJobStatus.getRight().isPresent()) {
+      // Usually reevaluate dag action is created by JobStatusMonitor when a finished job status is available,
+      // but when reevaluate/resume/launch dag proc found multiple parallel jobs to run next, it creates reevaluate
+      // dag actions for each of those parallel job and in this scenario there is no job status available.
+      // If the job status is not present, this job was never launched, submit it now.
+      submitJobForThisDagNode(dagManagementStateStore, dagNode);
+      return;
+    }
+
+    if (!FlowStatusGenerator.FINISHED_STATUSES.contains(dagNodeWithJobStatus.getRight().get().getEventName())) {
+      // this may happen if adding job status in the store failed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
+      throw new RuntimeException(String.format("Job status for dagNode %s is %s. Re-evaluate dag action are created for"
+              + " new jobs with no job status when there are multiple of them to run next; or when a job finishes with status - %s",
+          dagNodeId, dagNodeWithJobStatus.getRight().get().getEventName(), FlowStatusGenerator.FINISHED_STATUSES));
+    }
+
     Dag<JobExecutionPlan> dag = dagManagementStateStore.getDag(getDagId()).get();
+    JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
+    ExecutionStatus executionStatus = ExecutionStatus.valueOf(jobStatus.getEventName());

Review Comment:
   These should probably be introduced before L83.
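   A sketch of `act` that reads the job status once, up front, so that the later checks reuse it instead of calling `dagNodeWithJobStatus.getRight().get()` repeatedly. The types are hypothetical simplifications: the job status is reduced to its event name, and `FINISHED_STATUSES` is an assumed stand-in (COMPLETE, FAILED, CANCELLED) for `FlowStatusGenerator.FINISHED_STATUSES`, whose contents are not shown in this diff.

```java
import java.util.Optional;
import java.util.Set;

final class ReevaluateActSketch {
  // Assumed stand-in for FlowStatusGenerator.FINISHED_STATUSES; the real constant may differ.
  static final Set<String> FINISHED_STATUSES = Set.of("COMPLETE", "FAILED", "CANCELLED");

  enum Outcome { SUBMITTED_NEW_JOB, REEVALUATED }

  /**
   * @param dagNodeId          id of the node this reevaluate action targets
   * @param jobStatusEventName the job's status event name; empty when the action was created for a
   *                           parallel job that has not been launched yet
   */
  static Outcome act(String dagNodeId, Optional<String> jobStatusEventName) {
    if (!jobStatusEventName.isPresent()) {
      // No status: the job was never launched; the real proc submits it to its executor at this point.
      return Outcome.SUBMITTED_NEW_JOB;
    }
    // Read the status once, up front, and reuse it for both the validation and the reevaluation.
    String eventName = jobStatusEventName.get();
    if (!FINISHED_STATUSES.contains(eventName)) {
      // Possible if persisting the job status failed after the reevaluate action was added.
      throw new RuntimeException(String.format("Job status for dagNode %s is %s; expected one of %s",
          dagNodeId, eventName, FINISHED_STATUSES));
    }
    // The real proc would now set the node's ExecutionStatus from eventName and reevaluate the dag.
    return Outcome.REEVALUATED;
  }

  private ReevaluateActSketch() {
  }
}
```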



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -123,6 +122,11 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
     }
   }
 
+  private void submitJobForThisDagNode(DagManagementStateStore dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
+    DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId());
+    log.info("Submitted job {} for dagId {}", DagManagerUtils.getJobName(dagNode), getDagId());
+  }

Review Comment:
   Logging like this could arguably go in `DagProcUtils.submitJobToExecutor`. If so, this class could call that directly rather than wrapping the call in a method that merely forwards it.
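   A sketch of moving the log line into the shared helper, so that `ReevaluateDagProc` can call it directly without a forwarding wrapper. To stay self-contained it uses `java.util.logging`, whereas the real class uses Lombok's `@Slf4j`. The `JobSubmitter` parameter is a hypothetical stand-in for the store and dag node that the real method takes.

```java
import java.util.logging.Logger;

final class DagProcUtilsSketch {
  private static final Logger LOG = Logger.getLogger(DagProcUtilsSketch.class.getName());

  /** Hypothetical stand-in for whatever hands the job to its spec producer. */
  interface JobSubmitter {
    void submit(String jobName);
  }

  /** Submits the job and logs it in one place, so every caller gets the same log line. */
  static void submitJobToExecutor(JobSubmitter submitter, String jobName, String dagId) {
    submitter.submit(jobName);
    LOG.info(String.format("Submitted job %s for dagId %s", jobName, dagId));
  }

  private DagProcUtilsSketch() {
  }
}
```

   The call site in `ReevaluateDagProc` would then shrink to a single `DagProcUtils.submitJobToExecutor(...)` line.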



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -60,21 +59,19 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> ini
     Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> dagNodeWithJobStatus =
         dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
 
-    if (!dagNodeWithJobStatus.getLeft().isPresent() || !dagNodeWithJobStatus.getRight().isPresent()) {
+    if (!dagNodeWithJobStatus.getLeft().isPresent()) {
       // this is possible when MALA malfunctions and a duplicated reevaluate dag proc is launched for a dag node that is
       // already "reevaluated" and cleaned up.
       return ImmutablePair.of(Optional.empty(), Optional.empty());
     }
 
-    ExecutionStatus executionStatus = ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
-    if (!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
-      log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should have been created only for finished status - {}",
-          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
-      // this may happen if adding job status in the store failed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
-      throw new RuntimeException(String.format("Job status %s is not final for job %s", executionStatus, getDagId()));
+    if (dagNodeWithJobStatus.getRight().isPresent()) {

Review Comment:
   In the multi-job case of reevaluate, wouldn't there be a job status present? Isn't it only in the multi-job launch and resume cases?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -206,4 +192,80 @@ public void testNoNextJobToRun() throws Exception {
     Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
         .filter(a -> a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
+
+  @Test
+  public void testCurrentJobToRun() throws Exception {
+    String flowName = "fn3";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        2, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+    );
+    List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
+    List<SpecProducer<Spec>> specProducers = LaunchDagProcTest.getDagSpecProducers(dag);
+
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(new ImmutablePair<>(Optional.of(startDagNodes.get(0)), Optional.empty()))
+        .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+    ReevaluateDagProc
+        reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction(flowGroup, flowName,
+        String.valueOf(flowExecutionId), "job0", DagActionStore.DagActionType.REEVALUATE), null,
+        dagManagementStateStore));
+    reEvaluateDagProc.process(dagManagementStateStore);
+
+    long addSpecCount = specProducers.stream()
+        .mapToLong(p -> Mockito.mockingDetails(p)
+            .getInvocations()
+            .stream()
+            .filter(a -> a.getMethod().getName().equals("addSpec"))
+            .count())
+        .sum();
+
+    int numOfLaunchedJobs = 1; // only the current job
+    // only the current job should have run
+    Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+    Assert.assertEquals(numOfLaunchedJobs, addSpecCount);
+
+    // no job's state is deleted because that happens when the job finishes triggered the reevaluate dag proc
+    Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDagNodeState(any(), any());
+    Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDagAction(any());
+    Mockito.verify(dagActionReminderScheduler, Mockito.never()).unscheduleReminderJob(any());
+  }
+
+  @Test
+  public void testMultipleNextJobToRun() throws Exception {
+    String flowName = "fn4";
+    Dag<JobExecutionPlan> dag = LaunchDagProcTest.buildDagWithMultipleNodesAtDifferentLevels("1", String.valueOf(flowExecutionId),
+        DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+    );
+    JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job3").flowExecutionId(flowExecutionId).
+        message("Test message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+    // mocked job status for the first four jobs
+    dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+    dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+    dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+    dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+
+    ReevaluateDagProc
+        reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction(flowGroup, flowName,
+        String.valueOf(flowExecutionId), "job3", DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+    // process 4th job
+    reEvaluateDagProc.process(dagManagementStateStore);
+
+    int numOfLaunchedJobs = 2; // = number of jobs that should launch when 4th job passes, i.e. 5th and 6th job
+    // parallel jobs are launched through reevaluate dag action
+    Mockito.verify(dagManagementStateStore, Mockito.times(numOfLaunchedJobs))

Review Comment:
   Let's validate specProducers here too. Given that this is a recurring pattern of counting emitted `DagAction`s and specProducer `addSpec` calls, consider developing an abstraction for verifying the "side effects" of `DagProc.process`ing.
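   One possible shape for that abstraction is sketched below. It uses a `java.lang.reflect.Proxy`-based recorder instead of Mockito so that it runs on its own; in the real tests the counts would still come from `Mockito.mockingDetails(mock).getInvocations()`, as they already do. `SideEffects`, `Recorded`, `record`, and `assertTotal` are hypothetical names, and in the test, JDK interfaces stand in for `DagManagementStateStore` and `SpecProducer`.

```java
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SideEffects {

  /** A recording fake: the object handed to the code under test, plus the calls it received. */
  static final class Recorded<T> {
    final T fake;
    private final Map<String, Integer> calls;

    private Recorded(T fake, Map<String, Integer> calls) {
      this.fake = fake;
      this.calls = calls;
    }

    /** Number of times a method with this name was invoked on the fake. */
    int count(String methodName) {
      return calls.getOrDefault(methodName, 0);
    }
  }

  /** Creates a do-nothing implementation of the interface {@code type} that counts calls by method name. */
  static <T> Recorded<T> record(Class<T> type) {
    Map<String, Integer> calls = new ConcurrentHashMap<>();
    // Use this class's loader when the interface comes from the bootstrap loader (getClassLoader() == null).
    ClassLoader loader = type.getClassLoader() != null ? type.getClassLoader() : SideEffects.class.getClassLoader();
    Object proxy = Proxy.newProxyInstance(loader, new Class<?>[] {type}, (self, method, args) -> {
      if (method.getDeclaringClass() == Object.class) {
        // equals/hashCode/toString are not side effects of interest; give them identity semantics.
        if (method.getName().equals("equals")) {
          return self == args[0];
        }
        if (method.getName().equals("hashCode")) {
          return System.identityHashCode(self);
        }
        return "Recorded " + type.getSimpleName();
      }
      calls.merge(method.getName(), 1, Integer::sum);
      return null; // fine for void methods; a method returning a primitive would throw NullPointerException
    });
    return new Recorded<>(type.cast(proxy), calls);
  }

  /** Asserts that the recorders together saw exactly {@code expected} calls to {@code methodName}. */
  static void assertTotal(String methodName, int expected, Recorded<?>... recorders) {
    int actual = Arrays.stream(recorders).mapToInt(r -> r.count(methodName)).sum();
    if (actual != expected) {
      throw new AssertionError(methodName + ": expected " + expected + " call(s) but saw " + actual);
    }
  }

  private SideEffects() {
  }
}
```

   With something like this, a test such as `testMultipleNextJobToRun` could state its expectations as a couple of `assertTotal` calls (the expected `addSpec` total across producers and the expected number of job dag actions) instead of repeating the stream-and-filter counting.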



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -134,6 +154,7 @@ private void setStatus(DagManagementStateStore dagManagementStateStore,
     for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
       if (node.getValue().getId().equals(dagNodeId)) {
         node.getValue().setExecutionStatus(executionStatus);
+        dagManagementStateStore.addDagNodeState(node, getDagId());

Review Comment:
   Good insight. Where do you want to document it as a code-level `TODO`?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -94,29 +93,50 @@ public void launchDag()
         flowCompilationValidationHelper);
 
     launchDagProc.process(this.dagManagementStateStore);
-    int expectedNumOfSavingDagNodeStates = 1; // = number of start nodes
-    Assert.assertEquals(expectedNumOfSavingDagNodeStates,
+    int numOfLaunchedJobs = 1; // = number of start nodes
+    Assert.assertEquals(numOfLaunchedJobs,
         Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
             .filter(a -> a.getMethod().getName().equals("addDagNodeState")).count());
 
-    Mockito.verify(this.dagManagementStateStore, Mockito.times(1))
+    Mockito.verify(this.dagManagementStateStore, Mockito.times(numOfLaunchedJobs))
         .addFlowDagAction(any(), any(), any(), eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
   }
 
+  @Test
+  public void launchDagWithMultipleParallelJobs() throws IOException, InterruptedException, URISyntaxException {

Review Comment:
   I know that. The reason I'm urging you to add it is that specProducer validation (which would have shown they were never all called) would have caught the bug [here](https://github.com/apache/gobblin/pull/3965#discussion_r1631547273).



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -89,11 +86,29 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
       return;
     }
 
+    if (dagNodeWithJobStatus.getRight().isPresent()
+        && !FlowStatusGenerator.FINISHED_STATUSES.contains(dagNodeWithJobStatus.getRight().get().getEventName())) {
+      // this may happen if adding job status in the store failed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
+      throw new RuntimeException(String.format("Job status for dagNode %s is %s. Re-evaluate dag action are created for"
+              + " new jobs with no job status when there are multiple of them to run next; or when a job finishes with status - %s",
+          dagNodeId, dagNodeWithJobStatus.getRight().get().getEventName(), FlowStatusGenerator.FINISHED_STATUSES));
+    }
+
     Dag.DagNode<JobExecutionPlan> dagNode = dagNodeWithJobStatus.getLeft().get();
+
+    if (!dagNodeWithJobStatus.getRight().isPresent()) {
+      // if the job status is not present, this job was never launched, submit it now
+      submitJobForThisDagNode(dagManagementStateStore, dagNode);
+      return;
+    }

Review Comment:
   OK, that sounds reasonable. Let's document the crux of this very clearly stated rationale somewhere in the source code, perhaps in this new `DagProcUtils` method?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -57,25 +55,7 @@ public ReevaluateDagProc(ReevaluateDagTask reEvaluateDagTask) {
   @Override
   protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> initialize(DagManagementStateStore dagManagementStateStore)
       throws IOException {
-    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> dagNodeWithJobStatus =
-        dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
-
-    if (!dagNodeWithJobStatus.getLeft().isPresent() || !dagNodeWithJobStatus.getRight().isPresent()) {
-      // this is possible when MALA malfunctions and a duplicated reevaluate dag proc is launched for a dag node that is
-      // already "reevaluated" and cleaned up.
-      return ImmutablePair.of(Optional.empty(), Optional.empty());
-    }

Review Comment:
   Don't we still need the `if (!dnwjs.getLeft().isPresent())` check (to return empty)?
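   A sketch of the guard in question. `initialize` still returns an empty pair when the dag node itself is missing (the duplicate-reevaluate case described in the removed comment). When only the job status is missing, it passes the pair through for `act` to handle. `Map.Entry` stands in for the commons-lang `Pair`, and the node and status are reduced to strings; both are hypothetical simplifications.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;

final class ReevaluateInitializeSketch {

  /**
   * @param lookup (dag node, job status) as returned by a getDagNodeWithJobStatus-style lookup, with both
   *               reduced to strings for this sketch
   */
  static Map.Entry<Optional<String>, Optional<String>> initialize(
      Map.Entry<Optional<String>, Optional<String>> lookup) {
    if (!lookup.getKey().isPresent()) {
      // e.g. a duplicated reevaluate proc (MALA malfunction) for a node already reevaluated and cleaned up: no-op.
      return new SimpleImmutableEntry<>(Optional.empty(), Optional.empty());
    }
    // The node exists; an empty status is legitimate for a parallel job that was never launched, so pass it on.
    return lookup;
  }

  private ReevaluateInitializeSketch() {
  }
}
```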



-- 
This is an automated message from the Apache Git Service.