phet commented on code in PR #3965:
URL: https://github.com/apache/gobblin/pull/3965#discussion_r1631514247
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -89,11 +86,29 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
       return;
     }
+    if (dagNodeWithJobStatus.getRight().isPresent()
+        && !FlowStatusGenerator.FINISHED_STATUSES.contains(dagNodeWithJobStatus.getRight().get().getEventName())) {
+      // this may happen if adding job status in the store failed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
+      throw new RuntimeException(String.format("Job status for dagNode %s is %s. Re-evaluate dag action are created for"
+          + " new jobs with no job status when there are multiple of them to run next; or when a job finishes with status - %s",
+          dagNodeId, dagNodeWithJobStatus.getRight().get().getEventName(), FlowStatusGenerator.FINISHED_STATUSES));
+    }
+
     Dag.DagNode<JobExecutionPlan> dagNode = dagNodeWithJobStatus.getLeft().get();
+
+    if (!dagNodeWithJobStatus.getRight().isPresent()) {
+      // if the job status is not present, this job was never launched, submit it now
+      submitJobForThisDagNode(dagManagementStateStore, dagNode);
+      return;
+    }
Review Comment:
it may be too subtle (and even potentially mask errors) to designate that a
REEVALUATE DagAction with no job status actually happens to encode the need to
launch the job.
direct recursive handling would merely decompose multi-job LAUNCH into
multiple LAUNCH DagActions and multi-job REEVALUATE into multiple REEVALUATE
DagActions. why not transparently handle the multi-job cases in this way,
rather than pressing REEVALUATE into service for multi-LAUNCH?
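
to make the suggestion concrete, here is a minimal sketch of the decomposition, assuming simplified stand-in types. `ActionType`, `JobAction`, and `decompose` are hypothetical names for illustration only, not Gobblin's actual `DagAction` classes or store API. the point is just that fanning out a multi-job action keeps the original action type, so a LAUNCH never has to be expressed as a REEVALUATE with a missing job status:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DagActionDecomposition {
  // Hypothetical stand-in for the dag action type; not Gobblin's real enum.
  enum ActionType { LAUNCH, REEVALUATE }

  // Hypothetical per-job action: one action type applied to one job.
  static final class JobAction {
    final ActionType type;
    final String jobName;

    JobAction(ActionType type, String jobName) {
      this.type = type;
      this.jobName = jobName;
    }

    @Override
    public String toString() {
      return type + ":" + jobName;
    }
  }

  // Fans a multi-job action out into one action per job, preserving the type:
  // multi-job LAUNCH -> several LAUNCH actions,
  // multi-job REEVALUATE -> several REEVALUATE actions.
  static List<JobAction> decompose(ActionType type, List<String> jobNames) {
    List<JobAction> actions = new ArrayList<>();
    for (String jobName : jobNames) {
      actions.add(new JobAction(type, jobName));
    }
    return actions;
  }

  public static void main(String[] args) {
    // Two jobs become ready at once: each gets its own LAUNCH action.
    System.out.println(decompose(ActionType.LAUNCH, Arrays.asList("jobA", "jobB")));
  }
}
```

with this shape, a REEVALUATE that arrives with no job status can stay an error condition instead of doubling as an implicit launch request.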