This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a11e72ac Graduate GaaSObservabilityEventExperimental (#3940)
7a11e72ac is described below

commit 7a11e72ac4a25325f216b0badc693e8a1677282d
Author: William Lo <[email protected]>
AuthorDate: Tue May 7 14:20:43 2024 -0400

    Graduate GaaSObservabilityEventExperimental (#3940)
    
    Update schema and rename event
---
 ...imental.avsc => GaaSJobObservabilityEvent.avsc} | 62 ++++++++++-----
 .../avro/GaaSObservabilityEventExperimental.avsc   |  4 +-
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 50 ++++++------
 .../modules/flowgraph/BaseFlowGraphHelper.java     |  2 +-
 .../service/modules/orchestration/DagManager.java  |  3 +-
 ...java => GaaSJobObservabilityEventProducer.java} | 91 ++++++++++++++--------
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  2 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  4 +-
 .../monitoring/KafkaJobStatusMonitorFactory.java   |  6 +-
 ... => NoopGaaSJobObservabilityEventProducer.java} | 10 +--
 ....java => GaaSJobObservabilityProducerTest.java} | 87 ++++++++++++---------
 ... => MockGaaSJobObservabilityEventProducer.java} | 14 ++--
 12 files changed, 196 insertions(+), 139 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc
similarity index 78%
copy from 
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
copy to 
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc
index 887d322f7..1f3bf0a09 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc
@@ -1,13 +1,13 @@
 {
   "type": "record",
-  "name": "GaaSObservabilityEventExperimental",
+  "name": "GaaSJobObservabilityEvent",
   "namespace": "org.apache.gobblin.metrics",
-  "doc": "An experimental format for GaaS to emit events during and after a 
job is executed.",
+  "doc": "An event schema for GaaS to emit during and after a job is 
executed.",
   "fields": [
     {
-      "name": "timestamp",
+      "name": "eventTimestamp",
       "type": "long",
-      "doc": "Time at which event was created in millis"
+      "doc": "Time at which event was created in milliseconds from Unix Epoch"
     },
     {
       "name": "flowGroup",
@@ -28,14 +28,26 @@
       "compliance": "NONE"
     },
     {
-      "name": "lastFlowModificationTime",
+      "name": "lastFlowModificationTimestamp",
       "type": "long",
       "doc": "Timestamp in millis since Epoch when the flow config was last 
modified"
     },
     {
-      "name": "flowGraphEdgeId",
+      "name": "sourceNode",
       "type": "string",
-      "doc": "Flow edge id, in format <src_node>_<dest_node>_<edge_id>",
+      "doc": "Source node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "destinationNode",
+      "type": "string",
+      "doc": "Destination node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowEdgeId",
+      "type": "string",
+      "doc": "Flow edge id, excluding the sourceNode and destinationNode",
       "compliance": "NONE"
     },
     {
@@ -61,7 +73,7 @@
       }
     },
     {
-      "name": "jobOrchestratedTime",
+      "name": "jobOrchestratedTimestamp",
       "type": [
         "null",
         "long"
@@ -69,7 +81,7 @@
       "doc": "Timestamp when the job was successfully sent to the job 
executor, null if it was unable to be sent."
     },
     {
-      "name": "jobStartTime",
+      "name": "jobStartTimestamp",
       "type": [
         "null",
         "long"
@@ -78,7 +90,7 @@
       "compliance": "NONE"
     },
     {
-      "name": "jobEndTime",
+      "name": "jobEndTimestamp",
       "type": [
         "null",
         "long"
@@ -87,7 +99,7 @@
       "compliance": "NONE"
     },
     {
-      "name": "jobPlanningPhaseStartTime",
+      "name": "jobPlanningStartTimestamp",
       "type": [
         "null",
         "long"
@@ -97,7 +109,7 @@
       "default": null
     },
     {
-      "name": "jobPlanningPhaseEndTime",
+      "name": "jobPlanningEndTimestamp",
       "type": [
         "null",
         "long"
@@ -107,12 +119,12 @@
       "default": null
     },
     {
-      "name": "executionUserUrn",
+      "name": "effectiveUserUrn",
       "type": [
         "null",
         "string"
       ],
-      "doc": "User URN (if applicable) that runs the underlying Gobblin job",
+      "doc": "User URN (if applicable) whose identity was used to run the 
underlying Gobblin job e.g. myGroup",
       "compliance": "NONE"
     },
     {
@@ -127,7 +139,16 @@
     {
       "name": "executorId",
       "type": "string",
-      "doc": "The ID of the spec executor that ran or would have ran the job",
+      "doc": "The ID of the spec executor that ran or would have run the job",
+      "compliance": "NONE"
+    },
+    {
+      "name": "executorUrn",
+      "type":  [
+        "null",
+        "string"
+      ],
+      "doc": "Unique URN of the execution across platforms that describes the 
relevant ids for each job on each platform e.g. 
urn:apache:gobblin:exec:azkaban:92937153:temporal:NestingExecWorkUnits_1714593600113",
       "compliance": "NONE"
     },
     {
@@ -137,7 +158,7 @@
         "string"
       ],
       "default": null,
-      "doc": "The instance of GaaS that is sending the event (if multiple GaaS 
instances are running)"
+      "doc": "The deployment ID of GaaS that is sending the event (if multiple 
GaaS instances are running)"
     },
     {
       "name":"jobProperties",
@@ -162,7 +183,7 @@
               {
                 "name": "timestamp",
                 "type": "long",
-                "doc": "Time when the issue occurred"
+                "doc": "Milliseconds from UNIX epoch when the issue occurred"
               },
               {
                 "name": "severity",
@@ -208,7 +229,7 @@
       ]
     },
     {
-      "name": "datasetsWritten",
+      "name": "datasetsMetrics",
       "type": [
         "null",
         {
@@ -221,7 +242,7 @@
               {
                 "name": "datasetUrn",
                 "type": "string",
-                "doc": "URN of the dataset"
+                "doc": "URN of the dataset, empty string for unnamed datasets"
               },
               {
                 "name": "bytesWritten",
@@ -236,7 +257,7 @@
               {
                 "name": "successfullyCommitted",
                 "type": "boolean",
-                "doc": "Whether the dataset was successfully committed by 
Gobblin and fully successful, useful when users configure pipelines to allow 
for partial failures or non-atomic writes"
+                "doc": "Whether the dataset was successfully committed by 
Gobblin and fully successful, versus when users configure partial failures or 
non-atomic writes"
               }
             ]
           }
@@ -247,3 +268,4 @@
   ]
 }
 
+
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
index 887d322f7..127d90694 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
@@ -2,7 +2,7 @@
   "type": "record",
   "name": "GaaSObservabilityEventExperimental",
   "namespace": "org.apache.gobblin.metrics",
-  "doc": "An experimental format for GaaS to emit events during and after a 
job is executed.",
+  "doc": "The deprecated format for GaaS to emit events during and after a job 
is executed.",
   "fields": [
     {
       "name": "timestamp",
@@ -127,7 +127,7 @@
     {
       "name": "executorId",
       "type": "string",
-      "doc": "The ID of the spec executor that ran or would have ran the job",
+      "doc": "The ID of the spec executor that ran or would have run the job",
       "compliance": "NONE"
     },
     {
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 94980017c..68e208e9c 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -58,7 +58,7 @@ import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.JobStatus;
 import org.apache.gobblin.metrics.MetricContext;
@@ -72,12 +72,12 @@ import 
org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
-import org.apache.gobblin.service.monitoring.GaaSObservabilityEventProducer;
+import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
-import 
org.apache.gobblin.service.monitoring.MockGaaSObservabilityEventProducer;
-import 
org.apache.gobblin.service.monitoring.NoopGaaSObservabilityEventProducer;
+import 
org.apache.gobblin.service.monitoring.MockGaaSJobObservabilityEventProducer;
+import 
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
@@ -143,7 +143,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
-      new NoopGaaSObservabilityEventProducer());
+      new NoopGaaSJobObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
@@ -201,7 +201,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new 
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -279,7 +279,7 @@ public class KafkaAvroJobStatusMonitorTest {
     }
 
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
-        new NoopGaaSObservabilityEventProducer());
+        new 
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -339,7 +339,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Config conf = ConfigFactory.empty().withValue(
         KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." + 
RETRY_MULTIPLIER, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle,
 conf,
-        new NoopGaaSObservabilityEventProducer());
+        new 
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
@@ -397,7 +397,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new 
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
@@ -456,7 +456,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new 
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
@@ -507,7 +507,7 @@ public class KafkaAvroJobStatusMonitorTest {
     }
 
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
-        new NoopGaaSObservabilityEventProducer());
+        new 
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
     jobStatusMonitor.buildMetricsContextAndMetrics();
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
@@ -537,7 +537,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
     MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
-    MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
+    MockGaaSJobObservabilityEventProducer mockEventProducer = new 
MockGaaSJobObservabilityEventProducer(
         ConfigUtils.configToState(ConfigFactory.empty()), issueRepository, 
false);
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
         mockEventProducer);
@@ -553,15 +553,15 @@ public class KafkaAvroJobStatusMonitorTest {
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
 
-    // Only the COMPLETE event should create a GaaSObservabilityEvent
-    List<GaaSObservabilityEventExperimental> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
-    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
-    GaaSObservabilityEventExperimental event1 = iterator.next();
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event1 = iterator.next();
     Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
     Assert.assertEquals(event1.getFlowName(), this.flowName);
     Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
-    Assert.assertEquals(event1.getJobPlanningPhaseStartTime(), 
Long.valueOf(2));
-    Assert.assertEquals(event1.getJobPlanningPhaseEndTime(), Long.valueOf(3));
+    Assert.assertEquals(event1.getJobPlanningStartTimestamp(), 
Long.valueOf(2));
+    Assert.assertEquals(event1.getJobPlanningEndTimestamp(), Long.valueOf(3));
     jobStatusMonitor.shutDown();
   }
 
@@ -584,7 +584,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
     MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
-    MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
+    
org.apache.gobblin.service.monitoring.MockGaaSJobObservabilityEventProducer 
mockEventProducer = new 
org.apache.gobblin.service.monitoring.MockGaaSJobObservabilityEventProducer(
         ConfigUtils.configToState(ConfigFactory.empty()), issueRepository, 
false);
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
         mockEventProducer);
@@ -602,11 +602,11 @@ public class KafkaAvroJobStatusMonitorTest {
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
 
-    // Only the COMPLETE event should create a GaaSObservabilityEvent
-    List<GaaSObservabilityEventExperimental> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
     Assert.assertEquals(emittedEvents.size(), 1);
-    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
-    GaaSObservabilityEventExperimental event1 = iterator.next();
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event1 = iterator.next();
     Assert.assertEquals(event1.getJobStatus(), JobStatus.CANCELLED);
     Assert.assertEquals(event1.getFlowName(), this.flowName);
     Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
@@ -714,7 +714,7 @@ public class KafkaAvroJobStatusMonitorTest {
   }
 
   MockKafkaAvroJobStatusMonitor 
createMockKafkaAvroJobStatusMonitor(AtomicBoolean 
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
-      GaaSObservabilityEventProducer eventProducer) throws IOException, 
ReflectiveOperationException {
+      GaaSJobObservabilityEventProducer eventProducer) throws IOException, 
ReflectiveOperationException {
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
         
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
         .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
@@ -776,7 +776,7 @@ public class KafkaAvroJobStatusMonitorTest {
      * @param shouldThrowFakeExceptionInParseJobStatusToggle - pass (and 
retain) to dial whether `parseJobStatus` throws
      */
     public MockKafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads,
-        AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, 
GaaSObservabilityEventProducer producer)
+        AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, 
GaaSJobObservabilityEventProducer producer)
         throws IOException, ReflectiveOperationException {
       super(topic, config, numThreads, mock(JobIssueEventHandler.class), 
producer, mysqlDagActionStore);
       shouldThrowFakeExceptionInParseJobStatus = 
shouldThrowFakeExceptionInParseJobStatusToggle;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
index 3d33b2f1c..0f435bf53 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
@@ -72,7 +72,7 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 public class BaseFlowGraphHelper {
   private static final int NODE_FILE_DEPTH = 3;
   private static final int EDGE_FILE_DEPTH = 4;
-  private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
+  public static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
 
   final String baseDirectory;
   private final Config emptyConfig = ConfigFactory.empty();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 218fe8128..ce942cd7a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -83,6 +83,7 @@ import 
org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KillFlowEvent;
 import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 import static org.apache.gobblin.service.ExecutionStatus.*;
@@ -1003,7 +1004,7 @@ public class DagManager extends AbstractIdleService {
 
         jobMetadata.put(TimingEvent.METADATA_MESSAGE, 
producer.getExecutionLink(addSpecFuture, specExecutorUri));
         // Add serialized job properties as part of the orchestrated job event 
metadata
-        jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
dagNode.getValue().toString());
+        jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobSpec.getConfigAsProperties()));
         jobOrchestrationTimer.stop(jobMetadata);
         log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
         this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
similarity index 70%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
index 0966e458d..bd5c4eb3d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
@@ -22,8 +22,11 @@ import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.codahale.metrics.MetricRegistry;
 import com.google.gson.reflect.TypeToken;
 
@@ -35,7 +38,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.DatasetMetric;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.metrics.Issue;
 import org.apache.gobblin.metrics.IssueSeverity;
 import org.apache.gobblin.metrics.JobStatus;
@@ -51,34 +54,36 @@ import 
org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
 import org.apache.gobblin.runtime.util.GsonUtils;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
 import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
  * A class embedded within GaaS running in the JobStatusMonitor which emits 
GaaSObservabilityEvents after each job in a flow
- * This is an abstract class, we need a sub system like Kakfa, which support 
at least once delivery, to emit the event
+ * This is an abstract class, we need a sub system like Kafka, which support 
at least once delivery, to emit the event
  */
 @Slf4j
-public abstract class GaaSObservabilityEventProducer implements Closeable {
-  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = 
"GaaSObservabilityEventProducer.";
-  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
-  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = 
NoopGaaSObservabilityEventProducer.class.getName();
-  public static final String ISSUES_READ_FAILED_METRIC_NAME =  
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
-  public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
+public abstract class GaaSJobObservabilityEventProducer implements Closeable {
+  public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX = 
"GaaSJobObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = 
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = 
NoopGaaSJobObservabilityEventProducer.class.getName();
+  public static final String ISSUES_READ_FAILED_METRIC_NAME =  
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+  public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
   public static final String GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME = 
"jobSucceeded";
 
   protected MetricContext metricContext;
   protected State state;
 
-  List<GaaSObservabilityEventExperimental> eventCollector = new ArrayList<>();
+  List<GaaSJobObservabilityEvent> eventCollector = new ArrayList<>();
   protected OpenTelemetryMetricsBase opentelemetryMetrics;
   protected ObservableLongMeasurement jobStatusMetric;
   protected MultiContextIssueRepository issueRepository;
   protected boolean instrumentationEnabled;
   ContextAwareMeter getIssuesFailedMeter;
 
-  public GaaSObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+  public GaaSJobObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
     this.state = state;
     this.issueRepository = issueRepository;
     this.instrumentationEnabled = instrumentationEnabled;
@@ -104,7 +109,7 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
           .buildObserver();
       this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
           .batchCallback(() -> {
-            for (GaaSObservabilityEventExperimental event : 
this.eventCollector) {
+            for (GaaSJobObservabilityEvent event : this.eventCollector) {
               Attributes tags = getEventAttributes(event);
               int status = event.getJobStatus() == JobStatus.SUCCEEDED ? 1 : 0;
               this.jobStatusMetric.record(status, tags);
@@ -117,77 +122,95 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
   }
 
   public void emitObservabilityEvent(final State jobState) {
-    GaaSObservabilityEventExperimental event = 
createGaaSObservabilityEvent(jobState);
+    GaaSJobObservabilityEvent event = createGaaSObservabilityEvent(jobState);
     sendUnderlyingEvent(event);
     this.eventCollector.add(event);
   }
 
-  public Attributes getEventAttributes(GaaSObservabilityEventExperimental 
event) {
+  public Attributes getEventAttributes(GaaSJobObservabilityEvent event) {
     Attributes tags = 
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
event.getFlowName())
         .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
event.getFlowGroup())
         .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName())
         .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
event.getFlowExecutionId())
         .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
event.getExecutorId())
-        .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
event.getFlowGraphEdgeId())
+        .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
event.getFlowEdgeId())
         .build();
     return tags;
   }
 
   /**
-   * Emits the GaaSObservabilityEvent with the mechanism that the child class 
is built upon e.g. Kafka
+   * Emits the GaaSJobObservabilityEvent with the mechanism that the child 
class is built upon e.g. Kafka
    * @param event
    */
-  abstract protected void 
sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+  abstract protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event);
 
   /**
-   * Creates a GaaSObservabilityEvent which is derived from a final GaaS job 
pipeline state, which is combination of GTE job states in an ordered fashion
+   * Creates a GaaSJobObservabilityEvent which is derived from a final GaaS 
job pipeline state, which is combination of GTE job states in an ordered fashion
    * @param jobState
-   * @return GaaSObservabilityEvent
+   * @return GaaSJobObservabilityEvent
    */
-  private GaaSObservabilityEventExperimental 
createGaaSObservabilityEvent(final State jobState) {
+  private GaaSJobObservabilityEvent createGaaSObservabilityEvent(final State 
jobState) {
     Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? 
jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
     Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? 
jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
     Long jobOrchestratedTime = 
jobState.contains(TimingEvent.JOB_ORCHESTRATED_TIME) ? 
jobState.getPropAsLong(TimingEvent.JOB_ORCHESTRATED_TIME) : null;
     Long jobPlanningPhaseStartTime = 
jobState.contains(TimingEvent.WORKUNIT_PLAN_START_TIME) ? 
jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_START_TIME) : null;
     Long jobPlanningPhaseEndTime = 
jobState.contains(TimingEvent.WORKUNIT_PLAN_END_TIME) ? 
jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_END_TIME) : null;
+    String flowGroup = 
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    String flowName = 
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+    Properties jobProperties = new Properties();
+    try {
+      jobProperties = 
PropertiesUtils.deserialize(jobState.getProp(JobExecutionPlan.JOB_PROPS_KEY, 
""));
+    } catch (IOException e) {
+      log.error("Could not deserialize job properties for flowGroup {} 
flowName {} while creating GaaSJobObservabilityEvent due to ", flowGroup, 
flowName, e);
+    }
+
+    String fullFlowEdge = 
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "");
+    // Parse the flow edge from edge id that is stored in format 
sourceNode_destinationNode_flowEdgeId
+    String edgeId = StringUtils.substringAfter(
+        StringUtils.substringAfter(fullFlowEdge, 
jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
"")),
+        BaseFlowGraphHelper.FLOW_EDGE_LABEL_JOINER_CHAR);
+
     Type datasetTaskSummaryType = new 
