[ 
https://issues.apache.org/jira/browse/GOBBLIN-1552?focusedWorklogId=659903&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-659903
 ]

ASF GitHub Bot logged work on GOBBLIN-1552:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Oct/21 21:18
            Start Date: 04/Oct/21 21:18
    Worklog Time Spent: 10m 
      Work Description: phet commented on a change in pull request #3403:
URL: https://github.com/apache/gobblin/pull/3403#discussion_r716979951



##########
File path: 
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -148,17 +148,16 @@ public static FlowExecution 
convertFlowStatus(org.apache.gobblin.service.monitor
         .setFlowGroup(monitoringFlowStatus.getFlowGroup());
 
     long flowEndTime = 0L;
-    ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
-
     String flowMessage = "";
 
     while (jobStatusIter.hasNext()) {
       org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = 
jobStatusIter.next();
 
       // Check if this is the flow status instead of a single job status
       if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
+        // todo: flow end time will be incorrect when dag manager is not used

Review comment:
       seems a helpful note, but since you just forward whatever was in the 
`JobStatus`, it might be worth putting the 'todo' over there, where the value 
is actually written in the incorrect/inaccurate way.

##########
File path: 
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -202,7 +203,7 @@ public static FlowExecution 
convertFlowStatus(org.apache.gobblin.service.monitor
         .setExecutionStatistics(new 
FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
             .setExecutionEndTime(flowEndTime))

Review comment:
       just checking the assumption: are we certain the end time of the 
iterator's final job status is guaranteed to be the latest (or at least 
whichever one we want for the flow execution as a whole)?

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -167,13 +173,19 @@ protected final long getJobExecutionId(State jobState) {
   }
 
   protected List<FlowStatus> 
asFlowStatuses(List<FlowExecutionJobStateGrouping> flowExecutionGroupings) {
-    return flowExecutionGroupings.stream().map(exec ->
-        new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), 
exec.getFlowExecutionId(),
-            asJobStatuses(exec.getJobStates().stream().sorted(
-                // rationalized order, to facilitate test assertions
-                
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
-            ).collect(Collectors.toList()))))
-        .collect(Collectors.toList());
+    return flowExecutionGroupings.stream().map(exec -> {
+      Iterator<JobStatus> jobStatusIterator = 
asJobStatuses(exec.getJobStates().stream().sorted(
+          // rationalized order, to facilitate test assertions
+          
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
+      ).collect(Collectors.toList()));
+      // duplicate copy of the iterator is required due to un re-parsing 
nature of the Iterator
+      Iterator<JobStatus> jobStatusIterator2 = 
asJobStatuses(exec.getJobStates().stream().sorted(
+          // rationalized order, to facilitate test assertions
+          
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
+      ).collect(Collectors.toList()));

Review comment:
       the literal code duplication seems less clear than wrapping 
`ImmutableList.copyOf(asJobStatuses(...))` and taking two `.iterator()`s from it
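
A minimal sketch of that suggestion, using only the JDK in place of Guava's 
`ImmutableList.copyOf`. The class and method names (`IteratorCopies`, 
`materialize`) are hypothetical and not part of the PR; the idea is to drain 
the one-shot iterator once and hand out as many fresh iterators as needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, for illustration only (not from the PR).
final class IteratorCopies {
  private IteratorCopies() { }

  // Drains a one-shot iterator into an unmodifiable list, so the same
  // elements can be traversed any number of times.
  static <T> List<T> materialize(Iterator<T> source) {
    List<T> items = new ArrayList<>();
    source.forEachRemaining(items::add);
    return Collections.unmodifiableList(items);
  }
}
```

With such a helper, the sorted job statuses would be built once, and both 
consumers would call `.iterator()` on the resulting list instead of repeating 
the stream pipeline.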

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -55,12 +58,15 @@
 
   @Getter
   protected final MetricContext metricContext;
+  protected final boolean dagManagerEnabled;
 
   private final MultiContextIssueRepository issueRepository;
 
-  protected JobStatusRetriever(MultiContextIssueRepository issueRepository) {
+  protected JobStatusRetriever(Config config, MultiContextIssueRepository 
issueRepository) {
     this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
     this.issueRepository = Objects.requireNonNull(issueRepository);
+    this.dagManagerEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,

Review comment:
       `Config` is both amorphous and open-ended (so its purpose and meaning 
aren't clear from the method signature).  since you're only looking up one key, 
why not have the caller perform that lookup and keep this constructor clearer 
by taking a param like `boolean isDagManagerEnabled`?
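
A rough sketch of that shape, under stated assumptions: `RetrieverSketch` and 
`RetrieverFactory` are hypothetical stand-ins (not Gobblin classes), a plain 
`Map` stands in for `Config`, and both the key name and the `false` default 
are invented for illustration.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for JobStatusRetriever; not the PR's code.
abstract class RetrieverSketch {
  protected final boolean dagManagerEnabled;

  // The signature states exactly what the class depends on.
  protected RetrieverSketch(boolean isDagManagerEnabled) {
    this.dagManagerEnabled = isDagManagerEnabled;
  }

  boolean isDagManagerEnabled() {
    return dagManagerEnabled;
  }
}

final class RetrieverFactory {
  // Hypothetical key name, for illustration only.
  static final String DAG_MANAGER_ENABLED_KEY = "example.dagManager.enabled";

  private RetrieverFactory() { }

  // The caller resolves the single setting; the default of false is an assumption.
  static RetrieverSketch create(Map<String, String> config) {
    boolean enabled = Boolean.parseBoolean(config.getOrDefault(DAG_MANAGER_ENABLED_KEY, "false"));
    return new RetrieverSketch(enabled) { };
  }
}
```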

##########
File path: 
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
##########
@@ -218,6 +222,40 @@ public void testGetLatestExecutionIdsForFlow() throws 
Exception {
     
Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(FLOW_NAME,
 FLOW_GROUP), -1L);
   }
 
+  @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+  public void testGetJobStatusesForFlowExecutionWithDagManager() throws 
Exception {

Review comment:
       do we expect the same result whether or not the DAG Manager is enabled?  
if so, should we initialize the status once in common and then have each 
JobStatusRetriever analyze the same thing (to get the same result)?

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -212,4 +224,60 @@ public static boolean 
isFlowStatus(org.apache.gobblin.service.monitoring.JobStat
     return jobStatus.getJobName() != null && jobStatus.getJobGroup() != null
         && jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && 
jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
   }
+
+  public ExecutionStatus getFlowStatusFromJobStatuses(Iterator<JobStatus> 
jobStatusIterator) {
+    ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
+
+    if (dagManagerEnabled) {
+      while (jobStatusIterator.hasNext()) {
+        JobStatus jobStatus = jobStatusIterator.next();
+        // Check if this is the flow status instead of a single job status
+        if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+          flowExecutionStatus = 
ExecutionStatus.valueOf(jobStatus.getEventName());
+        }
+      }
+    } else {
+      while (jobStatusIterator.hasNext()) {
+        JobStatus jobStatus = jobStatusIterator.next();
+        if (!JobStatusRetriever.isFlowStatus(jobStatus)) {
+          flowExecutionStatus = 
updatedFlowExecutionStatus(ExecutionStatus.valueOf(jobStatus.getEventName()), 
flowExecutionStatus);
+        }
+      }
+    }
+    return flowExecutionStatus;
+  }
+
+  static ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus 
jobExecutionStatus,
+      ExecutionStatus currentFlowExecutionStatus) {
+    if (currentFlowExecutionStatus == ExecutionStatus.$UNKNOWN) {
+      return jobExecutionStatus;
+    }
+
+    // if any job failed or flow has failed then return failed status
+    if (currentFlowExecutionStatus == ExecutionStatus.FAILED ||
+        jobExecutionStatus == ExecutionStatus.FAILED) {
+      return ExecutionStatus.FAILED;
+    }
+
+    // if any job is cancelled or flow has failed then return failed status
+    if (currentFlowExecutionStatus == ExecutionStatus.CANCELLED ||
+        jobExecutionStatus == ExecutionStatus.CANCELLED) {
+      return ExecutionStatus.CANCELLED;
+    }
+
+    if (currentFlowExecutionStatus == ExecutionStatus.COMPLETE &&
+        jobExecutionStatus == ExecutionStatus.PENDING) {
+      return ExecutionStatus.PENDING;

Review comment:
       I don't understand why we return the status `PENDING` only when there is 
a prior `COMPLETE`, but otherwise skip it when there is no prior `COMPLETE`.  in 
that case could we return `$UNKNOWN`? ...or is there some additional expectation 
of a well-formed status sequence, to guarantee another status would follow 
(which would be shown)?

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
##########
@@ -167,27 +158,11 @@ public boolean isFlowRunning(String flowName, String 
flowGroup) {
       return false;
     } else {
       FlowStatus flowStatus = flowStatusList.get(0);
-      Iterator<JobStatus> jobStatusIterator = 
flowStatus.getJobStatusIterator();
-
-      while (jobStatusIterator.hasNext()) {
-        JobStatus jobStatus = jobStatusIterator.next();
-        if (JobStatusRetriever.isFlowStatus(jobStatus)) {
-          return isJobRunning(jobStatus);
-        }
-      }
-      return false;
+      ExecutionStatus flowExecutionStatus = 
flowStatus.getFlowExecutionStatus();
+      return !FINISHED_STATUSES.contains(flowExecutionStatus.name());

Review comment:
       I see you couldn't still call it as presently written, but I do prefer 
the abstraction of `isJobRunning`

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -212,4 +224,60 @@ public static boolean 
isFlowStatus(org.apache.gobblin.service.monitoring.JobStat
     return jobStatus.getJobName() != null && jobStatus.getJobGroup() != null
         && jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && 
jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
   }
+
+  public ExecutionStatus getFlowStatusFromJobStatuses(Iterator<JobStatus> 
jobStatusIterator) {
+    ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
+
+    if (dagManagerEnabled) {
+      while (jobStatusIterator.hasNext()) {
+        JobStatus jobStatus = jobStatusIterator.next();
+        // Check if this is the flow status instead of a single job status
+        if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+          flowExecutionStatus = 
ExecutionStatus.valueOf(jobStatus.getEventName());
+        }
+      }
+    } else {
+      while (jobStatusIterator.hasNext()) {
+        JobStatus jobStatus = jobStatusIterator.next();
+        if (!JobStatusRetriever.isFlowStatus(jobStatus)) {
+          flowExecutionStatus = 
updatedFlowExecutionStatus(ExecutionStatus.valueOf(jobStatus.getEventName()), 
flowExecutionStatus);
+        }
+      }
+    }
+    return flowExecutionStatus;
+  }
+
+  static ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus 
jobExecutionStatus,
+      ExecutionStatus currentFlowExecutionStatus) {

Review comment:
       this logic is difficult to follow.  clearly you're relying on ordering...  
otherwise, the iterative updating in-place doesn't really illustrate the rules 
at play.
   suggestion: reorganize to analyze the sequence of job statuses as a whole.  
e.g. put them in a list, then:
   - do a find for `FAILED` and `CANCELLED`; if a hit, return that status
   - otherwise traverse in reverse order looking for the most salient status 
(e.g. `RUNNING`), then terminate; else keep looking further behind until 
reaching a status we don't skip... 
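
One possible reading of that suggestion, as a sketch rather than the PR's 
logic. The `Status` enum is a hypothetical stand-in for `ExecutionStatus`, and 
the choice of which statuses to skip during the reverse scan (here only 
`UNKNOWN`) is an assumption the review comment leaves open.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; names and skip rules are assumptions, not Gobblin's.
final class FlowStatusSketch {
  enum Status { UNKNOWN, PENDING, RUNNING, COMPLETE, FAILED, CANCELLED }

  // Statuses passed over during the reverse scan (assumed, for illustration).
  private static final Set<Status> SKIPPED = EnumSet.of(Status.UNKNOWN);

  private FlowStatusSketch() { }

  static Status summarize(List<Status> jobStatuses) {
    // Rule 1: any failure or cancellation decides the flow status outright.
    if (jobStatuses.contains(Status.FAILED)) {
      return Status.FAILED;
    }
    if (jobStatuses.contains(Status.CANCELLED)) {
      return Status.CANCELLED;
    }
    // Rule 2: otherwise take the latest status that is not skipped.
    for (int i = jobStatuses.size() - 1; i >= 0; i--) {
      Status status = jobStatuses.get(i);
      if (!SKIPPED.contains(status)) {
        return status;
      }
    }
    return Status.UNKNOWN;
  }
}
```

Compared with folding statuses one pair at a time, this keeps each rule 
visible as its own step, though it still assumes the list is in a meaningful 
order for the reverse scan.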

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -212,4 +224,60 @@ public static boolean 
isFlowStatus(org.apache.gobblin.service.monitoring.JobStat
     return jobStatus.getJobName() != null && jobStatus.getJobGroup() != null
         && jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && 
jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
   }
+
+  public ExecutionStatus getFlowStatusFromJobStatuses(Iterator<JobStatus> 
jobStatusIterator) {

Review comment:
       may not need to be an instance method.  now it's only for 
`dagManagerEnabled`, but that state doesn't change, and could anyway be passed 
as a param (e.g. to a static method).  even clearer, perhaps, would be to cast 
the flow status analysis/extraction in the Strategy Pattern.
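
A minimal sketch of what the Strategy Pattern could look like here. All names 
(`FlowStatusStrategies`, `Event`, `FlowStatusStrategy`) are hypothetical, and 
the job-level rule is deliberately simplified (a `FAILED` job sticks, otherwise 
the last job status wins); it is not the PR's `updatedFlowExecutionStatus`.

```java
import java.util.Iterator;

// Hypothetical sketch of the Strategy Pattern; not Gobblin's actual classes.
final class FlowStatusStrategies {
  enum Status { UNKNOWN, RUNNING, COMPLETE, FAILED }

  // Minimal stand-in for JobStatus: a status plus a flow-level marker.
  static final class Event {
    final boolean flowLevel;
    final Status status;

    Event(boolean flowLevel, Status status) {
      this.flowLevel = flowLevel;
      this.status = status;
    }
  }

  interface FlowStatusStrategy {
    Status flowStatus(Iterator<Event> events);
  }

  // DAG manager enabled: the last flow-level event determines the status.
  static final FlowStatusStrategy FROM_FLOW_EVENTS = events -> {
    Status result = Status.UNKNOWN;
    while (events.hasNext()) {
      Event event = events.next();
      if (event.flowLevel) {
        result = event.status;
      }
    }
    return result;
  };

  // DAG manager disabled: derive the status from job-level events only.
  // Simplified rule (an assumption): FAILED sticks, else the last job status wins.
  static final FlowStatusStrategy FROM_JOB_EVENTS = events -> {
    Status result = Status.UNKNOWN;
    while (events.hasNext()) {
      Event event = events.next();
      if (!event.flowLevel && result != Status.FAILED) {
        result = event.status;
      }
    }
    return result;
  };

  private FlowStatusStrategies() { }

  // The flag is consulted once, when choosing the strategy.
  static FlowStatusStrategy forDagManager(boolean dagManagerEnabled) {
    return dagManagerEnabled ? FROM_FLOW_EVENTS : FROM_JOB_EVENTS;
  }
}
```

The branch on `dagManagerEnabled` then happens once at selection time rather 
than inside the status computation, and each strategy can be tested separately.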






Issue Time Tracking
-------------------

            Worklog Id:     (was: 659903)
    Remaining Estimate: 0h
            Time Spent: 10m

> fix flow status reporting when dag manager is not enabled
> ---------------------------------------------------------
>
>                 Key: GOBBLIN-1552
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1552
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> flow status is determined by looking at the flow level events. but flow level 
> events are not emitted outside of dag manager. so currently flow status is 
> not being determined correctly when dag manager is disabled


