[ 
https://issues.apache.org/jira/browse/GOBBLIN-2132?focusedWorklogId=930321&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-930321
 ]

ASF GitHub Bot logged work on GOBBLIN-2132:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Aug/24 05:42
            Start Date: 15/Aug/24 05:42
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4026:
URL: https://github.com/apache/gobblin/pull/4026#discussion_r1717966511


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -186,7 +185,8 @@ protected void processMessage(DecodeableKafkaRecord message) {
         return;
       }
     } catch (Exception e) {
-      log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: {}", e);
+      log.warn("Ran into unexpected error processing specUri {} changes. Reexamine scheduler. "

Review Comment:
   makes sense...
   
   nit: no need for "Ran into "



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -171,6 +172,8 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
       // into inconsistent state.
       if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
         sendCancellationEvent(dagNodeToCancel, props);
+        log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(),
+            props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
       } else {
         log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getJobSpec().getUri());

Review Comment:
   NBD, but on success we log `.getId()` and on failure 
`.getJobSpec().getUri()` - could it be consistent?
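
   One way the two branches could share an identifier, as a minimal sketch only. `NodeStub` and `CancelLogSketch` are hypothetical stand-ins invented for illustration, not Gobblin types, and the message strings are returned rather than logged so the example stays self-contained:

```java
import java.util.Optional;

// Hypothetical stand-in for Dag.DagNode<JobExecutionPlan>; NOT the Gobblin type.
final class NodeStub {
  final String id;
  final String jobSpecUri;
  final Optional<String> jobFuture;

  NodeStub(String id, String jobSpecUri, Optional<String> jobFuture) {
    this.id = id;
    this.jobSpecUri = jobSpecUri;
    this.jobFuture = jobFuture;
  }
}

final class CancelLogSketch {
  // Builds the message for either branch from the same key (the node id),
  // so the success and failure lines can be correlated when searching logs.
  static String cancelMessage(NodeStub node) {
    if (node.jobFuture.isPresent()) {
      return "Cancelled dag node " + node.id;
    }
    return "No job future when cancelling dag node " + node.id;
  }
}
```

   Whether the id or the job spec URI is the better key is a separate choice; the sketch only shows both branches using the same one.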



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -26,18 +26,26 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link DagProc} that launches a new job.
+ * An implementation for {@link DagProc} that launches the start job of a flow.
+ * If there are multiple start jobs for the flow, {@link ReevaluateDagProc} is created for each of them and that
+ * launches those start jobs.
+ * In a life cycle of a flow, {@link LaunchDagProc} runs only one time, unless it fails and is
+ * retried by the retry-reminders. {@link ReevaluateDagProc} runs multiple times depending upon the number of jobs and
+ * number of parallel jobs.

Review Comment:
   suggest for final sentence:
   > Post-launch, a subsequent {@link ReevaluateDagProc} runs after each job of the DAG completes.  This may itself launch further jobs or conclude execution of the overall DAG/flow.
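
   The lifecycle described in that sentence can be pictured with a toy model. This is a sketch under strong assumptions: the flow is a non-empty linear chain of jobs, and `FlowLifecycleSketch` with its `launch`/`reevaluate` methods is hypothetical and does not mirror the actual `LaunchDagProc` or `ReevaluateDagProc` code:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a flow is a non-empty linear chain of job names. Not Gobblin code.
final class FlowLifecycleSketch {
  final List<String> jobs;
  final List<String> trace = new ArrayList<>();
  int nextIndex = 0;
  boolean finished = false;

  FlowLifecycleSketch(List<String> jobs) {
    this.jobs = jobs;
  }

  // Plays the role of the launch step: runs once and starts the first job.
  void launch() {
    trace.add("launch:" + jobs.get(nextIndex++));
  }

  // Plays the role of the reevaluate step: runs after each job completes and
  // either starts the next job or concludes the flow.
  void reevaluate(String completedJob) {
    if (nextIndex < jobs.size()) {
      trace.add("reevaluate(" + completedJob + ")->launch:" + jobs.get(nextIndex++));
    } else {
      finished = true;
      trace.add("reevaluate(" + completedJob + ")->done");
    }
  }
}
```

   For a two-job chain, `launch()` runs once and `reevaluate(...)` runs twice: the first call starts the second job, the second call concludes the flow.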



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
     jobExecutionPlan.setExecutionStatus(CANCELLED);
   }
 
+  public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+    if (!dag.isEmpty()) {
+      // Every dag node will contain the same flow metadata
+      Config config = DagManagerUtils.getDagJobConfig(dag);
+      Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(config);
+      dag.setFlowEvent(flowEvent);

Review Comment:
   do we set this, but don't actually persist it in the store?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
     jobExecutionPlan.setExecutionStatus(CANCELLED);
   }
 
+  public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {

Review Comment:
   needs javadoc
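
   A possible shape for that javadoc, shown on a simplified mock so it compiles on its own. Assumptions: `DagStub` and the `List<String>` sink are hypothetical replacements for `Dag<JobExecutionPlan>` and `EventSubmitter`, the `"FLOW_RUNNING"` value in the usage note is illustrative, and the emit step is inferred from the method name because the quoted hunk stops before it:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Dag<JobExecutionPlan>; the real type differs.
final class DagStub {
  final List<String> nodes = new ArrayList<>();
  String flowEvent;

  boolean isEmpty() {
    return nodes.isEmpty();
  }
}

final class FlowEventSketch {
  /**
   * Records {@code flowEvent} on the given dag and emits it to {@code emitted}.
   * Does nothing when the dag is empty, since there is no node to read
   * flow metadata from.
   *
   * @param emitted   sink standing in for the event submitter (assumption)
   * @param dag       dag whose flow event is updated
   * @param flowEvent name of the flow-level event to set and emit
   */
  static void setAndEmitFlowEvent(List<String> emitted, DagStub dag, String flowEvent) {
    if (!dag.isEmpty()) {
      dag.flowEvent = flowEvent;
      emitted.add(flowEvent);
    }
  }
}
```

   Calling it with an empty `DagStub` leaves both the dag and the sink untouched; with one node and `"FLOW_RUNNING"`, the dag's `flowEvent` is set and the sink receives one entry. The sketch does not address whether the updated dag is persisted.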





Issue Time Tracking
-------------------

    Worklog Id:     (was: 930321)
    Time Spent: 1h 10m  (was: 1h)

> set and emit flow running status
> --------------------------------
>
>                 Key: GOBBLIN-2132
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2132
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
