phet commented on code in PR #3965:
URL: https://github.com/apache/gobblin/pull/3965#discussion_r1631510375
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -60,21 +59,19 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<JobStatus>> ini
Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeWithJobStatus =
dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
- if (!dagNodeWithJobStatus.getLeft().isPresent() ||
!dagNodeWithJobStatus.getRight().isPresent()) {
+ if (!dagNodeWithJobStatus.getLeft().isPresent()) {
// this is possible when MALA malfunctions and a duplicated reevaluate
dag proc is launched for a dag node that is
// already "reevaluated" and cleaned up.
return ImmutablePair.of(Optional.empty(), Optional.empty());
}
- ExecutionStatus executionStatus =
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
- if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
- log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should
have been created only for finished status - {}",
- dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
- // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
- throw new RuntimeException(String.format("Job status %s is not final for
job %s", executionStatus, getDagId()));
+ if (dagNodeWithJobStatus.getRight().isPresent()) {
+ ExecutionStatus executionStatus =
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
+ if
(FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
+ setStatus(dagManagementStateStore,
dagNodeWithJobStatus.getLeft().get(), executionStatus);
Review Comment:
seems better to perform this in `act`, so it's not spread out between two
methods
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -66,7 +66,7 @@
* the flow action using {@link DagActionReminderScheduler} to reattempt the
lease after the current leaseholder's grant
* would have expired. The {@link DagActionReminderScheduler} is used in the
non multi-active execution configuration as
* well to utilize reminders for a single {@link DagManagementTaskStreamImpl}
case as well.
- * Note that if multi-active execution is NOT enabled, then all flow action
events are selected by
+ * Note that if multi-active execution is NOT enabsled, then all flow action
events are selected by
Review Comment:
misspelling
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -60,21 +59,19 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<JobStatus>> ini
Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeWithJobStatus =
dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
- if (!dagNodeWithJobStatus.getLeft().isPresent() ||
!dagNodeWithJobStatus.getRight().isPresent()) {
+ if (!dagNodeWithJobStatus.getLeft().isPresent()) {
// this is possible when MALA malfunctions and a duplicated reevaluate
dag proc is launched for a dag node that is
// already "reevaluated" and cleaned up.
return ImmutablePair.of(Optional.empty(), Optional.empty());
}
- ExecutionStatus executionStatus =
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
- if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
- log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should
have been created only for finished status - {}",
- dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
- // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
- throw new RuntimeException(String.format("Job status %s is not final for
job %s", executionStatus, getDagId()));
+ if (dagNodeWithJobStatus.getRight().isPresent()) {
Review Comment:
somewhere let's add a comment describing circumstance of a dag node w/o job
status. where should that live?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -206,4 +207,84 @@ public void testNoNextJobToRun() throws Exception {
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
+
+ @Test
+ public void testCurrentJobToRun() throws Exception {
+ String flowName = "fn3";
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 2, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ );
+ List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
+ List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
+ try {
+ return DagManagerUtils.getSpecProducer(n);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(new ImmutablePair<>(Optional.of(startDagNodes.get(0)),
Optional.empty()))
+ .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+ ReevaluateDagProc
+ reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
+ String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null,
+ dagManagementStateStore));
+ reEvaluateDagProc.process(dagManagementStateStore);
+
+ // only the current job should have run
+ Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+ Mockito.verify(specProducers.get(1), Mockito.never()).addSpec(any());
Review Comment:
being a list, shouldn't you verify that *all* elems after `0` are not called?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -94,29 +93,50 @@ public void launchDag()
flowCompilationValidationHelper);
launchDagProc.process(this.dagManagementStateStore);
- int expectedNumOfSavingDagNodeStates = 1; // = number of start nodes
- Assert.assertEquals(expectedNumOfSavingDagNodeStates,
+ int numOfLaunchedJobs = 1; // = number of start nodes
+ Assert.assertEquals(numOfLaunchedJobs,
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("addDagNodeState")).count());
- Mockito.verify(this.dagManagementStateStore, Mockito.times(1))
+ Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
.addFlowDagAction(any(), any(), any(),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
}
+ @Test
+ public void launchDagWithMultipleParallelJobs() throws IOException,
InterruptedException, URISyntaxException {
Review Comment:
I see validation of spec executors in `ReevaluateDagProcTest`, but not
here... shouldn't this also?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -89,11 +86,29 @@ protected void act(DagManagementStateStore
dagManagementStateStore, Pair<Optiona
return;
}
+ if (dagNodeWithJobStatus.getRight().isPresent()
+ &&
!FlowStatusGenerator.FINISHED_STATUSES.contains(dagNodeWithJobStatus.getRight().get().getEventName()))
{
+ // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
+ throw new RuntimeException(String.format("Job status for dagNode %s is
%s. Re-evaluate dag action are created for"
+ + " new jobs with no job status when there are multiple of them
to run next; or when a job finishes with status - %s",
+ dagNodeId, dagNodeWithJobStatus.getRight().get().getEventName(),
FlowStatusGenerator.FINISHED_STATUSES));
+ }
+
Dag.DagNode<JobExecutionPlan> dagNode =
dagNodeWithJobStatus.getLeft().get();
+
+ if (!dagNodeWithJobStatus.getRight().isPresent()) {
+ // if the job status is not present, this job was never launched, submit
it now
+ submitJobForThisDagNode(dagManagementStateStore, dagNode);
+ return;
+ }
Review Comment:
it may be too subtle (and even potentially mask errors) to designate that a
REEVALUATE DagAction with no job status actually means to launch the job.
directly recursive handling would decompose multi-job LAUNCH into multiple
LAUNCH DagActions and multi-job REEVALUATE into multiple REEVALUATE DagActions.
why not transparently handle the multi-job cases in this way?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -206,4 +207,84 @@ public void testNoNextJobToRun() throws Exception {
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
+
+ @Test
+ public void testCurrentJobToRun() throws Exception {
+ String flowName = "fn3";
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 2, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ );
+ List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
+ List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
+ try {
+ return DagManagerUtils.getSpecProducer(n);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
Review Comment:
nit: abstract this -
```
List<SpecProducer<Spec>> getDagSpecProducers(Dag<JEP> dag)
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -55,6 +56,36 @@
*/
@Slf4j
public class DagProcUtils {
+
+ /**
+ * If there is a single job to run next, it runs it. If there are multiple
jobs to run, it creates a
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE}
dag action for
+ * each of them and those jobs will be launched in respective Reevaluate dag
proc.
+ */
+ public static void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+ DagManager.DagId dagId) throws IOException {
+ Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
+
+ if (nextNodes.size() > 1) {
+ handleMultipleJobs(dagManagementStateStore, nextNodes);
+ }
+
+ //Submit jobs from the dag ready for execution.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+ DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
dagId);
+ log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), dagId);
+ }
Review Comment:
I'm confused... does the multi-job case both add a DagAction and also submit
each job to an executor? I didn't expect submitting to the executor until the
(deferred) DagAction is handled later.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -134,6 +154,7 @@ private void setStatus(DagManagementStateStore
dagManagementStateStore,
for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
if (node.getValue().getId().equals(dagNodeId)) {
node.getValue().setExecutionStatus(executionStatus);
+ dagManagementStateStore.addDagNodeState(node, getDagId());
Review Comment:
was this merely omitted previously or is there only now a need for it, due
to asking `ReevaluateDagProc` to do double-duty of launching remaining
multi-jobs in addition to regular reevaluation of completed jobs?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]