[
https://issues.apache.org/jira/browse/GOBBLIN-2132?focusedWorklogId=930321&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-930321
]
ASF GitHub Bot logged work on GOBBLIN-2132:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Aug/24 05:42
Start Date: 15/Aug/24 05:42
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4026:
URL: https://github.com/apache/gobblin/pull/4026#discussion_r1717966511
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -186,7 +185,8 @@ protected void processMessage(DecodeableKafkaRecord message) {
return;
}
} catch (Exception e) {
- log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: {}", e);
+ log.warn("Ran into unexpected error processing specUri {} changes. Reexamine scheduler. "
Review Comment:
makes sense...
nit: no need for "Ran into "
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -171,6 +172,8 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
// into inconsistent state.
if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
sendCancellationEvent(dagNodeToCancel, props);
+ log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(),
+ props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
} else {
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getJobSpec().getUri());
Review Comment:
NBD, but on success we log `.getId()` and on failure
`.getJobSpec().getUri()` - could it be consistent?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -26,18 +26,26 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
/**
- * An implementation for {@link DagProc} that launches a new job.
+ * An implementation for {@link DagProc} that launches the start job of a flow.
+ * If there are multiple start jobs for the flow, {@link ReevaluateDagProc} is created for each of them and that
+ * launches those start jobs.
+ * In a life cycle of a flow, {@link LaunchDagProc} runs only one time, unless it fails and is
+ * retried by the retry-reminders. {@link ReevaluateDagProc} runs multiple times depending upon the number of jobs and
+ * number of parallel jobs.
Review Comment:
suggest for final sentence:
> Post-launch, a subsequent {@link ReevaluateDagProc} runs after each job of the DAG completes. This may itself launch further jobs or conclude execution of the overall DAG/flow.
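To illustrate the lifecycle that sentence describes, here is a toy model, not Gobblin code. Every name in it (`DagLifecycleSketch`, `addJob`, `launch`, `reevaluate`, `isFinished`) is hypothetical, and it assumes a flow with a single start job, so it does not cover the multiple-start-job case mentioned in the javadoc:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model only: none of these names are Gobblin APIs. */
public class DagLifecycleSketch {
    // job name -> names of the jobs it depends on
    private final Map<String, List<String>> parents = new LinkedHashMap<>();
    private final Set<String> launched = new LinkedHashSet<>();
    private final Set<String> completed = new LinkedHashSet<>();

    public DagLifecycleSketch addJob(String job, String... dependsOn) {
        parents.put(job, Arrays.asList(dependsOn));
        return this;
    }

    /** Launch step (hypothetical): runs once and starts the jobs that have no parents. */
    public List<String> launch() {
        return launchReadyJobs();
    }

    /** Reevaluate step (hypothetical): runs after each job completes and starts newly unblocked jobs. */
    public List<String> reevaluate(String finishedJob) {
        completed.add(finishedJob);
        return launchReadyJobs();
    }

    /** The flow concludes once every job in the DAG has completed. */
    public boolean isFinished() {
        return completed.containsAll(parents.keySet());
    }

    // Starts every not-yet-launched job whose parents have all completed.
    private List<String> launchReadyJobs() {
        List<String> started = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : parents.entrySet()) {
            if (!launched.contains(entry.getKey()) && completed.containsAll(entry.getValue())) {
                launched.add(entry.getKey());
                started.add(entry.getKey());
            }
        }
        return started;
    }

    public static void main(String[] args) {
        DagLifecycleSketch dag = new DagLifecycleSketch()
            .addJob("extract")
            .addJob("transform", "extract")
            .addJob("load", "transform");
        System.out.println(dag.launch());                // the single start job
        System.out.println(dag.reevaluate("extract"));   // launches a further job
        System.out.println(dag.reevaluate("transform"));
        System.out.println(dag.reevaluate("load"));      // nothing left to launch
        System.out.println(dag.isFinished());
    }
}
```

In this model the launch step runs once, while the reevaluate step runs once per completed job and either starts more jobs or leaves the flow finished.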
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
jobExecutionPlan.setExecutionStatus(CANCELLED);
}
+ public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+ if (!dag.isEmpty()) {
+ // Every dag node will contain the same flow metadata
+ Config config = DagManagerUtils.getDagJobConfig(dag);
+ Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(config);
+ dag.setFlowEvent(flowEvent);
Review Comment:
do we set this, but don't actually persist it in the store?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
jobExecutionPlan.setExecutionStatus(CANCELLED);
}
+ public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
Review Comment:
needs javadoc
Issue Time Tracking
-------------------
Worklog Id: (was: 930321)
Time Spent: 1h 10m (was: 1h)
> set and emit flow running status
> --------------------------------
>
> Key: GOBBLIN-2132
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2132
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>