[ 
https://issues.apache.org/jira/browse/GOBBLIN-2017?focusedWorklogId=922614&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-922614
 ]

ASF GitHub Bot logged work on GOBBLIN-2017:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jun/24 17:56
            Start Date: 07/Jun/24 17:56
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3965:
URL: https://github.com/apache/gobblin/pull/3965#discussion_r1631510375


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -60,21 +59,19 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<JobStatus>> ini
     Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
dagNodeWithJobStatus =
         dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
 
-    if (!dagNodeWithJobStatus.getLeft().isPresent() || 
!dagNodeWithJobStatus.getRight().isPresent()) {
+    if (!dagNodeWithJobStatus.getLeft().isPresent()) {
       // this is possible when MALA malfunctions and a duplicated reevaluate 
dag proc is launched for a dag node that is
       // already "reevaluated" and cleaned up.
       return ImmutablePair.of(Optional.empty(), Optional.empty());
     }
 
-    ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
-    if 
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
-      log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should 
have been created only for finished status - {}",
-          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
-      // this may happen if adding job status in the store failed after adding 
a ReevaluateDagAction in KafkaJobStatusMonitor
-      throw new RuntimeException(String.format("Job status %s is not final for 
job %s", executionStatus, getDagId()));
+    if (dagNodeWithJobStatus.getRight().isPresent()) {
+      ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
+      if 
(FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
+        setStatus(dagManagementStateStore, 
dagNodeWithJobStatus.getLeft().get(), executionStatus);

Review Comment:
   seems better to perform this in `act`, so it's not spread out between two 
methods



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -66,7 +66,7 @@
  * the flow action using {@link DagActionReminderScheduler} to reattempt the 
lease after the current leaseholder's grant
  * would have expired. The {@link DagActionReminderScheduler} is used in the 
non multi-active execution configuration as
  * well to utilize reminders for a single {@link DagManagementTaskStreamImpl} 
case as well.
- * Note that if multi-active execution is NOT enabled, then all flow action 
events are selected by
+ * Note that if multi-active execution is NOT enabsled, then all flow action 
events are selected by

Review Comment:
   misspelling: `enabsled` should be `enabled`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -60,21 +59,19 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<JobStatus>> ini
     Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
dagNodeWithJobStatus =
         dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
 
-    if (!dagNodeWithJobStatus.getLeft().isPresent() || 
!dagNodeWithJobStatus.getRight().isPresent()) {
+    if (!dagNodeWithJobStatus.getLeft().isPresent()) {
       // this is possible when MALA malfunctions and a duplicated reevaluate 
dag proc is launched for a dag node that is
       // already "reevaluated" and cleaned up.
       return ImmutablePair.of(Optional.empty(), Optional.empty());
     }
 
-    ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
-    if 
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
-      log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should 
have been created only for finished status - {}",
-          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
-      // this may happen if adding job status in the store failed after adding 
a ReevaluateDagAction in KafkaJobStatusMonitor
-      throw new RuntimeException(String.format("Job status %s is not final for 
job %s", executionStatus, getDagId()));
+    if (dagNodeWithJobStatus.getRight().isPresent()) {

Review Comment:
   somewhere, let's add a comment describing the circumstances in which a dag 
node has no job status.  where should that live?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -206,4 +207,84 @@ public void testNoNextJobToRun() throws Exception {
     
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
+
+  @Test
+  public void testCurrentJobToRun() throws Exception {
+    String flowName = "fn3";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        2, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+    );
+    List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
+    List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
+      try {
+        return DagManagerUtils.getSpecProducer(n);
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(new ImmutablePair<>(Optional.of(startDagNodes.get(0)), 
Optional.empty()))
+        .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+    ReevaluateDagProc
+        reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
+        String.valueOf(flowExecutionId), "job0", 
DagActionStore.DagActionType.REEVALUATE), null,
+        dagManagementStateStore));
+    reEvaluateDagProc.process(dagManagementStateStore);
+
+    // only the current job should have run
+    Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+    Mockito.verify(specProducers.get(1), Mockito.never()).addSpec(any());

Review Comment:
   since `specProducers` is a list, shouldn't you verify that *all* elements 
after index `0` are never called?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -94,29 +93,50 @@ public void launchDag()
         flowCompilationValidationHelper);
 
     launchDagProc.process(this.dagManagementStateStore);
-    int expectedNumOfSavingDagNodeStates = 1; // = number of start nodes
-    Assert.assertEquals(expectedNumOfSavingDagNodeStates,
+    int numOfLaunchedJobs = 1; // = number of start nodes
+    Assert.assertEquals(numOfLaunchedJobs,
         
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
             .filter(a -> 
a.getMethod().getName().equals("addDagNodeState")).count());
 
-    Mockito.verify(this.dagManagementStateStore, Mockito.times(1))
+    Mockito.verify(this.dagManagementStateStore, 
Mockito.times(numOfLaunchedJobs))
         .addFlowDagAction(any(), any(), any(), 
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
   }
 
+  @Test
+  public void launchDagWithMultipleParallelJobs() throws IOException, 
InterruptedException, URISyntaxException {

Review Comment:
   I see validation of spec executors in `ReevaluateDagProcTest`, but not 
here...  shouldn't this test validate them as well?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -89,11 +86,29 @@ protected void act(DagManagementStateStore 
dagManagementStateStore, Pair<Optiona
       return;
     }
 
+    if (dagNodeWithJobStatus.getRight().isPresent()
+        && 
!FlowStatusGenerator.FINISHED_STATUSES.contains(dagNodeWithJobStatus.getRight().get().getEventName()))
 {
+      // this may happen if adding job status in the store failed after adding 
a ReevaluateDagAction in KafkaJobStatusMonitor
+      throw new RuntimeException(String.format("Job status for dagNode %s is 
%s. Re-evaluate dag action are created for"
+              + " new jobs with no job status when there are multiple of them 
to run next; or when a job finishes with status - %s",
+          dagNodeId, dagNodeWithJobStatus.getRight().get().getEventName(), 
FlowStatusGenerator.FINISHED_STATUSES));
+    }
+
     Dag.DagNode<JobExecutionPlan> dagNode = 
dagNodeWithJobStatus.getLeft().get();
+
+    if (!dagNodeWithJobStatus.getRight().isPresent()) {
+      // if the job status is not present, this job was never launched, submit 
it now
+      submitJobForThisDagNode(dagManagementStateStore, dagNode);
+      return;
+    }

Review Comment:
   it may be too subtle (and even potentially mask errors) to designate that a 
REEVALUATE DagAction with no job status actually means to launch the job.
   
   directly recursive handling would decompose multi-job LAUNCH into multiple 
LAUNCH DagActions and multi-job REEVALUATE into multiple REEVALUATE DagActions. 
 why not transparently handle the multi-job cases in this way?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -206,4 +207,84 @@ public void testNoNextJobToRun() throws Exception {
     
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
+
+  @Test
+  public void testCurrentJobToRun() throws Exception {
+    String flowName = "fn3";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        2, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+    );
+    List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
+    List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
+      try {
+        return DagManagerUtils.getSpecProducer(n);
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());

Review Comment:
   nit: abstract this -
   ```
   List<SpecProducer<Spec>> getDagSpecProducers(Dag<JEP> dag)
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -55,6 +56,36 @@
  */
 @Slf4j
 public class DagProcUtils {
+
+  /**
+   * If there is a single job to run next, it runs it. If there are multiple 
jobs to run, it creates a
+   * {@link 
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE}
 dag action for
+   * each of them and those jobs will be launched in respective Reevaluate dag 
proc.
+   */
+  public static void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+      DagManager.DagId dagId) throws IOException {
+    Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
+
+    if (nextNodes.size() > 1) {
+      handleMultipleJobs(dagManagementStateStore, nextNodes);
+    }
+
+    //Submit jobs from the dag ready for execution.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+      DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
dagId);
+      log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), dagId);
+    }

Review Comment:
   I'm confused... does the multi-job case both add a DagAction and also submit 
each job to an executor?  I didn't expect submitting to the executor until the 
(deferred) DagAction is handled later.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -134,6 +154,7 @@ private void setStatus(DagManagementStateStore 
dagManagementStateStore,
     for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
       if (node.getValue().getId().equals(dagNodeId)) {
         node.getValue().setExecutionStatus(executionStatus);
+        dagManagementStateStore.addDagNodeState(node, getDagId());

Review Comment:
   was this merely omitted previously or is there only now a need for it, due 
to asking `ReevaluateDagProc` to do double-duty of launching remaining 
multi-jobs in addition to regular reevaluation of completed jobs?





Issue Time Tracking
-------------------

            Worklog Id:     (was: 922614)
    Remaining Estimate: 0h
            Time Spent: 10m

> divide multiple job launches in a LaunchDagProc into multiple LaunchDagActions
> ------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2017
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2017
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Divide multiple job launches in a LaunchDagProc into multiple 
> LaunchDagActions, for two reasons:
> 1) each dag proc will then spend less time processing and have a better 
> chance of completing the operation within the lease time
> 2) handling partial job submissions in one LaunchDagProc that sends N jobs is 
> difficult



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to