TypeToken<ArrayList<DatasetTaskSummary>>(){}.getType();
     List<DatasetTaskSummary> datasetTaskSummaries = 
jobState.contains(TimingEvent.DATASET_TASK_SUMMARIES) ?
         
GsonUtils.GSON_WITH_DATE_HANDLING.fromJson(jobState.getProp(TimingEvent.DATASET_TASK_SUMMARIES),
 datasetTaskSummaryType) : null;
     List<DatasetMetric> datasetMetrics = datasetTaskSummaries != null ? 
datasetTaskSummaries.stream().map(
        DatasetTaskSummary::toDatasetMetric).collect(Collectors.toList()) : 
null;
 
-    GaaSObservabilityEventExperimental.Builder builder = 
GaaSObservabilityEventExperimental.newBuilder();
+    GaaSJobObservabilityEvent.Builder builder = 
GaaSJobObservabilityEvent.newBuilder();
     List<Issue> issueList = null;
     try {
       issueList = getIssuesForJob(issueRepository, jobState);
     } catch (Exception e) {
       // If issues cannot be fetched, increment metric but continue to try to 
emit the event
-      log.error("Could not fetch issues while creating GaaSObservabilityEvent 
due to ", e);
+      log.error("Could not fetch issues while creating 
GaaSJobObservabilityEvent due to ", e);
       if (this.instrumentationEnabled) {
         this.getIssuesFailedMeter.mark();
       }
     }
     JobStatus status = convertExecutionStatusTojobState(jobState, 
ExecutionStatus.valueOf(jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD)));
-    builder.setTimestamp(System.currentTimeMillis())
-        
.setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
-        
.setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
-        
.setFlowGraphEdgeId(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
 ""))
