This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 981ec4f310 [GOBBLIN-2079] Separate GaaSJobObservabilityEvents to both 
flow and job level events (#3962)
981ec4f310 is described below

commit 981ec4f31027e69962984f1f63fb41a04f08dab8
Author: William Lo <[email protected]>
AuthorDate: Mon Jul 29 20:28:33 2024 -0400

    [GOBBLIN-2079] Separate GaaSJobObservabilityEvents to both flow and job 
level events (#3962)
    
    * Adding flow level events to GaaSObservability to better differentiate vs 
job level events
---
 .../src/main/avro/GaaSFlowObservabilityEvent.avsc  | 100 +++++++++++++++
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 140 ++++++++++++++++++++-
 .../service/modules/orchestration/DagManager.java  |   2 +
 .../modules/orchestration/proc/DagProcUtils.java   |   2 +
 .../GaaSJobObservabilityEventProducer.java         |  33 ++++-
 .../NoopGaaSJobObservabilityEventProducer.java     |   8 +-
 .../GaaSJobObservabilityProducerTest.java          |  53 +++++++-
 .../MockGaaSJobObservabilityEventProducer.java     |  32 +++--
 8 files changed, 354 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc
new file mode 100644
index 0000000000..36a755bfbd
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc
@@ -0,0 +1,100 @@
+{
+  "type": "record",
+  "name": "GaaSFlowObservabilityEvent",
+  "namespace": "org.apache.gobblin.metrics",
+  "doc": "An event schema for GaaS to emit after a flow is executed.",
+  "fields": [
+    {
+      "name": "eventTimestamp",
+      "type": "long",
+      "doc": "Time at which event was created in milliseconds from Unix Epoch"
+    },
+    {
+      "name": "flowGroup",
+      "type": "string",
+      "doc": "Flow group for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowName",
+      "type": "string",
+      "doc": "Flow name for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowExecutionId",
+      "type": "long",
+      "doc": "Flow execution id for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "lastFlowModificationTimestamp",
+      "type": "long",
+      "doc": "Timestamp in millis since Epoch when the flow config was last 
modified"
+    },
+    {
+      "name": "sourceNode",
+      "type": "string",
+      "doc": "Source node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "destinationNode",
+      "type": "string",
+      "doc": "Destination node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowStatus",
+      "type": {
+        "type": "enum",
+        "name": "FlowStatus",
+        "symbols": [
+          "SUCCEEDED",
+          "COMPILATION_FAILURE",
+          "SUBMISSION_FAILURE",
+          "EXECUTION_FAILURE",
+          "CANCELLED"
+        ],
+        "doc": "Final flow status for the GaaS flow",
+        "compliance": "NONE"
+      }
+    },
+    {
+      "name": "effectiveUserUrn",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "User URN (if applicable) whose identity was used to run the 
underlying Gobblin job e.g. myGroup",
+      "compliance": "NONE"
+    },
+    {
+      "name": "gaasId",
+      "type": [
+        "null",
+        "string"
+      ],
+      "default": null,
+      "doc": "The deployment ID of GaaS that is sending the event (if multiple 
GaaS instances are running)"
+    },
+    {
+      "name": "flowStartTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Start time of the flow - when the dag is initialized, in millis 
since Epoch. Null if the flow was never run",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowEndTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Finish time of the flow in millis since Epoch, null if the flow 
was never run",
+      "compliance": "NONE"
+    }
+  ]
+}
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index e20cfd97fb..0017629487 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -59,6 +59,8 @@ import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.FlowStatus;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
 import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.JobStatus;
@@ -653,7 +655,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
 
     // Only the COMPLETE event should create a GaaSJobObservabilityEvent
-    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
+    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedJobEvents();
     Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
     GaaSJobObservabilityEvent event1 = iterator.next();
     Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
@@ -702,7 +704,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
 
     // Only the COMPLETE event should create a GaaSJobObservabilityEvent
-    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
+    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedJobEvents();
     Assert.assertEquals(emittedEvents.size(), 1);
     Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
     GaaSJobObservabilityEvent event1 = iterator.next();
@@ -713,6 +715,126 @@ public class KafkaAvroJobStatusMonitorTest {
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+  public void testObservabilityEventFlowLevel() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobSucceededEvent(),
+        createFlowSucceededEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    State producerState = new State();
+    
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
 "true");
+    MockGaaSJobObservabilityEventProducer mockEventProducer = new 
MockGaaSJobObservabilityEventProducer(producerState,
+        issueRepository, false);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer, mock(DagManagementStateStore.class));
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", 
"NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
+
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedJobEvents();
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+    // Only the COMPLETE event should create a GaaSFlowObservabilityEvent
+    List<GaaSFlowObservabilityEvent> emittedFlowEvents = 
mockEventProducer.getTestEmittedFlowEvents();
+    Assert.assertEquals(emittedFlowEvents.size(), 1);
+    Iterator<GaaSFlowObservabilityEvent> flowIterator = 
emittedFlowEvents.iterator();
+    GaaSFlowObservabilityEvent flowEvent = flowIterator.next();
+    Assert.assertEquals(flowEvent.getFlowStatus(), FlowStatus.SUCCEEDED);
+    Assert.assertEquals(flowEvent.getFlowName(), this.flowName);
+    Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup);
+
+    jobStatusMonitor.shutDown();
+  }
+
+  @Test (dependsOnMethods = "testObservabilityEventFlowLevel")
+  public void testObservabilityEventFlowFailed() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobFailedEvent(),
+        createFlowFailedEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    State producerState = new State();
+    
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
 "true");
