This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8702d9691 [GOBBLIN-1764] Emit observability event (#3623)
8702d9691 is described below

commit 8702d96914a3dd33551c801b80fe119bb5f7b672
Author: William Lo <[email protected]>
AuthorDate: Thu Jan 19 12:09:49 2023 -0800

    [GOBBLIN-1764] Emit observability event (#3623)
    
    * Add schemas for observability events in GaaS
    
    * Address reviews
    
    * Address reviews
    
    * Clean up schema, address review for GaaSObservabilityEvent
    
    * Clarify default types, add failure types
    
    * Fix schema type array
    
    * Modify proxy user field to be user identification
    
    * Address final review
    
    * Rename userIdentity to executionUserUrn
    
    * WIP
    
    * WIP
    
    * WIP
    
    * Create and populate GaaSObservabilityEvent
    
    * Add tests
    
    * cleanup
    
    * Fix bug where state was not fetched properly
    
    * Add additional tests
    
    * Cleanup
    
    * Address review and clean up
    
    * Fix checkstyle
    
    * Fix bug in monitor factory
    
    * Fix checkstyles
    
    * Add javadoc for mock classes
    
    * Address reviews
    
    * Remove unused imports
    
    * fix test compilation for mock producer
    
    * Remove stray newline
---
 .../avro/GaaSObservabilityEventExperimental.avsc   |  16 +-
 gobblin-modules/gobblin-kafka-09/build.gradle      |   1 +
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 186 ++++++++++++++++-----
 .../monitoring/GaaSObservabilityEventProducer.java | 161 ++++++++++++++++++
 .../monitoring/KafkaAvroJobStatusMonitor.java      |   4 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  25 ++-
 .../monitoring/KafkaJobStatusMonitorFactory.java   |  16 +-
 .../NoopGaaSObservabilityEventProducer.java        |  44 +++++
 .../monitoring/GaaSObservabilityProducerTest.java  |  90 ++++++++++
 .../service/monitoring/JobStatusRetrieverTest.java |   3 +-
 .../MockGaaSObservabilityEventProducer.java        |  53 ++++++
 .../monitoring/MysqlJobStatusRetrieverTest.java    |   4 +-
 12 files changed, 547 insertions(+), 56 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
index 47c9be6c9..a9039dd90 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
@@ -53,7 +53,7 @@
           "SUCCEEDED",
           "COMPILATION_FAILURE",
           "SUBMISSION_FAILURE",
-          "EXEUCTION_FAILURE",
+          "EXECUTION_FAILURE",
           "CANCELLED"
         ],
         "doc": "Final job status for this job in the GaaS flow",
@@ -70,14 +70,20 @@
     },
     {
       "name": "jobStartTime",
-      "type": "long",
-      "doc": "Start time of the job in millis since Epoch",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Start time of the job in millis since Epoch, null if the job was 
never run",
       "compliance": "NONE"
     },
     {
       "name": "jobEndTime",
-      "type": "long",
-      "doc": "Finish time of the job in millis since Epoch",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Finish time of the job in millis since Epoch, null if the job 
was never run",
       "compliance": "NONE"
     },
     {
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle 
b/gobblin-modules/gobblin-kafka-09/build.gradle
index e13f4deb2..9dd1604d9 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -50,6 +50,7 @@ dependencies {
   runtimeOnly externalDependency.protobuf
 
   testCompile project(":gobblin-service")
+  testCompile project(path: ":gobblin-service", configuration: "tests")
   testCompile project(":gobblin-modules:gobblin-service-kafka")
   testCompile project(path: ":gobblin-runtime", configuration: "tests")
   testCompile project(path: ":gobblin-metastore", configuration: 
"testFixtures")
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index fc5b87802..6f1069e15 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -58,18 +58,26 @@ import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.JobStatus;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
 import org.apache.gobblin.metrics.kafka.Pusher;
+import 
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.monitoring.GaaSObservabilityEventProducer;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import 
org.apache.gobblin.service.monitoring.MockGaaSObservabilityEventProducer;
+import 
org.apache.gobblin.service.monitoring.NoopGaaSObservabilityEventProducer;
+import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
 import static org.mockito.Mockito.mock;
@@ -131,7 +139,8 @@ public class KafkaAvroJobStatusMonitorTest {
     } catch(InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+      new NoopGaaSObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
@@ -189,7 +198,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -266,7 +275,8 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        new NoopGaaSObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -325,7 +335,8 @@ public class KafkaAvroJobStatusMonitorTest {
     AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle = new 
AtomicBoolean(false);
     Config conf = ConfigFactory.empty().withValue(
         KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." + 
RETRY_MULTIPLIER, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle,
 conf);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle,
 conf,
+        new NoopGaaSObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
@@ -358,6 +369,66 @@ public class KafkaAvroJobStatusMonitorTest {
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1, 4),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(2, 4),
+        createJobStartSLAKilledEvent(),
+        // Verify that kill event will not retry
+        createJobOrchestratedEvent(3, 4),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    // Received kill flow event, should not retry the flow even though there 
is 1 pending attempt left
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
+    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(false));
+
+    jobStatusMonitor.shutDown();
+  }
+
   @Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
   public void testProcessProgressingMessageWhenNoPreviousStatus() throws 
IOException, ReflectiveOperationException {
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic5");
@@ -376,7 +447,8 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        new NoopGaaSObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
@@ -387,62 +459,95 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
   }
 
-  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
-  public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
-    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
+  @Test (dependsOnMethods = 
"testProcessProgressingMessageWhenNoPreviousStatus")
+  public void testJobMonitorCreatesGaaSObservabilityEvent() throws 
IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic6");
 
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
-        createJobOrchestratedEvent(1, 4),
-        createJobSLAKilledEvent(),
-        createJobOrchestratedEvent(2, 4),
-        createJobStartSLAKilledEvent(),
-        // Verify that kill event will not retry
-        createJobOrchestratedEvent(3, 4),
-        createJobCancelledEvent()
+        createJobSucceededEvent()
     ).forEach(event -> {
       context.submitEvent(event);
       kafkaReporter.report();
     });
-
     try {
       Thread.sleep(1000);
-    } catch(InterruptedException ex) {
+    } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty());
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer);
     jobStatusMonitor.buildMetricsContextAndMetrics();
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
-      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
-      this::convertMessageAndMetadataToDecodableKafkaRecord);
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
-    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+    // Only the COMPLETE event should create a GaaSObservabilityEvent
+    List<GaaSObservabilityEventExperimental> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
+    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    //Job orchestrated for retrying
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    jobStatusMonitor.shutDown();
+  }
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+  @Test (dependsOnMethods = "testJobMonitorCreatesGaaSObservabilityEvent")
+  public void testObservabilityEventSingleEmission() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobCancelledEvent(),
+        createJobSucceededEvent() // This event should be ignored
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    //Job orchestrated for retrying
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    // Received kill flow event, should not retry the flow even though there 
is 1 pending attempt left
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
-    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(false));
+
+    // Only the COMPLETE event should create a GaaSObservabilityEvent
+    List<GaaSObservabilityEventExperimental> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.CANCELLED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
 
     jobStatusMonitor.shutDown();
   }