+    builder.setEventTimestamp(System.currentTimeMillis())
+        .setFlowName(flowName)
+        .setFlowGroup(flowGroup)
         
.setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
-        
.setLastFlowModificationTime(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
 0))
+        
.setLastFlowModificationTimestamp(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
 0))
         
.setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
         .setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
         
.setExecutorId(jobState.getProp(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
 ""))
-        .setJobStartTime(jobStartTime)
-        .setJobEndTime(jobEndTime)
-        .setJobOrchestratedTime(jobOrchestratedTime)
-        .setJobPlanningPhaseStartTime(jobPlanningPhaseStartTime)
-        .setJobPlanningPhaseEndTime(jobPlanningPhaseEndTime)
+        .setJobStartTimestamp(jobStartTime)
+        .setJobEndTimestamp(jobEndTime)
+        .setJobOrchestratedTimestamp(jobOrchestratedTime)
+        .setJobPlanningStartTimestamp(jobPlanningPhaseStartTime)
+        .setJobPlanningEndTimestamp(jobPlanningPhaseEndTime)
         .setIssues(issueList)
         .setJobStatus(status)
-        
.setExecutionUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null))
-        .setDatasetsWritten(datasetMetrics)
+        
.setEffectiveUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null))
+        .setDatasetsMetrics(datasetMetrics)
         