+    MockGaaSJobObservabilityEventProducer mockEventProducer = new 
MockGaaSJobObservabilityEventProducer(producerState,
+        issueRepository, false);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer, mock(DagManagementStateStore.class));
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.FAILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", 
"NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.FAILED.name());
+
+    // Only the FAILED event should create a GaaSJobObservabilityEvent
+    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedJobEvents();
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.EXECUTION_FAILURE);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+    // Only the FAILED event should create a GaaSFlowObservabilityEvent
+    List<GaaSFlowObservabilityEvent> emittedFlowEvents = 
mockEventProducer.getTestEmittedFlowEvents();
+    Assert.assertEquals(emittedFlowEvents.size(), 1);
+    Iterator<GaaSFlowObservabilityEvent> flowIterator = 
emittedFlowEvents.iterator();
+    GaaSFlowObservabilityEvent flowEvent = flowIterator.next();
+    Assert.assertEquals(flowEvent.getFlowStatus(), 
FlowStatus.EXECUTION_FAILURE);
+    Assert.assertEquals(flowEvent.getFlowName(), this.flowName);
+    Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup);
+
+    jobStatusMonitor.shutDown();
+  }
+
   private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor 
jobStatusMonitor, Iterator<DecodeableKafkaRecord<byte[], byte[]>> 
recordIterator,
       String jobGroup, String jobName) throws IOException {
     jobStatusMonitor.processMessage(recordIterator.next());
@@ -761,7 +883,19 @@ public class KafkaAvroJobStatusMonitorTest {
   }
 
   private GobblinTrackingEvent createFlowSucceededEvent() {
-    return createGTE(TimingEvent.FlowTimings.FLOW_SUCCEEDED, 
Maps.newHashMap());
+    GobblinTrackingEvent event = 
createGTE(TimingEvent.FlowTimings.FLOW_SUCCEEDED, Maps.newHashMap());
+    // Remove job name and job group: flow-level events should not carry them, 
so they get populated with "NA"
+    event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+    event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+    return event;
+  }
+
+  private GobblinTrackingEvent createFlowFailedEvent() {
+    GobblinTrackingEvent event = 
createGTE(TimingEvent.FlowTimings.FLOW_FAILED, Maps.newHashMap());
+    // Remove job name and job group: flow-level events should not carry them, 
so they get populated with "NA"
+    event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+    event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+    return event;
   }
 
   private GobblinTrackingEvent createJobFailedEvent() {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index ee80cbb65b..e0992278b9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -77,6 +77,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.SerializationConstants;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -998,6 +999,7 @@ public class DagManager extends AbstractIdleService {
         jobMetadata.put(TimingEvent.METADATA_MESSAGE, 
producer.getExecutionLink(addSpecFuture, specExecutorUri));
         // Add serialized job properties as part of the orchestrated job event 
metadata
         jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobSpec.getConfigAsProperties()));
+        jobMetadata.put(SerializationConstants.FLOW_START_TIME_KEY, 
String.valueOf(dagNode.getValue().getFlowStartTime()));
         jobOrchestrationTimer.stop(jobMetadata);
         log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
         this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 3e0f91c393..b63c15caea 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -46,6 +46,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.SerializationConstants;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -125,6 +126,7 @@ public class DagProcUtils {
       jobMetadata.put(TimingEvent.METADATA_MESSAGE, 
producer.getExecutionLink(addSpecFuture, specExecutorUri));
       // Add serialized job properties as part of the orchestrated job event 
metadata
       jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobSpec.getConfigAsProperties()));