@@ -494,6 +599,10 @@ public class KafkaAvroJobStatusMonitorTest {
     return createGTE(TimingEvent.LauncherTimings.JOB_SUCCEEDED, 
Maps.newHashMap());
   }
 
+  private GobblinTrackingEvent createFlowSucceededEvent() {
+    return createGTE(TimingEvent.FlowTimings.FLOW_SUCCEEDED, 
Maps.newHashMap());
+  }
+
   private GobblinTrackingEvent createJobFailedEvent() {
     return createGTE(TimingEvent.LauncherTimings.JOB_FAILED, 
Maps.newHashMap());
   }
@@ -527,13 +636,14 @@ public class KafkaAvroJobStatusMonitorTest {
     return new GobblinTrackingEvent(timestamp, namespace, eventName, metadata);
   }
 
-  MockKafkaAvroJobStatusMonitor 
createMockKafkaAvroJobStatusMonitor(AtomicBoolean 
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig) throws 
IOException, ReflectiveOperationException {
+  MockKafkaAvroJobStatusMonitor 
createMockKafkaAvroJobStatusMonitor(AtomicBoolean 
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
+      GaaSObservabilityEventProducer eventProducer) throws IOException, 
ReflectiveOperationException {
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
         
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
         .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
         .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"))
         .withFallback(additionalConfig);
-    return new MockKafkaAvroJobStatusMonitor("test",config, 1, 
shouldThrowFakeExceptionInParseJobStatusToggle);
+    return new MockKafkaAvroJobStatusMonitor("test",config, 1, 
shouldThrowFakeExceptionInParseJobStatusToggle, eventProducer);
   }
   /**
    *   Create a dummy event to test if it is filtered out by the consumer.
@@ -589,9 +699,9 @@ public class KafkaAvroJobStatusMonitorTest {
      * @param shouldThrowFakeExceptionInParseJobStatusToggle - pass (and 
retain) to dial whether `parseJobStatus` throws
      */
     public MockKafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads,
-        AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle)
+        AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, 
GaaSObservabilityEventProducer producer)
         throws IOException, ReflectiveOperationException {
-      super(topic, config, numThreads, mock(JobIssueEventHandler.class));
+      super(topic, config, numThreads, mock(JobIssueEventHandler.class), 
producer);
       shouldThrowFakeExceptionInParseJobStatus = 
shouldThrowFakeExceptionInParseJobStatusToggle;
     }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