.setGaasId(this.state.getProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, 
null))
-        .setJobProperties(jobState.getProp(JobExecutionPlan.JOB_PROPS_KEY, 
null));
+        
.setJobProperties(GsonUtils.GSON_WITH_DATE_HANDLING.newBuilder().create().toJson(jobProperties))
+        
.setSourceNode(jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY,
 ""))
+        
.setDestinationNode(jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY,
 ""))
+        .setFlowEdgeId(!edgeId.isEmpty() ? edgeId : fullFlowEdge)
+        .setExecutorUrn(null); //TODO: Fill with information from job execution
     return builder.build();
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 4aced6afb..eb405ba60 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -66,7 +66,7 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
   private Meter messageParseFailures;
 
   public KafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads, JobIssueEventHandler jobIssueEventHandler,
-      GaaSObservabilityEventProducer observabilityEventProducer, 
DagActionStore dagActionStore)
+      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagActionStore dagActionStore)
       throws IOException, ReflectiveOperationException {
     super(topic, config, numThreads,  jobIssueEventHandler, 
observabilityEventProducer, dagActionStore);
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index ab6d62f93..c72502340 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -115,12 +115,12 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
   private final JobIssueEventHandler jobIssueEventHandler;
   private final Retryer<Void> persistJobStatusRetryer;
-  private final GaaSObservabilityEventProducer eventProducer;
+  private final GaaSJobObservabilityEventProducer eventProducer;
   private final DagActionStore dagActionStore;
   private final boolean dagProcEngineEnabled;
 
   public KafkaJobStatusMonitor(String topic, Config config, int numThreads, 
JobIssueEventHandler jobIssueEventHandler,
-      GaaSObservabilityEventProducer observabilityEventProducer, 
DagActionStore dagActionStore)
+      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagActionStore dagActionStore)
       throws ReflectiveOperationException {
     super(topic, config.withFallback(DEFAULTS), numThreads);
     String stateStoreFactoryClass = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, 
FileContextBasedFsStateStoreFactory.class.getName());
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index ecc759333..a1698e57a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -97,9 +97,9 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
     }
     jobStatusConfig = 
jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
     Class observabilityEventProducerClassName = 
Class.forName(ConfigUtils.getString(config,
-        
GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY,
-        
GaaSObservabilityEventProducer.DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS));
-    GaaSObservabilityEventProducer observabilityEventProducer = 
(GaaSObservabilityEventProducer) 
GobblinConstructorUtils.invokeLongestConstructor(
+        
GaaSJobObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY,
+        
GaaSJobObservabilityEventProducer.DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS));
+    GaaSJobObservabilityEventProducer observabilityEventProducer = 
(GaaSJobObservabilityEventProducer) 
GobblinConstructorUtils.invokeLongestConstructor(
         observabilityEventProducerClassName, 
ConfigUtils.configToState(config), this.issueRepository, 
this.instrumentationEnabled);
 
     return (KafkaJobStatusMonitor) GobblinConstructorUtils
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
similarity index 75%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
index f8b23bd62..a0eeb88e1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
@@ -18,7 +18,7 @@
 package org.apache.gobblin.service.monitoring;
 
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 
 
@@ -26,13 +26,13 @@ import 
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
  * The default producer for emitting GaaS Observability Events in the 
KafkaJobStatusMonitor
  * This class does no work and will not create or emit any events
  */
-public class NoopGaaSObservabilityEventProducer extends 
GaaSObservabilityEventProducer {
+public class NoopGaaSJobObservabilityEventProducer extends 
GaaSJobObservabilityEventProducer {
 
-  public NoopGaaSObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+  public NoopGaaSJobObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
     super(state, issueRepository, instrumentationEnabled);
   }
 
-  public NoopGaaSObservabilityEventProducer() {
+  public NoopGaaSJobObservabilityEventProducer() {
     super(null, null, false);
   }
 
@@ -40,5 +40,5 @@ public class NoopGaaSObservabilityEventProducer extends 
GaaSObservabilityEventPr
   public void emitObservabilityEvent(State jobState) {}
 
   @Override
-  protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event) 
{}
+  protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event) {}
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
similarity index 80%
rename from 
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
rename to 
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
index d2fdbd7b7..1f3da5205 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
@@ -37,7 +37,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.metrics.JobStatus;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
@@ -54,9 +54,10 @@ import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
-public class GaaSObservabilityProducerTest {
+public class GaaSJobObservabilityProducerTest {
 
   private MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
 
@@ -75,17 +76,22 @@ public class GaaSObservabilityProducerTest {
     DatasetTaskSummary dataset2 = new DatasetTaskSummary("/testFolder2", 1000, 
10000, false);
     summaries.add(dataset1);
     summaries.add(dataset2);
+    Properties jobProps = new Properties();
+    jobProps.setProperty("flow.executionId", "1681242538558");
+    jobProps.setProperty("user.to.proxy", "newUser");
+    jobProps.setProperty("gobblin.flow.sourceIdentifier", "sourceNode");
+    jobProps.setProperty("gobblin.flow.destinationIdentifier", 
"destinationNode");
 
     State state = new State();
     state.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, 
"testCluster");
-    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(state, this.issueRepository, false);
+    MockGaaSJobObservabilityEventProducer producer = new 
MockGaaSJobObservabilityEventProducer(state, this.issueRepository, false);
     Map<String, String> gteEventMetadata = Maps.newHashMap();
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
     
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
flowExecutionId);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"sourceNode_destinationNode_flowEdge");
     gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
     gteEventMetadata.put(AzkabanProjectConfig.USER_TO_PROXY, "azkabanUser");
     gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
