This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ac5d6eee1a [GOBBLIN-2132] set flow event field in dag and also emit GTE when flow is submitted (#4026)
ac5d6eee1a is described below

commit ac5d6eee1a71bb51fef2f27607a8492abc12cc61
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Aug 15 08:23:13 2024 -0700

    [GOBBLIN-2132] set flow event field in dag and also emit GTE when flow is submitted (#4026)
    
    * set and emit flow status events
    * add logs
    * address review comments
---
 .../modules/orchestration/DagManagerUtils.java     |  2 +-
 .../modules/orchestration/TimingEventUtils.java    |  4 ++--
 .../modules/orchestration/proc/DagProcUtils.java   | 24 +++++++++++++++++++++-
 .../modules/orchestration/proc/KillDagProc.java    |  2 +-
 .../modules/orchestration/proc/LaunchDagProc.java  | 13 +++++++++++-
 .../orchestration/proc/ReevaluateDagProc.java      |  4 ++--
 .../modules/orchestration/proc/ResumeDagProc.java  |  2 +-
 .../service/monitoring/SpecStoreChangeMonitor.java |  6 +++---
 8 files changed, 45 insertions(+), 12 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 7323543f61..45ad84f91a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -316,7 +316,7 @@ public class DagManagerUtils {
     return (int) (flowExecutionId % numThreads);
   }
 
-  static Config getDagJobConfig(Dag<JobExecutionPlan> dag) {
+  public static Config getDagJobConfig(Dag<JobExecutionPlan> dag) {
     // Every dag should have at least one node, and the job configurations are 
cloned among each node
     return dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index cddd9bb84f..746a3a41c0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -17,11 +17,11 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.util.Map;
+import java.util.Optional;
 
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
-import java.util.Optional;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
@@ -36,7 +36,7 @@ public class TimingEventUtils {
     return getFlowMetadata(flowSpec.getConfig());
   }
 
-  static Map<String, String> getFlowMetadata(Config flowConfig) {
+  public static Map<String, String> getFlowMetadata(Config flowConfig) {
     Map<String, String> metadata = Maps.newHashMap();
 
     metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 75836fb6ab..21285bb994 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -34,6 +34,7 @@ import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -171,8 +172,10 @@ public class DagProcUtils {
       // into inconsistent state.
       if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
         sendCancellationEvent(dagNodeToCancel, props);
+        log.info("Cancelled dag node {}, spec_producer_future {}", 
dagNodeToCancel.getValue().getId(),
+            props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
       } else {
-        log.warn("No Job future when canceling DAG node - {}", 
dagNodeToCancel.getValue().getJobSpec().getUri());
+        log.warn("No Job future when canceling DAG node - {}", 
dagNodeToCancel.getValue().getId());
       }
     } catch (Exception e) {
       throw new IOException(e);
@@ -199,6 +202,25 @@ public class DagProcUtils {
     jobExecutionPlan.setExecutionStatus(CANCELLED);
   }
 
+  /**
+   * Sets {@link Dag#flowEvent} and emits a {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent} of the provided
+   * flow event type.
+   */
+  public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, 
Dag<JobExecutionPlan> dag, String flowEvent) {
+    if (!dag.isEmpty()) {
+      // Every dag node will contain the same flow metadata
+      Config config = DagManagerUtils.getDagJobConfig(dag);
+      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(config);
+      dag.setFlowEvent(flowEvent);
+
+      if (dag.getMessage() != null) {
+        flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
+      }
+
+      eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
+    }
+  }
+
   private static void 
sendEnforceJobStartDeadlineDagAction(DagManagementStateStore 
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode)
       throws IOException {
     DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(dagNode.getValue().getFlowGroup(),
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index c2a90dcc3b..abb15ae3a5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -63,8 +63,8 @@ public class KillDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
       return;
     }
 
-    dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
     dag.get().setMessage("Flow killed by request");
+    DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), 
TimingEvent.FlowTimings.FLOW_CANCELLED);
 
     if (this.shouldKillSpecificJob) {
       Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = 
dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 0efef83139..58d8877a2a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -26,10 +26,13 @@ import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -37,7 +40,12 @@ import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link DagProc} that launches a new job.
+ * An implementation for {@link DagProc} that launches the start job of a flow.
+ * If there are multiple start jobs for the flow, {@link ReevaluateDagProc} is 
created for each of them and that
+ * launches those start jobs.
+ * In a life cycle of a flow, {@link LaunchDagProc} runs only one time, unless 
it fails and is
+ * retried by the retry-reminders. Post-launch, a subsequent {@link 
ReevaluateDagProc} runs after each job of the DAG
+ * completes. That then may launch further jobs or conclude execution of the 
overall DAG (flow).
  */
 @Slf4j
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
@@ -77,6 +85,9 @@ public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
       dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
     } else {
       DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), 
getDagId());
+      DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), 
TimingEvent.FlowTimings.FLOW_RUNNING);
+      
dagManagementStateStore.getDagManagerMetrics().conditionallyMarkFlowAsState(DagManagerUtils.getFlowId(dag.get()),
+          DagManager.FlowState.RUNNING);
       
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagTask().getDagAction());
       dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 1b3d10bff3..d55d5a425e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.service.monitoring.JobStatus;
 /**
  * A {@link DagProc} to launch any subsequent (dependent) job(s) once all 
pre-requisite job(s) in the Dag have succeeded.
  * When there are no more jobs to run and no more running, it cleans up the 
Dag.
- * (In future), if there are multiple new jobs to be launched, separate launch 
dag actions are created for each of them.
+ * If there are multiple new jobs to be launched, separate launch dag actions 
are created for each of them.
  */
 @Slf4j
 public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>> {
@@ -112,7 +112,7 @@ public class ReevaluateDagProc extends DagProc<Pair<Optional<Dag.DagNode<JobExec
         dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
       }
       String flowEvent = dag.getFlowEvent();