new file mode 100644
index 000000000..0816613a9
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits 
GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class, we need a sub system like Kakfa, which support 
at least once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = 
"GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = 
NoopGaaSObservabilityEventProducer.class.getName();
+  public static final String ISSUES_READ_FAILED_METRIC_NAME =  
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  boolean instrumentationEnabled;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    this.state = state;
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
+    if (this.instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(state, getClass());
+      this.getIssuesFailedMeter = 
this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ISSUES_READ_FAILED_METRIC_NAME));
+    }
+  }
+
+  public void emitObservabilityEvent(final State jobState) {
+    GaaSObservabilityEventExperimental event = 
createGaaSObservabilityEvent(jobState);
+    sendUnderlyingEvent(event);
+  }
+
+  /**
+   * Emits the GaaSObservabilityEvent with the mechanism that the child class 
is built upon e.g. Kafka
+   * @param event
+   */
+  abstract protected void 
sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+  /**
+   * Creates a GaaSObservabilityEvent which is derived from a final GaaS job 
pipeline state, which is combination of GTE job states in an ordered fashion
+   * @param jobState
+   * @return GaaSObservabilityEvent
+   */
+  private GaaSObservabilityEventExperimental 
createGaaSObservabilityEvent(final State jobState) {
+    Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? 
jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+    Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? 
jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+    GaaSObservabilityEventExperimental.Builder builder = 
GaaSObservabilityEventExperimental.newBuilder();
+    List<Issue> issueList = null;
+    try {
+      issueList = getIssuesForJob(issueRepository, jobState);
+    } catch (Exception e) {
+      // If issues cannot be fetched, increment metric but continue to try to 
emit the event
+      log.error("Could not fetch issues while creating GaaSObservabilityEvent 
due to ", e);
+      if (this.instrumentationEnabled) {
+        this.getIssuesFailedMeter.mark();
+      }
+    }
+    JobStatus status = convertExecutionStatusTojobState(jobState, 
ExecutionStatus.valueOf(jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD)));
+    builder.setTimestamp(System.currentTimeMillis())
+        
.setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
+        
.setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+        
.setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+        
.setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
+        .setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
+        .setJobStartTime(jobStartTime)
+        .setJobEndTime(jobEndTime)
+        .setIssues(issueList)
+        .setJobStatus(status)
+        // TODO: Populate the below fields in a separate PR
+        .setExecutionUserUrn(null)
+        .setExecutorId("")
+        .setLastFlowModificationTime(0)
+        .setFlowGraphEdgeId("")
+        .setJobOrchestratedTime(null); // TODO: Investigate why 
TimingEvent.JOB_ORCHESTRATED_TIME is never propagated to the JobStatus
+    return builder.build();
+  }
+
+  private static JobStatus convertExecutionStatusTojobState(State state, 
ExecutionStatus executionStatus) {
+    switch (executionStatus) {
+      case FAILED:
+        // TODO: Separate failure cases to SUBMISSION FAILURE and COMPILATION 
FAILURE, investigate events to populate these fields
+        if (state.contains(TimingEvent.JOB_END_TIME)) {
+          return JobStatus.EXECUTION_FAILURE;
+        }
+        return JobStatus.SUBMISSION_FAILURE;
+      case COMPLETE:
+        return JobStatus.SUCCEEDED;
+      case CANCELLED:
+        // TODO: If cancelled due to start SLA exceeded, consider grouping 
this as a submission failure?
+        return JobStatus.CANCELLED;
+      default:
+        return null;
+    }
+  }
+
+  private static List<Issue> getIssuesForJob(MultiContextIssueRepository 
issueRepository, State jobState) throws TroubleshooterException {
+    return 
issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+        issue -> new Issue(
+            issue.getTime().toEpochSecond(),
+            IssueSeverity.valueOf(issue.getSeverity().toString()),
+            issue.getCode(),
+            issue.getSummary(),
+            issue.getDetails(),
+            issue.getProperties()
+        )).collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() throws IOException {
+    // producer close will handle by the cache
+    if (this.instrumentationEnabled) {
+      this.metricContext.close();
+    }
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 1be36f4c8..20328c36d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -65,9 +65,9 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
   private Meter messageParseFailures;
 
   public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
-      JobIssueEventHandler jobIssueEventHandler)
+      JobIssueEventHandler jobIssueEventHandler, 
GaaSObservabilityEventProducer observabilityEventProducer)
       throws IOException, ReflectiveOperationException {
-    super(topic, config, numThreads,  jobIssueEventHandler);
+    super(topic, config, numThreads,  jobIssueEventHandler, 
observabilityEventProducer);
 
     if (ConfigUtils.getBoolean(config, 
ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
       KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new 
KafkaAvroSchemaRegistryFactory().
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 1a19e755e..7d1654af3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -107,10 +107,12 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
           ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
 
   private final JobIssueEventHandler jobIssueEventHandler;
-
   private final Retryer<Void> persistJobStatusRetryer;
+  private final GaaSObservabilityEventProducer eventProducer;
+
 
-  public KafkaJobStatusMonitor(String topic, Config config, int numThreads, 
JobIssueEventHandler jobIssueEventHandler)
+  public KafkaJobStatusMonitor(String topic, Config config, int numThreads, 
JobIssueEventHandler jobIssueEventHandler,
+      GaaSObservabilityEventProducer observabilityEventProducer)
       throws ReflectiveOperationException {
     super(topic, config.withFallback(DEFAULTS), numThreads);
     String stateStoreFactoryClass = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, 
FileContextBasedFsStateStoreFactory.class.getName());
@@ -136,6 +138,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
             }
           }
         }));
