phet commented on code in PR #4049:
URL: https://github.com/apache/gobblin/pull/4049#discussion_r1750982277
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -183,12 +182,34 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
}
}
- public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
+ /**
+ * Emits JOB_SKIPPED GTE for each of the dependent job.
+ */
+ public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) {
+ Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>();
+ findDependentJobs(dag, node, dependentJobs);
+ for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) {
+ Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue());
+ DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
+ }
+ }
+
+ private static void findDependentJobs(Dag<JobExecutionPlan> dag,
+ Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> dependentJobs) {
Review Comment:
nit: rename `dependentJobs` to `result`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -202,7 +223,7 @@ private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNo
* Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of the provided
* flow event type.
*/
- public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+ public static void setAndEmitFlowEvent(Dag<JobExecutionPlan> dag, String flowEvent) {
Review Comment:
I recall we discussed avoiding static members earlier, even though we eventually moved forward w/ `DagProc.eventSubmitter`.
nonetheless, let's avoid hard-coding a dependency on it from other modules. if the caller of this method wants to pass in `DagProc.eventSubmitter`, that's different from this method forcing that to be the one and only `EventSubmitter`
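
A minimal sketch of the parameter-passing shape this comment asks for. `EventSink`, `FlowDag`, `FlowEventUtils`, and `RecordingSink` are hypothetical stand-ins invented for illustration; they are not Gobblin's `EventSubmitter` or `Dag<JobExecutionPlan>` APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an event submitter; NOT Gobblin's EventSubmitter API.
interface EventSink {
  void submit(String eventName);
}

// Hypothetical stand-in for a DAG that only tracks its latest flow event.
final class FlowDag {
  private String flowEvent;

  void setFlowEvent(String flowEvent) {
    this.flowEvent = flowEvent;
  }

  String getFlowEvent() {
    return flowEvent;
  }
}

final class FlowEventUtils {
  private FlowEventUtils() {}

  // The sink is a parameter, so the caller chooses which instance is used;
  // this utility never reaches for a static field of another class.
  static void setAndEmitFlowEvent(EventSink eventSink, FlowDag dag, String flowEvent) {
    dag.setFlowEvent(flowEvent);
    eventSink.submit(flowEvent);
  }
}

// A sink that records what it receives, e.g. for unit tests.
final class RecordingSink implements EventSink {
  final List<String> events = new ArrayList<>();

  @Override
  public void submit(String eventName) {
    events.add(eventName);
  }
}
```

With this shape a caller can still pass a shared static instance if it wants to, while a test can pass a `RecordingSink` and inspect what was emitted.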
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -183,12 +182,34 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
}
}
- public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
+ /**
+ * Emits JOB_SKIPPED GTE for each of the dependent job.
Review Comment:
the dependent job-*s*
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -159,9 +164,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
dag.setMessage("Flow failed because job " + jobName + " failed");
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
+ DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
case CANCELLED:
+ case SKIPPED:
Review Comment:
is this job-level `SKIPPED`, due to the "ping-pong" I just described? or does it arise from a flow-level execution status of `SKIPPED`? if the latter, who sets that? I thought it would be only job-level
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -183,12 +182,34 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
}
}
- public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
+ /**
+ * Emits JOB_SKIPPED GTE for each of the dependent job.
+ */
+ public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) {
+ Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>();
+ findDependentJobs(dag, node, dependentJobs);
+ for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) {
+ Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue());
+ DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
Review Comment:
same comment about hard-coding to this `static`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java:
##########
@@ -164,7 +164,8 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag)
DagNode<JobExecutionPlan> node = nodesToExpand.poll();
ExecutionStatus executionStatus = getExecutionStatus(node);
boolean addFlag = true;
- if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) {
+ if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME ||
+ executionStatus == SKIPPED) {
Review Comment:
I'm unclear here: is "skipping" able to be reversed, so the node can later become ready? (I'm equating `getNext` to identifying the set of "ready" nodes.)
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl:
##########
@@ -49,4 +49,9 @@ enum ExecutionStatus {
* Flow cancelled.
*/
CANCELLED
+
+ /**
+ * Flow or job is skipped
Review Comment:
how would a flow be skipped? wouldn't the flow instead be CANCELLED? then
(fewer than all of) that flow's jobs may be SKIPPED
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -159,9 +164,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
dag.setMessage("Flow failed because job " + jobName + " failed");
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
+ DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
Review Comment:
wondering... is this a kind of 'ping-pong'?
a. a job fails, which emits a GTE
b. the KJSM (`KafkaJobStatusMonitor`) sees the GTE and then creates a `DagActionType.REEVALUATE`
c. this `ReevaluateDagProc` emits a SKIPPED GTE for all dependent jobs
d. the KJSM sees those GTEs and creates a `DagActionType.REEVALUATE` for each of those

I'm wondering whether step d.) is necessary, given that setting SKIPPED should be a bulk operation on ALL dependent jobs. does the KJSM really need to create a `DagAction` for reevaluating those?
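
To make the "bulk operation" idea concrete, here is a rough sketch that collects every transitive dependent of a failed job in one traversal and marks them all in a single pass. It assumes a DAG held as a plain child-adjacency map keyed by job name; `SkipDependents`, its methods, and the string statuses are hypothetical and do not reflect Gobblin's `Dag` or `DagNode` classes.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SkipDependents {
  private SkipDependents() {}

  // Breadth-first walk over child edges. Returns every job reachable from
  // failedJob; in an acyclic graph that never includes failedJob itself.
  static Set<String> findDependentJobs(Map<String, List<String>> children, String failedJob) {
    Set<String> result = new LinkedHashSet<>();
    Deque<String> toVisit = new ArrayDeque<>();
    toVisit.add(failedJob);
    while (!toVisit.isEmpty()) {
      String job = toVisit.poll();
      for (String child : children.getOrDefault(job, Collections.emptyList())) {
        // add() returns false for already-seen jobs, so shared descendants
        // (diamond shapes) are expanded only once.
        if (result.add(child)) {
          toVisit.add(child);
        }
      }
    }
    return result;
  }

  // Marks all dependents in one pass, with no per-job follow-up action.
  static void markSkipped(Map<String, String> statusByJob, Set<String> dependents) {
    for (String job : dependents) {
      statusByJob.put(job, "SKIPPED");
    }
  }
}
```

Because the whole set is known after one traversal, nothing in this sketch requires a further reevaluation per skipped job. Whether the real state store can be updated this way is a separate question.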
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -116,7 +116,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
- ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
+ ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE,
Review Comment:
why is CANCELLED last, and SKIPPED prior to COMPLETE? what of the similar
idea that news of job COMPLETE might arrive after we'd already attempted
cancellation or skipping?
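
A small sketch of why the position matters, under the assumption that the ordered list is used to ignore a newly arrived status that ranks earlier than the one already recorded. That rule is an assumption for illustration and is not confirmed by this diff; `StatusOrdering`, `resolve`, and the abbreviated four-entry list are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class StatusOrdering {
  private StatusOrdering() {}

  // Abbreviated, illustrative ordering only. The full list in
  // KafkaJobStatusMonitor is longer and is not reproduced here.
  static final List<String> ORDERED =
      Arrays.asList("RUNNING", "SKIPPED", "COMPLETE", "CANCELLED");

  // Assumed rule: an incoming status replaces the current one only when it
  // does not rank earlier in ORDERED. Statuses missing from the list get
  // index -1 and therefore rank before everything else.
  static String resolve(String current, String incoming) {
    return ORDERED.indexOf(incoming) >= ORDERED.indexOf(current) ? incoming : current;
  }
}
```

Under that assumed rule, `resolve("SKIPPED", "COMPLETE")` yields `COMPLETE`, so a late COMPLETE overrides an earlier skip, while `resolve("CANCELLED", "COMPLETE")` stays `CANCELLED`. The two cases end up being treated differently.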