-      DagManagerUtils.emitFlowEvent(eventSubmitter, dag, flowEvent);
+      DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent);
       if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
         // todo - verify if work from PR#3641 is required
         dagManagementStateStore.deleteDag(getDagId());
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 8326d83f09..bfe67db588 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -71,7 +71,7 @@ public class ResumeDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
     long flowResumeTime = System.currentTimeMillis();
 
     // Set the flow and its failed or cancelled nodes to PENDING_RESUME so 
that the flow will be resumed from the point before it failed
-    DagManagerUtils.emitFlowEvent(eventSubmitter, failedDag.get(), 
TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+    DagProcUtils.setAndEmitFlowEvent(eventSubmitter, failedDag.get(), 
TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
 
     for (Dag.DagNode<JobExecutionPlan> node : failedDag.get().getNodes()) {
       ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 03817cacab..e1a38c51ff 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -154,14 +154,13 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer {
       return;
     }
 
-    spec = (!operation.equals("DELETE")) ? 
this.flowCatalog.getSpecWrapper(specAsUri) : null;
-
     // The monitor should continue to process messages regardless of failures 
with individual messages, instead we use
     // metrics to keep track of failure to process certain SpecStoreChange 
events
     try {
       // Call respective action for the type of change received
       AddSpecResponse response;
       if (operation.equals("INSERT") || operation.equals("UPDATE")) {
+        spec = this.flowCatalog.getSpecWrapper(specAsUri);
         response = scheduler.onAddSpec(spec);
 
         // Null response means the dag failed to compile
@@ -186,7 +185,8 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
         return;
       }
     } catch (Exception e) {
-      log.warn("Ran into unexpected error processing SpecStore changes. 
Reexamine scheduler. Error: {}", e);
+      log.warn("Unexpected error processing specUri {} changes. Reexamine 
scheduler. "
+          + "tid: {} operation: {} delay: {}, Error: ", key, tid, operation, 
produceToConsumeDelayValue, e);
       this.unexpectedErrors.mark();
       return;
     }

Reply via email to