+        this.eventProducer = observabilityEventProducer;
   }
 
   @Override
@@ -187,14 +190,14 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         org.apache.gobblin.configuration.State jobStatus = 
parseJobStatus(gobblinTrackingEvent);
         if (jobStatus != null) {
           try (Timer.Context context = 
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
-            addJobStatusToStateStore(jobStatus, this.stateStore);
+            addJobStatusToStateStore(jobStatus, this.stateStore, 
this.eventProducer);
           }
         }
         return null;
       });
     } catch (ExecutionException ee) {
       String msg = String.format("Failed to add job status to state store for 
kafka offset %d", message.getOffset());
-      log.warn(msg, ee.getCause());
+      log.warn(msg, ee);
       // Throw RuntimeException to avoid advancing kafka offsets without 
updating state store
       throw new RuntimeException(msg, ee.getCause());
     } catch (RetryException re) {
@@ -219,7 +222,8 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
    * @throws IOException
    */
   @VisibleForTesting
-  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State 
jobStatus, StateStore stateStore)
+  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State 
jobStatus, StateStore stateStore,
+      GaaSObservabilityEventProducer eventProducer)
       throws IOException {
     try {
       if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
@@ -266,6 +270,9 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
       modifyStateIfRetryRequired(jobStatus);
       stateStore.put(storeName, tableName, jobStatus);
+      if (isNewStateTransitionToFinal(jobStatus, states)) {
+        eventProducer.emitObservabilityEvent(jobStatus);
+      }
     } catch (Exception e) {
       log.warn("Meet exception when adding jobStatus to state store at "
           + e.getStackTrace()[0].getClassName() + "line number: " + 
e.getStackTrace()[0].getLineNumber(), e);
@@ -288,6 +295,14 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  static boolean 
isNewStateTransitionToFinal(org.apache.gobblin.configuration.State 
currentState, List<org.apache.gobblin.configuration.State> prevStates) {
+    if (prevStates.size() == 0) {
+      return 
FlowStatusGenerator.FINISHED_STATUSES.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+    }
+    return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) && 
FlowStatusGenerator.FINISHED_STATUSES.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
+        && 
!FlowStatusGenerator.FINISHED_STATUSES.contains(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+  }
+
   /**
    * Merge states based on precedence defined by {@link 
#ORDERED_EXECUTION_STATUSES}.
    * The state instance in the 1st argument reflects the more recent state of 
a job
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 7f6fc51ad..d4129f1ea 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -31,6 +31,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -45,11 +46,16 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
 
   private final Config config;
   private final JobIssueEventHandler jobIssueEventHandler;
+  private final MultiContextIssueRepository issueRepository;
+  private final boolean instrumentationEnabled;
 
   @Inject
-  public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler 
jobIssueEventHandler) {
+  public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler 
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
+      boolean instrumentationEnabled) {
     this.config = Objects.requireNonNull(config);
     this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
   }
 
   private KafkaJobStatusMonitor createJobStatusMonitor()
@@ -76,8 +82,14 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
           
config.getValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE));
     }
     jobStatusConfig = 
jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
+    Class observabilityEventProducerClassName = 
Class.forName(ConfigUtils.getString(config,
+        
GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY,
+        
GaaSObservabilityEventProducer.DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS));
+    GaaSObservabilityEventProducer observabilityEventProducer = 
(GaaSObservabilityEventProducer) 
GobblinConstructorUtils.invokeLongestConstructor(
+        observabilityEventProducerClassName, 
ConfigUtils.configToState(config), this.issueRepository, 
this.instrumentationEnabled);
+
     return (KafkaJobStatusMonitor) GobblinConstructorUtils
-        .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler);
+        .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
new file mode 100644
index 000000000..f8b23bd62
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+
+
+/**
+ * The default producer for emitting GaaS Observability Events in the 
KafkaJobStatusMonitor
+ * This class does no work and will not create or emit any events
+ */
+public class NoopGaaSObservabilityEventProducer extends 
GaaSObservabilityEventProducer {
+
+  public NoopGaaSObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    super(state, issueRepository, instrumentationEnabled);
+  }
+
+  public NoopGaaSObservabilityEventProducer() {
+    super(null, null, false);
+  }
+
+  @Override
+  public void emitObservabilityEvent(State jobState) {}
+
+  @Override
+  protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event) 
{}
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
new file mode 100644
index 000000000..18371e556
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.time.ZonedDateTime;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import 
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class GaaSObservabilityProducerTest {
+
+  private MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+
+  @Test
+  public void testCreateGaaSObservabilityEvent() throws Exception {
+    String flowGroup = "testFlowGroup1";
+    String flowName = "testFlowName1";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(new State(), this.issueRepository);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+    gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+    gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
+    gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+
+    List<GaaSObservabilityEventExperimental> emittedEvents = 
producer.getTestEmittedEvents();
+
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event = iterator.next();
+    Assert.assertEquals(event.getFlowGroup(), flowGroup);
+    Assert.assertEquals(event.getFlowName(), flowName);
+    Assert.assertEquals(event.getJobName(), jobName);
+    Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
+    Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event.getExecutorUrl(), "hostName");
+    Assert.assertEquals(event.getIssues().size(), 1);
+  }
+
+  private Issue createTestIssue(String summary, String code, IssueSeverity 
severity) {
+    return 
Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index dc17d0466..00b2598c8 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -96,8 +96,7 @@ public abstract class JobStatusRetrieverTest {
       properties.setProperty(TimingEvent.JOB_ORCHESTRATED_TIME, 
String.valueOf(endTime));
     }
     State jobStatus = new State(properties);
-
-    KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore());
+    KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore(), new 
NoopGaaSObservabilityEventProducer());
   }
 
   static Properties createAttemptsProperties(int currGen, int currAttempts, 
boolean shouldRetry) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
new file mode 100644
index 000000000..770276677
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+
+
+/**
+ * An extension of GaaSObservabilityEventProducer which creates the events and 
stores them in a list
+ * Tests can use a getter to fetch a read-only version of the events that were 
emitted
+ */
+public class MockGaaSObservabilityEventProducer extends 
GaaSObservabilityEventProducer {
+  private List<GaaSObservabilityEventExperimental> emittedEvents = new 
ArrayList<>();
+
+  public MockGaaSObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository) {
+    super(state, issueRepository, false);
+  }
+
+  @Override
+  protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event) 
{
+    emittedEvents.add(event);
+  }
+
+  /**
+   * Returns the events that the mock producer has written
+   * This should only be used as a read-only object for emitted 
GaaSObservabilityEvents
+   * @return list of events that would have been emitted
+   */
+  public List<GaaSObservabilityEventExperimental> getTestEmittedEvents() {
+    return Collections.unmodifiableList(this.emittedEvents);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index d226c8322..f26ebf59e 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -136,7 +136,7 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
     properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
     State jobStatus = new State(properties);
 
-    KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore());
+    KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore(), new 
NoopGaaSObservabilityEventProducer());
     Iterator<JobStatus>
         jobStatusIterator = 
this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, 
flowExecutionId);
     Assert.assertTrue(jobStatusIterator.hasNext());
@@ -158,7 +158,7 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
     State jobStatus = new State(properties);
 
     try {
-      KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore());
+      KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore(), new 
NoopGaaSObservabilityEventProducer());
     } catch (IOException e) {
       Assert.assertTrue(e.getCause().getCause().getMessage().contains("Data 
too long"));
       return;

Reply via email to