[
https://issues.apache.org/jira/browse/GOBBLIN-1552?focusedWorklogId=659903&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-659903
]
ASF GitHub Bot logged work on GOBBLIN-1552:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 04/Oct/21 21:18
Start Date: 04/Oct/21 21:18
Worklog Time Spent: 10m
Work Description: phet commented on a change in pull request #3403:
URL: https://github.com/apache/gobblin/pull/3403#discussion_r716979951
##########
File path:
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -148,17 +148,16 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
.setFlowGroup(monitoringFlowStatus.getFlowGroup());
long flowEndTime = 0L;
- ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
-
String flowMessage = "";
while (jobStatusIter.hasNext()) {
org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = jobStatusIter.next();
// Check if this is the flow status instead of a single job status
if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
+ // todo: flow end time will be incorrect when dag manager is not used
Review comment:
seems a helpful note, but since you just forward whatever was in the
`JobStatus`, it might be worth putting the 'todo' over there, where the value is
actually written in the incorrect/inaccurate way.
##########
File path:
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -202,7 +203,7 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
.setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
.setExecutionEndTime(flowEndTime))
Review comment:
just checking the assumption: are we certain the end time of the
iterator's final job status is guaranteed to be the latest (or at least
whichever one we want for the flow execution as a whole)?
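One way to remove the ordering assumption raised in that comment is to take the maximum end time across all statuses instead of whichever one the iterator yields last. The sketch below is hypothetical: `JobStatusStub` with an `endTime` field stands in for Gobblin's `JobStatus`, and 0 is assumed to mean "no end time", mirroring the `flowEndTime = 0L` default in the first hunk. Whether flow-level entries should be included in the maximum is left open.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch; not Gobblin's implementation.
class FlowEndTimeSketch {
  // Hypothetical stand-in for org.apache.gobblin.service.monitoring.JobStatus.
  static final class JobStatusStub {
    final String jobName;
    final long endTime;

    JobStatusStub(String jobName, long endTime) {
      this.jobName = jobName;
      this.endTime = endTime;
    }
  }

  // Latest end time across all statuses, regardless of iteration order.
  // Returns 0L (the default used for flowEndTime in the diff) when there are none.
  static long latestEndTime(Iterator<JobStatusStub> statuses) {
    long flowEndTime = 0L;
    while (statuses.hasNext()) {
      flowEndTime = Math.max(flowEndTime, statuses.next().endTime);
    }
    return flowEndTime;
  }

  public static void main(String[] args) {
    List<JobStatusStub> statuses = List.of(
        new JobStatusStub("jobA", 300L),
        new JobStatusStub("jobB", 100L)); // last in order, but not the latest to end
    System.out.println(latestEndTime(statuses.iterator())); // prints 300
  }
}
```

Because the maximum is order-independent, the result no longer depends on how the statuses were sorted (the diff sorts by job group, name and execution id, not by time).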
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -167,13 +173,19 @@ protected final long getJobExecutionId(State jobState) {
}
protected List<FlowStatus> asFlowStatuses(List<FlowExecutionJobStateGrouping> flowExecutionGroupings) {
- return flowExecutionGroupings.stream().map(exec ->
- new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(),
- asJobStatuses(exec.getJobStates().stream().sorted(
- // rationalized order, to facilitate test assertions
- Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
- ).collect(Collectors.toList()))))
- .collect(Collectors.toList());
+ return flowExecutionGroupings.stream().map(exec -> {
+ Iterator<JobStatus> jobStatusIterator = asJobStatuses(exec.getJobStates().stream().sorted(
+ // rationalized order, to facilitate test assertions
+ Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
+ ).collect(Collectors.toList()));
+ // duplicate copy of the iterator is required due to un re-parsing nature of the Iterator
+ Iterator<JobStatus> jobStatusIterator2 = asJobStatuses(exec.getJobStates().stream().sorted(
+ // rationalized order, to facilitate test assertions
+ Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
+ ).collect(Collectors.toList()));
Review comment:
the literal code duplication seems less clear than wrapping
`ImmutableList.copyOf(asJobStatuses(...))` and taking two `.iterator()`s from it
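A minimal sketch of that suggestion: drain the one-shot iterator into an immutable list once, then take as many independent iterators from the list as needed. Guava's `ImmutableList.copyOf(Iterator)` does the draining in one call; to stay self-contained the sketch uses only the JDK (`List.copyOf`, Java 10+). `asJobStatuses` here is a hypothetical stand-in that, like the method in the diff, returns a single-pass `Iterator`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch; not Gobblin code.
class ReusableIteratorsSketch {
  // Hypothetical stand-in for asJobStatuses(...): yields a single-pass Iterator.
  static Iterator<String> asJobStatuses(List<String> jobStates) {
    return jobStates.stream().map(s -> "status:" + s).iterator();
  }

  // Drain the one-shot iterator into an immutable list exactly once.
  // With Guava this would be ImmutableList.copyOf(iterator).
  // Note: List.copyOf rejects null elements.
  static <T> List<T> materialize(Iterator<T> iterator) {
    List<T> buffer = new ArrayList<>();
    iterator.forEachRemaining(buffer::add);
    return List.copyOf(buffer);
  }

  public static void main(String[] args) {
    List<String> jobStatuses = materialize(asJobStatuses(List.of("b", "a")));
    // Each iterator() call starts a fresh pass over the same data, so the
    // statuses need not be rebuilt and re-sorted a second time.
    Iterator<String> forFlowStatus = jobStatuses.iterator();
    Iterator<String> forJobStatuses = jobStatuses.iterator();
    System.out.println(forFlowStatus.next() + " " + forJobStatuses.next()); // prints "status:b status:b"
  }
}
```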
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -55,12 +58,15 @@
@Getter
protected final MetricContext metricContext;
+ protected final boolean dagManagerEnabled;
private final MultiContextIssueRepository issueRepository;
- protected JobStatusRetriever(MultiContextIssueRepository issueRepository) {
+ protected JobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.issueRepository = Objects.requireNonNull(issueRepository);
+ this.dagManagerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
Review comment:
`Config` is both amorphous and open-ended (so the method signature doesn't tell
us its purpose or meaning). since only one key is looked up, why not have the
caller perform that lookup and keep this constructor clearer by taking a param
like `boolean isDagManagerEnabled`?
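A sketch of the alternative the reviewer describes: the caller resolves the single key, and the constructor takes only the boolean it actually needs. All class names are hypothetical, `java.util.Properties` stands in for the Typesafe `Config` object, and both the key string and the `false` default are assumptions rather than the real value or default of `ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY`.

```java
import java.util.Properties;

// Hypothetical, simplified analogue of JobStatusRetriever.
abstract class StatusRetrieverSketch {
  protected final boolean dagManagerEnabled;

  // The signature states exactly what the class depends on.
  protected StatusRetrieverSketch(boolean dagManagerEnabled) {
    this.dagManagerEnabled = dagManagerEnabled;
  }
}

// Hypothetical concrete subclass.
final class InMemoryStatusRetrieverSketch extends StatusRetrieverSketch {
  InMemoryStatusRetrieverSketch(boolean dagManagerEnabled) {
    super(dagManagerEnabled);
  }
}

class RetrieverFactorySketch {
  // Assumed key name, for illustration only.
  static final String DAG_MANAGER_ENABLED_KEY = "gobblin.service.dagManager.enabled";

  // The caller owns the config lookup (assumed default: false when the key is absent).
  static StatusRetrieverSketch create(Properties config) {
    boolean enabled = Boolean.parseBoolean(config.getProperty(DAG_MANAGER_ENABLED_KEY, "false"));
    return new InMemoryStatusRetrieverSketch(enabled);
  }

  public static void main(String[] args) {
    Properties config = new Properties();
    config.setProperty(DAG_MANAGER_ENABLED_KEY, "true");
    System.out.println(create(config).dagManagerEnabled); // prints true
  }
}
```

A side benefit of this shape is that tests can construct a retriever in either mode directly, without assembling a config object.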
##########
File path:
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
##########
@@ -218,6 +222,40 @@ public void testGetLatestExecutionIdsForFlow() throws Exception {
Assert.assertEquals(this.jobStatusRetriever.getLatestExecutionIdForFlow(FLOW_NAME, FLOW_GROUP), -1L);
}
+ @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+ public void testGetJobStatusesForFlowExecutionWithDagManager() throws Exception {
Review comment:
do we expect the same result whether or not the DAG Manager is enabled?
if so, should we initialize the status once in common and then have each
JobStatusRetriever analyze the same thing (to get the same result)?
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -212,4 +224,60 @@ public static boolean isFlowStatus(org.apache.gobblin.service.monitoring.JobStat
return jobStatus.getJobName() != null && jobStatus.getJobGroup() != null
&& jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
}
+
+ public ExecutionStatus getFlowStatusFromJobStatuses(Iterator<JobStatus> jobStatusIterator) {
+ ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
+
+ if (dagManagerEnabled) {
+ while (jobStatusIterator.hasNext()) {
+ JobStatus jobStatus = jobStatusIterator.next();
+ // Check if this is the flow status instead of a single job status
+ if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+ flowExecutionStatus = ExecutionStatus.valueOf(jobStatus.getEventName());
+ }
+ }
+ } else {
+ while (jobStatusIterator.hasNext()) {
+ JobStatus jobStatus = jobStatusIterator.next();
+ if (!JobStatusRetriever.isFlowStatus(jobStatus)) {
+ flowExecutionStatus = updatedFlowExecutionStatus(ExecutionStatus.valueOf(jobStatus.getEventName()), flowExecutionStatus);
+ }
+ }
+ }
+ return flowExecutionStatus;
+ }
+
+ static ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus jobExecutionStatus,
+ ExecutionStatus currentFlowExecutionStatus) {
+ if (currentFlowExecutionStatus == ExecutionStatus.$UNKNOWN) {
+ return jobExecutionStatus;
+ }
+
+ // if any job failed or flow has failed then return failed status
+ if (currentFlowExecutionStatus == ExecutionStatus.FAILED || jobExecutionStatus == ExecutionStatus.FAILED) {
+ return ExecutionStatus.FAILED;
+ }
+
+ // if any job is cancelled or flow has failed then return failed status
+ if (currentFlowExecutionStatus == ExecutionStatus.CANCELLED || jobExecutionStatus == ExecutionStatus.CANCELLED) {
+ return ExecutionStatus.CANCELLED;
+ }
+
+ if (currentFlowExecutionStatus == ExecutionStatus.COMPLETE && jobExecutionStatus == ExecutionStatus.PENDING) {
+ return ExecutionStatus.PENDING;
Review comment:
I don't understand why we return the status `PENDING` only when it follows a
`COMPLETE`, but otherwise skip it when there's no prior `COMPLETE`. in that case,
could we return `$UNKNOWN`? ...or is there some additional expectation of a
well-formed status sequence, guaranteeing that another status would follow
(which would then be shown)?
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
##########
@@ -167,27 +158,11 @@ public boolean isFlowRunning(String flowName, String flowGroup) {
return false;
} else {
FlowStatus flowStatus = flowStatusList.get(0);
- Iterator<JobStatus> jobStatusIterator = flowStatus.getJobStatusIterator();
-
- while (jobStatusIterator.hasNext()) {
- JobStatus jobStatus = jobStatusIterator.next();
- if (JobStatusRetriever.isFlowStatus(jobStatus)) {
- return isJobRunning(jobStatus);
- }
- }
- return false;
+ ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
+ return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
Review comment:
I see you can no longer call it as presently written, but I do prefer
the abstraction of `isJobRunning`
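A sketch of how that abstraction could be kept: a small predicate on the status itself, callable from both the per-job and the per-flow check. `ExecutionStatusStub` is a hypothetical subset of the real enum, and the contents of `FINISHED_STATUSES` are assumed; the diff only shows the set being queried with `name()`.

```java
import java.util.Set;

// Hypothetical sketch; not Gobblin code.
class RunningCheckSketch {
  // Hypothetical subset of Gobblin's ExecutionStatus values.
  enum ExecutionStatusStub { $UNKNOWN, PENDING, RUNNING, COMPLETE, FAILED, CANCELLED }

  // Assumed contents; the diff only shows the set being checked by status name.
  static final Set<String> FINISHED_STATUSES = Set.of("COMPLETE", "FAILED", "CANCELLED");

  // Single place that defines "running", reusable for jobs and flows alike.
  static boolean isRunning(ExecutionStatusStub status) {
    return !FINISHED_STATUSES.contains(status.name());
  }

  // The flow-level check then reads as intent rather than set membership.
  static boolean isFlowRunning(ExecutionStatusStub flowExecutionStatus) {
    return isRunning(flowExecutionStatus);
  }

  public static void main(String[] args) {
    System.out.println(isFlowRunning(ExecutionStatusStub.RUNNING));  // prints true
    System.out.println(isFlowRunning(ExecutionStatusStub.COMPLETE)); // prints false
  }
}
```

Under the assumed set, this rule (like the `!FINISHED_STATUSES.contains(...)` line in the diff) treats `$UNKNOWN` as running, which is worth deciding on explicitly.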
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -212,4 +224,60 @@ public static boolean isFlowStatus(org.apache.gobblin.service.monitoring.JobStat
return jobStatus.getJobName() != null && jobStatus.getJobGroup() != null
&& jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
}
+
+ public ExecutionStatus getFlowStatusFromJobStatuses(Iterator<JobStatus> jobStatusIterator) {
+ ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
+
+ if (dagManagerEnabled) {
+ while (jobStatusIterator.hasNext()) {
+ JobStatus jobStatus = jobStatusIterator.next();
+ // Check if this is the flow status instead of a single job status
+ if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+ flowExecutionStatus = ExecutionStatus.valueOf(jobStatus.getEventName());
+ }
+ }
+ } else {
+ while (jobStatusIterator.hasNext()) {
+ JobStatus jobStatus = jobStatusIterator.next();
+ if (!JobStatusRetriever.isFlowStatus(jobStatus)) {
+ flowExecutionStatus = updatedFlowExecutionStatus(ExecutionStatus.valueOf(jobStatus.getEventName()), flowExecutionStatus);
+ }
+ }
+ }
+ return flowExecutionStatus;
+ }
+
+ static ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus jobExecutionStatus,
+ ExecutionStatus currentFlowExecutionStatus) {
Review comment:
this logic is difficult to follow. clearly you're relying on ordering...
otherwise, the iterative in-place updating doesn't really illustrate the rules
at play.
suggestion: reorganize to analyze the sequence of job statuses as a whole,
e.g. put them in a list, then:
do a find for `FAILED` and `CANCELLED`; if there's a hit, return that status;
otherwise traverse in reverse order looking for the most salient status
(e.g. `RUNNING`) and terminate on a hit, else look back as far as needed until
reaching a status we don't skip...
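A minimal sketch of that suggestion, assuming the statuses are already collected into a list. It checks `FAILED` before `CANCELLED` (the same precedence as the checks in the diff), then scans backwards: `RUNNING` ends the scan immediately, otherwise the last non-skipped status in list order wins. `ExecutionStatusStub` is a hypothetical subset of the real enum, and treating only `$UNKNOWN` as skippable is an assumption.

```java
import java.util.List;
import java.util.ListIterator;
import java.util.Set;

// Hypothetical sketch of the "analyze the whole sequence" idea; not Gobblin code.
class WholeSequenceFlowStatusSketch {
  // Hypothetical subset of Gobblin's ExecutionStatus values.
  enum ExecutionStatusStub { $UNKNOWN, PENDING, RUNNING, COMPLETE, FAILED, CANCELLED }

  // Statuses the reverse scan looks past; which ones to skip is an assumption.
  static final Set<ExecutionStatusStub> SKIPPED = Set.of(ExecutionStatusStub.$UNKNOWN);

  static ExecutionStatusStub flowStatusOf(List<ExecutionStatusStub> jobStatuses) {
    // Rule 1: any failure, then any cancellation, decides the flow outright.
    if (jobStatuses.contains(ExecutionStatusStub.FAILED)) {
      return ExecutionStatusStub.FAILED;
    }
    if (jobStatuses.contains(ExecutionStatusStub.CANCELLED)) {
      return ExecutionStatusStub.CANCELLED;
    }
    // Rule 2: scan from the end. RUNNING is the most salient status and ends the
    // scan; otherwise keep the last status (in list order) that is not skipped.
    ExecutionStatusStub fallback = null;
    ListIterator<ExecutionStatusStub> it = jobStatuses.listIterator(jobStatuses.size());
    while (it.hasPrevious()) {
      ExecutionStatusStub status = it.previous();
      if (status == ExecutionStatusStub.RUNNING) {
        return status;
      }
      if (fallback == null && !SKIPPED.contains(status)) {
        fallback = status;
      }
    }
    return fallback == null ? ExecutionStatusStub.$UNKNOWN : fallback;
  }

  public static void main(String[] args) {
    List<ExecutionStatusStub> statuses = List.of(
        ExecutionStatusStub.COMPLETE, ExecutionStatusStub.PENDING, ExecutionStatusStub.$UNKNOWN);
    System.out.println(flowStatusOf(statuses)); // prints PENDING
  }
}
```

Each rule is stated once and independently of the others, which makes the precedence readable without tracing an in-place accumulator.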
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -212,4 +224,60 @@ public static boolean isFlowStatus(org.apache.gobblin.service.monitoring.JobStat
return jobStatus.getJobName() != null && jobStatus.getJobGroup() != null
&& jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
}
+
+ public ExecutionStatus getFlowStatusFromJobStatuses(Iterator<JobStatus> jobStatusIterator) {
Review comment:
may not need to be an instance method. right now it only depends on
`dagManagerEnabled`, but that state doesn't change, and could in any case be
passed as a param (e.g. to a static method). even clearer, perhaps, would be to
cast the flow status analysis/extraction in the Strategy Pattern.
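A sketch of the Strategy Pattern suggestion: one interface, one implementation per source of truth, with the DAG-manager flag consulted once when the strategy is chosen. Everything here is hypothetical: `JobStatusStub` stands in for `JobStatus`, `NA_KEY` is assumed to be `"NA"`, and the job-level precedence (failure, then cancellation, then running, else the last status) is a simplification, not the rule implemented in the PR.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the Strategy Pattern suggestion; not Gobblin code.
class FlowStatusStrategySketch {
  // Hypothetical subset of Gobblin's ExecutionStatus values.
  enum ExecutionStatusStub { $UNKNOWN, PENDING, RUNNING, COMPLETE, FAILED, CANCELLED }

  // Assumed value of the marker that identifies flow-level entries.
  static final String NA_KEY = "NA";

  // Hypothetical stand-in for JobStatus.
  static final class JobStatusStub {
    final String jobName;
    final String jobGroup;
    final ExecutionStatusStub status;

    JobStatusStub(String jobName, String jobGroup, ExecutionStatusStub status) {
      this.jobName = jobName;
      this.jobGroup = jobGroup;
      this.status = status;
    }

    // Same shape as JobStatusRetriever.isFlowStatus.
    boolean isFlowStatus() {
      return NA_KEY.equals(jobName) && NA_KEY.equals(jobGroup);
    }
  }

  interface FlowStatusStrategy {
    ExecutionStatusStub flowStatus(List<JobStatusStub> statuses);
  }

  // DAG manager enabled: trust the last flow-level event, if there is one.
  static final FlowStatusStrategy FROM_FLOW_EVENT = statuses -> statuses.stream()
      .filter(JobStatusStub::isFlowStatus)
      .map(s -> s.status)
      .reduce((earlier, later) -> later)
      .orElse(ExecutionStatusStub.$UNKNOWN);

  // DAG manager disabled: derive the flow status from job-level events only,
  // using a simplified precedence (FAILED, CANCELLED, RUNNING, else the last one).
  static final FlowStatusStrategy FROM_JOB_EVENTS = statuses -> {
    List<ExecutionStatusStub> jobs = statuses.stream()
        .filter(s -> !s.isFlowStatus())
        .map(s -> s.status)
        .collect(Collectors.toList());
    if (jobs.contains(ExecutionStatusStub.FAILED)) {
      return ExecutionStatusStub.FAILED;
    }
    if (jobs.contains(ExecutionStatusStub.CANCELLED)) {
      return ExecutionStatusStub.CANCELLED;
    }
    if (jobs.contains(ExecutionStatusStub.RUNNING)) {
      return ExecutionStatusStub.RUNNING;
    }
    return jobs.isEmpty() ? ExecutionStatusStub.$UNKNOWN : jobs.get(jobs.size() - 1);
  };

  // The flag is consulted once, when the strategy is chosen.
  static FlowStatusStrategy forDagManager(boolean dagManagerEnabled) {
    return dagManagerEnabled ? FROM_FLOW_EVENT : FROM_JOB_EVENTS;
  }

  public static void main(String[] args) {
    // One shared fixture with both job-level and flow-level entries.
    List<JobStatusStub> fixture = List.of(
        new JobStatusStub("job1", "group", ExecutionStatusStub.COMPLETE),
        new JobStatusStub("job2", "group", ExecutionStatusStub.FAILED),
        new JobStatusStub(NA_KEY, NA_KEY, ExecutionStatusStub.FAILED));
    System.out.println(forDagManager(true).flowStatus(fixture));  // prints FAILED
    System.out.println(forDagManager(false).flowStatus(fixture)); // prints FAILED
  }
}
```

Because both strategies read the same list, a test can build one fixture and assert that the two agree, which is one way to act on the earlier question about initializing the status once in common. With no flow-level entries, `FROM_FLOW_EVENT` falls back to `$UNKNOWN`, which is the symptom described in the issue below.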
Issue Time Tracking
-------------------
Worklog Id: (was: 659903)
Remaining Estimate: 0h
Time Spent: 10m
> fix flow status reporting when dag manager is not enabled
> ---------------------------------------------------------
>
> Key: GOBBLIN-1552
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1552
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Flow status is determined by looking at the flow-level events, but flow-level
> events are not emitted outside of the DAG manager, so flow status is currently
> not determined correctly when the DAG manager is disabled.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)