@@ -95,16 +101,16 @@ public class GaaSObservabilityProducerTest {
     gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
     
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
 "20");
     gteEventMetadata.put(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(summaries));
-    gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
"{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
+    gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobProps));
     Properties jobStatusProps = new Properties();
     jobStatusProps.putAll(gteEventMetadata);
     producer.emitObservabilityEvent(new State(jobStatusProps));
 
-    List<GaaSObservabilityEventExperimental> emittedEvents = 
producer.getTestEmittedEvents();
+    List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedEvents();
 
     Assert.assertEquals(emittedEvents.size(), 1);
-    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
-    GaaSObservabilityEventExperimental event = iterator.next();
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event = iterator.next();
     Assert.assertEquals(event.getFlowGroup(), flowGroup);
     Assert.assertEquals(event.getFlowName(), flowName);
     Assert.assertEquals(event.getJobName(), jobName);
@@ -112,26 +118,28 @@ public class GaaSObservabilityProducerTest {
     Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
     Assert.assertEquals(event.getExecutorUrl(), "hostName");
     Assert.assertEquals(event.getIssues().size(), 1);
-    Assert.assertEquals(event.getFlowGraphEdgeId(), "flowEdge");
+    Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
+    Assert.assertEquals(event.getSourceNode(), "sourceNode");
+    Assert.assertEquals(event.getDestinationNode(), "destinationNode");
     Assert.assertEquals(event.getExecutorId(), "specExecutor");
-    Assert.assertEquals(event.getExecutionUserUrn(), "azkabanUser");
-    Assert.assertEquals(event.getJobOrchestratedTime(), Long.valueOf(1));
-    Assert.assertEquals(event.getLastFlowModificationTime(), Long.valueOf(20));
-    Assert.assertEquals(event.getJobStartTime(), Long.valueOf(20));
-    Assert.assertEquals(event.getJobEndTime(), Long.valueOf(100));
-    Assert.assertEquals(event.getDatasetsWritten().size(), 2);
-    Assert.assertEquals(event.getDatasetsWritten().get(0).getDatasetUrn(), 
dataset1.getDatasetUrn());
-    
Assert.assertEquals(event.getDatasetsWritten().get(0).getEntitiesWritten(), 
Long.valueOf(dataset1.getRecordsWritten()));
-    Assert.assertEquals(event.getDatasetsWritten().get(0).getBytesWritten(), 
Long.valueOf(dataset1.getBytesWritten()));
-    
Assert.assertEquals(event.getDatasetsWritten().get(0).getSuccessfullyCommitted(),
 Boolean.valueOf(dataset1.isSuccessfullyCommitted()));
-    Assert.assertEquals(event.getDatasetsWritten().get(1).getDatasetUrn(), 
dataset2.getDatasetUrn());
-    
Assert.assertEquals(event.getDatasetsWritten().get(1).getEntitiesWritten(), 
Long.valueOf(dataset2.getRecordsWritten()));
-    Assert.assertEquals(event.getDatasetsWritten().get(1).getBytesWritten(), 
Long.valueOf(dataset2.getBytesWritten()));
-    
Assert.assertEquals(event.getDatasetsWritten().get(1).getSuccessfullyCommitted(),
 Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
-    Assert.assertEquals(event.getJobProperties(), 
"{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
+    Assert.assertEquals(event.getEffectiveUserUrn(), "azkabanUser");
+    Assert.assertEquals(event.getJobOrchestratedTimestamp(), Long.valueOf(1));
+    Assert.assertEquals(event.getLastFlowModificationTimestamp(), 
Long.valueOf(20));
+    Assert.assertEquals(event.getJobStartTimestamp(), Long.valueOf(20));
+    Assert.assertEquals(event.getJobEndTimestamp(), Long.valueOf(100));
+    Assert.assertEquals(event.getDatasetsMetrics().size(), 2);
+    Assert.assertEquals(event.getDatasetsMetrics().get(0).getDatasetUrn(), 
dataset1.getDatasetUrn());
+    
Assert.assertEquals(event.getDatasetsMetrics().get(0).getEntitiesWritten(), 
Long.valueOf(dataset1.getRecordsWritten()));
+    Assert.assertEquals(event.getDatasetsMetrics().get(0).getBytesWritten(), 
Long.valueOf(dataset1.getBytesWritten()));
+    
Assert.assertEquals(event.getDatasetsMetrics().get(0).getSuccessfullyCommitted(),
 Boolean.valueOf(dataset1.isSuccessfullyCommitted()));
+    Assert.assertEquals(event.getDatasetsMetrics().get(1).getDatasetUrn(), 
dataset2.getDatasetUrn());
+    
Assert.assertEquals(event.getDatasetsMetrics().get(1).getEntitiesWritten(), 
Long.valueOf(dataset2.getRecordsWritten()));
+    Assert.assertEquals(event.getDatasetsMetrics().get(1).getBytesWritten(), 
Long.valueOf(dataset2.getBytesWritten()));
+    
Assert.assertEquals(event.getDatasetsMetrics().get(1).getSuccessfullyCommitted(),
 Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
+    Assert.assertEquals(event.getJobProperties(), 
"{\"gobblin.flow.sourceIdentifier\":\"sourceNode\",\"gobblin.flow.destinationIdentifier\":\"destinationNode\",\"user.to.proxy\":\"newUser\",\"flow.executionId\":\"1681242538558\"}");
     Assert.assertEquals(event.getGaasId(), "testCluster");
-    AvroSerializer<GaaSObservabilityEventExperimental> serializer = new 
AvroBinarySerializer<>(
-        GaaSObservabilityEventExperimental.SCHEMA$, new 
NoopSchemaVersionWriter()
+    AvroSerializer<GaaSJobObservabilityEvent> serializer = new 
AvroBinarySerializer<>(
+        GaaSJobObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
     );
     serializer.serializeRecord(event);
   }
@@ -146,7 +154,8 @@ public class GaaSObservabilityProducerTest {
         TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
         createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
     );
-    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(new State(), this.issueRepository, false);
+    MockGaaSJobObservabilityEventProducer
+        producer = new MockGaaSJobObservabilityEventProducer(new State(), 
this.issueRepository, false);
     Map<String, String> gteEventMetadata = Maps.newHashMap();
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
@@ -161,26 +170,26 @@ public class GaaSObservabilityProducerTest {
     jobStatusProps.putAll(gteEventMetadata);
     producer.emitObservabilityEvent(new State(jobStatusProps));
 
-    List<GaaSObservabilityEventExperimental> emittedEvents = 
producer.getTestEmittedEvents();
+    List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedEvents();
 
     Assert.assertEquals(emittedEvents.size(), 1);
-    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
-    GaaSObservabilityEventExperimental event = iterator.next();
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event = iterator.next();
     Assert.assertEquals(event.getFlowGroup(), flowGroup);
     Assert.assertEquals(event.getFlowName(), flowName);
     Assert.assertEquals(event.getJobName(), jobName);
     Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
     Assert.assertEquals(event.getJobStatus(), JobStatus.CANCELLED);
     Assert.assertEquals(event.getIssues().size(), 1);
-    Assert.assertEquals(event.getFlowGraphEdgeId(), "flowEdge");
+    Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
     Assert.assertEquals(event.getExecutorId(), "specExecutor");
-    Assert.assertEquals(event.getJobOrchestratedTime(), null);
-    Assert.assertEquals(event.getJobStartTime(), null);
-    Assert.assertEquals(event.getExecutionUserUrn(), null);
+    Assert.assertEquals(event.getJobOrchestratedTimestamp(), null);
+    Assert.assertEquals(event.getJobStartTimestamp(), null);
+    Assert.assertEquals(event.getEffectiveUserUrn(), null);
     Assert.assertEquals(event.getExecutorUrl(), null);
 
-    AvroSerializer<GaaSObservabilityEventExperimental> serializer = new 
AvroBinarySerializer<>(
-        GaaSObservabilityEventExperimental.SCHEMA$, new 
NoopSchemaVersionWriter()
+    AvroSerializer<GaaSJobObservabilityEvent> serializer = new 
AvroBinarySerializer<>(
+        GaaSJobObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
     );
     serializer.serializeRecord(event);
   }
@@ -199,7 +208,8 @@ public class GaaSObservabilityProducerTest {
     
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
     
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT,
 "http://localhost:5000");
 
-    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(producerState, this.issueRepository, true);
+    MockGaaSJobObservabilityEventProducer
+        producer = new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true);
 
     Map<String, String> gteEventMetadata = Maps.newHashMap();
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
@@ -231,7 +241,8 @@ public class GaaSObservabilityProducerTest {
     State producerState = new State();
     
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
 
-    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(producerState, this.issueRepository, true);
+    MockGaaSJobObservabilityEventProducer
+        producer = new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true);
     Map<String, String> gteEventMetadata = Maps.newHashMap();
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
similarity index 75%
rename from 
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
rename to 
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
index eb9269535..528edc273 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
@@ -22,20 +22,20 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
 import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
 import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 
 
 /**
- * An extension of GaaSObservabilityEventProducer which creates the events and 
stores them in a list
+ * An extension of GaaSJobObservabilityEventProducer which creates the events 
and stores them in a list
  * Tests can use a getter to fetch a read-only version of the events that were 
emitted
  */
-public class MockGaaSObservabilityEventProducer extends 
GaaSObservabilityEventProducer {
-  private List<GaaSObservabilityEventExperimental> emittedEvents = new 
ArrayList<>();
+public class MockGaaSJobObservabilityEventProducer extends 
GaaSJobObservabilityEventProducer {
+  private List<GaaSJobObservabilityEvent> emittedEvents = new ArrayList<>();
 
-  public MockGaaSObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+  public MockGaaSJobObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
     super(state, issueRepository, instrumentationEnabled);
   }
 
@@ -44,7 +44,7 @@ public class MockGaaSObservabilityEventProducer extends 
GaaSObservabilityEventPr
     return InMemoryOpenTelemetryMetrics.getInstance(state);
   }
   @Override
-  protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event) 
{
+  protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event) {
     emittedEvents.add(event);
   }
 
@@ -53,7 +53,7 @@ public class MockGaaSObservabilityEventProducer extends 
GaaSObservabilityEventPr
    * This should only be used as a read-only object for emitted 
GaaSObservabilityEvents
    * @return list of events that would have been emitted
    */
-  public List<GaaSObservabilityEventExperimental> getTestEmittedEvents() {
+  public List<GaaSJobObservabilityEvent> getTestEmittedEvents() {
     return Collections.unmodifiableList(this.emittedEvents);
   }
 

Reply via email to