+      jobMetadata.put(SerializationConstants.FLOW_START_TIME_KEY, 
String.valueOf(dagNode.getValue().getFlowStartTime()));
       jobOrchestrationTimer.stop(jobMetadata);
       log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
       
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
index ab72a52f8a..4beafba29e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
@@ -38,6 +38,8 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.DatasetMetric;
+import org.apache.gobblin.metrics.FlowStatus;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
 import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.metrics.Issue;
 import org.apache.gobblin.metrics.IssueSeverity;
@@ -57,6 +59,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
 import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.SerializationConstants;
 import org.apache.gobblin.util.PropertiesUtils;
 
 
@@ -69,6 +72,7 @@ public abstract class GaaSJobObservabilityEventProducer 
implements Closeable {
   public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX = 
"GaaSJobObservabilityEventProducer.";
   public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = 
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
   public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = 
NoopGaaSJobObservabilityEventProducer.class.getName();
+  public static final String EMIT_FLOW_OBSERVABILITY_EVENT = 
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "emitFlowLevelEvent";
   public static final String ISSUES_READ_FAILED_METRIC_NAME =  
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
   public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
   public static final String GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME = 
"jobSucceeded";
@@ -81,12 +85,14 @@ public abstract class GaaSJobObservabilityEventProducer 
implements Closeable {
   protected ObservableLongMeasurement jobStatusMetric;
   protected MultiContextIssueRepository issueRepository;
   protected boolean instrumentationEnabled;
+  protected boolean emitFlowObservabilityEvent;
   ContextAwareMeter getIssuesFailedMeter;
 
   public GaaSJobObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
     this.state = state;
     this.issueRepository = issueRepository;
     this.instrumentationEnabled = instrumentationEnabled;
+    this.emitFlowObservabilityEvent = 
this.state.getPropAsBoolean(EMIT_FLOW_OBSERVABILITY_EVENT, false);
     if (this.instrumentationEnabled) {
       this.metricContext = Instrumented.getMetricContext(state, getClass());
       this.getIssuesFailedMeter = 
this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -123,7 +129,11 @@ public abstract class GaaSJobObservabilityEventProducer 
implements Closeable {
 
   public void emitObservabilityEvent(final State jobState) {
     GaaSJobObservabilityEvent event = createGaaSObservabilityEvent(jobState);
-    sendUnderlyingEvent(event);
+    if 
(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD).equals(JobStatusRetriever.NA_KEY)
 && this.emitFlowObservabilityEvent) {
+      sendFlowLevelEvent(convertJobEventToFlowEvent(event, jobState));
+    } else {
+      sendJobLevelEvent(event);
+    }
     this.eventCollector.add(event);
   }
 
@@ -142,7 +152,9 @@ public abstract class GaaSJobObservabilityEventProducer 
implements Closeable {
    * Emits the GaaSJobObservabilityEvent with the mechanism that the child 
class is built upon e.g. Kafka
    * @param event
    */
-  abstract protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event);
+  abstract protected void sendJobLevelEvent(GaaSJobObservabilityEvent event);
+
+  abstract protected void sendFlowLevelEvent(GaaSFlowObservabilityEvent event);
 
   /**
    * Creates a GaaSJobObservabilityEvent which is derived from a final GaaS 
job pipeline state, which is combination of GTE job states in an ordered fashion
@@ -244,6 +256,23 @@ public abstract class GaaSJobObservabilityEventProducer 
implements Closeable {
         )).collect(Collectors.toList());
   }
 
+  private GaaSFlowObservabilityEvent 
convertJobEventToFlowEvent(GaaSJobObservabilityEvent jobEvent, State jobState) {
+    GaaSFlowObservabilityEvent.Builder builder = 
GaaSFlowObservabilityEvent.newBuilder();
+    builder.setEventTimestamp(jobEvent.getEventTimestamp())
+        .setGaasId(jobEvent.getGaasId())
+        .setFlowName(jobEvent.getFlowName())
+        .setFlowGroup(jobEvent.getFlowGroup())
+        .setFlowExecutionId(jobEvent.getFlowExecutionId())
+        
.setLastFlowModificationTimestamp(jobEvent.getLastFlowModificationTimestamp())
+        .setSourceNode(jobEvent.getSourceNode())
+        .setDestinationNode(jobEvent.getDestinationNode())
+        .setEffectiveUserUrn(jobEvent.getEffectiveUserUrn())
+        .setFlowStatus(FlowStatus.valueOf(jobEvent.getJobStatus().toString()))
+        
.setFlowStartTimestamp(jobState.getPropAsLong(SerializationConstants.FLOW_START_TIME_KEY,
 0))
+        .setFlowEndTimestamp(jobEvent.getJobEndTimestamp());
+    return builder.build();
+  }
+
   @Override
   public void close() throws IOException {
     // producer close will handle by the cache
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
index a0eeb88e17..de0db5a01d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.monitoring;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
 import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 
@@ -33,12 +34,15 @@ public class NoopGaaSJobObservabilityEventProducer extends 
GaaSJobObservabilityE
   }
 
   public NoopGaaSJobObservabilityEventProducer() {
-    super(null, null, false);
+    super(new State(), null, false);
   }
 
   @Override
   public void emitObservabilityEvent(State jobState) {}
 
   @Override
-  protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event) {}
+  protected void sendJobLevelEvent(GaaSJobObservabilityEvent event) {}
+
+  @Override
+  protected void sendFlowLevelEvent(GaaSFlowObservabilityEvent event) {}
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
index 2aaf7ac261..2cbde03039 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
@@ -38,6 +38,8 @@ import io.opentelemetry.sdk.metrics.data.MetricData;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.FlowStatus;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
 import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.metrics.JobStatus;
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -55,6 +57,7 @@ import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.SerializationConstants;
 import org.apache.gobblin.util.PropertiesUtils;
 
 
@@ -107,7 +110,7 @@ public class GaaSJobObservabilityProducerTest {
     jobStatusProps.putAll(gteEventMetadata);
     producer.emitObservabilityEvent(new State(jobStatusProps));
 
-    List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedEvents();
+    List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedJobEvents();
 
     Assert.assertEquals(emittedEvents.size(), 1);
     Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
@@ -171,7 +174,7 @@ public class GaaSJobObservabilityProducerTest {
     jobStatusProps.putAll(gteEventMetadata);
     producer.emitObservabilityEvent(new State(jobStatusProps));
 
-    List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedEvents();
+    List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedJobEvents();
 
     Assert.assertEquals(emittedEvents.size(), 1);
     Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
@@ -195,6 +198,52 @@ public class GaaSJobObservabilityProducerTest {
     serializer.serializeRecord(event);
   }
 
+  @Test
+  public void testCreateGaaSObservabilityFlowEvent() throws Exception {
+    String flowGroup = "testFlowGroup3";
+    String flowName = "testFlowName3";
+    String jobName = JobStatusRetriever.NA_KEY;
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    State producerState = new State();
+    
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
 "true");
+    MockGaaSJobObservabilityEventProducer
+        producer = new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, false);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+    gteEventMetadata.put(SerializationConstants.FLOW_START_TIME_KEY, "1");
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+
+    List<GaaSFlowObservabilityEvent> emittedEvents = 
producer.getTestEmittedFlowEvents();
+
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSFlowObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSFlowObservabilityEvent event = iterator.next();
+    Assert.assertEquals(event.getFlowGroup(), flowGroup);
+    Assert.assertEquals(event.getFlowName(), flowName);
+    Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
+    Assert.assertEquals(event.getFlowStatus(), FlowStatus.SUCCEEDED);
+    Assert.assertNull(event.getEffectiveUserUrn());
+    Assert.assertEquals(event.getFlowStartTimestamp(), Long.valueOf(1));
+
+    AvroSerializer<GaaSFlowObservabilityEvent> serializer = new 
AvroBinarySerializer<>(
+        GaaSFlowObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
+    );
+    serializer.serializeRecord(event);
+  }
+
   @Test
   public void testEnableMetrics() throws Exception {
     String flowGroup = "testFlowGroup2";
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
index 528edc2735..5d251fce08 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
 import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
 import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
@@ -33,7 +34,9 @@ import 
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
  * Tests can use a getter to fetch a read-only version of the events that were 
emitted
  */
 public class MockGaaSJobObservabilityEventProducer extends 
GaaSJobObservabilityEventProducer {
-  private List<GaaSJobObservabilityEvent> emittedEvents = new ArrayList<>();
+  private List<GaaSJobObservabilityEvent> emittedJobEvents = new ArrayList<>();
+
+  private List<GaaSFlowObservabilityEvent> emittedFlowEvents = new 
ArrayList<>();
 
   public MockGaaSJobObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
     super(state, issueRepository, instrumentationEnabled);
@@ -44,19 +47,34 @@ public class MockGaaSJobObservabilityEventProducer extends 
GaaSJobObservabilityE
     return InMemoryOpenTelemetryMetrics.getInstance(state);
   }
   @Override
-  protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event) {
-    emittedEvents.add(event);
+  protected void sendJobLevelEvent(GaaSJobObservabilityEvent event) {
+    emittedJobEvents.add(event);
+  }
+
+  @Override
+  protected void sendFlowLevelEvent(GaaSFlowObservabilityEvent event) {
+    emittedFlowEvents.add(event);
+  }
+
+  /**
+   * Returns the job level events that the mock producer has written
+   * This should only be used as a read-only object for emitted 
GaaSJobObservabilityEvents
+   * @return list of events that would have been emitted
+   */
+  public List<GaaSJobObservabilityEvent> getTestEmittedJobEvents() {
+    return Collections.unmodifiableList(this.emittedJobEvents);
   }
 
   /**
-   * Returns the events that the mock producer has written
-   * This should only be used as a read-only object for emitted 
GaaSObservabilityEvents
+   * Returns the flow level events that the mock producer has written
+   * This should only be used as a read-only object for emitted 
GaaSFlowObservabilityEvents
    * @return list of events that would have been emitted
    */
-  public List<GaaSJobObservabilityEvent> getTestEmittedEvents() {
-    return Collections.unmodifiableList(this.emittedEvents);
+  public List<GaaSFlowObservabilityEvent> getTestEmittedFlowEvents() {
+    return Collections.unmodifiableList(this.emittedFlowEvents);
   }
 
+
   public InMemoryOpenTelemetryMetrics getOpentelemetryMetrics() {
     return (InMemoryOpenTelemetryMetrics) this.opentelemetryMetrics;
   }

Reply via email to