[ 
https://issues.apache.org/jira/browse/GOBBLIN-2079?focusedWorklogId=922286&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-922286
 ]

ASF GitHub Bot logged work on GOBBLIN-2079:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jun/24 06:21
            Start Date: 06/Jun/24 06:21
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3962:
URL: https://github.com/apache/gobblin/pull/3962#discussion_r1628818301


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc:
##########
@@ -0,0 +1,82 @@
+{
+  "type": "record",
+  "name": "GaaSFlowObservabilityEvent",
+  "namespace": "org.apache.gobblin.metrics",
+  "doc": "An event schema for GaaS to emit during and after a flow is executed.",

Review Comment:
   please clarify under what circumstances we anticipate emitting the event 
*during* flow execution



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc:
##########
@@ -0,0 +1,82 @@
+{
+  "type": "record",
+  "name": "GaaSFlowObservabilityEvent",
+  "namespace": "org.apache.gobblin.metrics",
+  "doc": "An event schema for GaaS to emit during and after a flow is executed.",
+  "fields": [
+    {
+      "name": "eventTimestamp",
+      "type": "long",
+      "doc": "Time at which event was created in milliseconds from Unix Epoch"
+    },
+    {
+      "name": "flowGroup",
+      "type": "string",
+      "doc": "Flow group for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowName",
+      "type": "string",
+      "doc": "Flow name for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowExecutionId",
+      "type": "long",
+      "doc": "Flow execution id for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "lastFlowModificationTimestamp",
+      "type": "long",
+      "doc": "Timestamp in millis since Epoch when the flow config was last modified"
+    },
+    {
+      "name": "sourceNode",
+      "type": "string",
+      "doc": "Source node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "destinationNode",
+      "type": "string",
+      "doc": "Destination node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowStatus",
+      "type": {
+        "type": "enum",
+        "name": "FlowStatus",
+        "symbols": [
+          "SUCCEEDED",
+          "COMPILATION_FAILURE",
+          "SUBMISSION_FAILURE",
+          "EXECUTION_FAILURE",
+          "CANCELLED"
+        ],
+        "doc": "Final flow status for the GaaS flow",
+        "compliance": "NONE"
+      }
+    },
+    {
+      "name": "effectiveUserUrn",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "User URN (if applicable) whose identity was used to run the underlying Gobblin job e.g. myGroup",
+      "compliance": "NONE"
+    },
+    {
+      "name": "gaasId",
+      "type": [
+        "null",
+        "string"
+      ],
+      "default": null,
+      "doc": "The deployment ID of GaaS that is sending the event (if multiple GaaS instances are running)"
+    }
+  ]

Review Comment:
   any flow start/end timestamp?
   
   also, what about `flowProperties` - e.g. those supplied at 
flow-definition-time?
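
   A minimal sketch of the kind of additions asked about here, written as a plain Java holder rather than as Avro fields. Everything in it is hypothetical and not part of the PR: the class name, the field names `flowStartTimestamp` and `flowEndTimestamp`, and the choice of a string-to-string map for `flowProperties` are assumptions made only for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for the extra flow-level data the comment asks about.
// None of these names come from the PR; they are illustrative assumptions.
final class FlowEventExtrasSketch {
  final long flowStartTimestamp;            // millis since Unix epoch (assumed unit)
  final long flowEndTimestamp;              // millis since Unix epoch (assumed unit)
  final Map<String, String> flowProperties; // properties supplied at flow-definition time

  FlowEventExtrasSketch(long flowStartTimestamp, long flowEndTimestamp,
      Map<String, String> flowProperties) {
    if (flowEndTimestamp < flowStartTimestamp) {
      throw new IllegalArgumentException("flow cannot end before it starts");
    }
    this.flowStartTimestamp = flowStartTimestamp;
    this.flowEndTimestamp = flowEndTimestamp;
    // Defensive copy so later changes to the caller's map do not leak into the event.
    this.flowProperties = Collections.unmodifiableMap(new LinkedHashMap<>(flowProperties));
  }

  // Wall-clock duration of the flow, derived from the two timestamps.
  long durationMillis() {
    return flowEndTimestamp - flowStartTimestamp;
  }
}
```

   With both timestamps on the event, a consumer could derive the flow duration without joining against job-level events.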



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -619,6 +621,64 @@ public void testObservabilityEventSingleEmission() throws IOException, Reflectiv
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+  public void testObservabilityEventFlowLevel() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobSucceededEvent(),
+        createFlowSucceededEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+    MockGaaSJobObservabilityEventProducer mockEventProducer = new MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
+        issueRepository, false);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
+        mockEventProducer);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+    List<GaaSJobObservabilityEvent> emittedEvents = mockEventProducer.getTestEmittedJobEvents();
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent

Review Comment:
   GaaSFlowObservabilityEvent



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -69,6 +71,7 @@ public abstract class GaaSJobObservabilityEventProducer implements Closeable {
   public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSJobObservabilityEventProducer.";
   public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
   public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSJobObservabilityEventProducer.class.getName();
+  public static final String EMIT_FLOW_OBSERVABILITY_EVENT = GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "emitFlowObservabilityEvent";

Review Comment:
   NBD, but the prefix doesn't align well w/ the flow-level name



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java:
##########
@@ -195,6 +197,48 @@ public void testCreateGaaSObservabilityEventWithPartialMetadata() throws Excepti
     serializer.serializeRecord(event);
   }
 
+  @Test
+  public void testCreateGaaSObservabilityFlowEvent() throws Exception {
+    String flowGroup = "testFlowGroup3";
+    String flowName = "testFlowName3";
+    String jobName = JobStatusRetriever.NA_KEY;
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    MockGaaSJobObservabilityEventProducer
+        producer = new MockGaaSJobObservabilityEventProducer(new State(), this.issueRepository, false);

Review Comment:
   FSR, I thought that empty `State()` would adopt the default of `false`, 
which would be to emit a job-schema event for the flow-level (w/ `"NA"` values 
for some fields).  I thought explicit enablement would be needed to actually 
convert to and produce a flow-schema event
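
   The expectation described in this comment can be sketched roughly as follows. This is not the producer's actual logic: it uses `java.util.Properties` as a stand-in for Gobblin's `State`, assumes that `JobStatusRetriever.NA_KEY` is the string `"NA"`, and introduces a hypothetical `chooseSchema` helper and `EventSchema` enum purely for illustration. Only the config key string is taken from the diff above.

```java
import java.util.Properties;

// Sketch of the "default false, explicit opt-in" behavior the reviewer expected.
// Properties stands in for Gobblin's State; chooseSchema and EventSchema are hypothetical.
final class FlowEventRoutingSketch {
  // Assumption: mirrors JobStatusRetriever.NA_KEY, used as the job name for flow-level statuses.
  static final String NA_KEY = "NA";
  // Key as composed in the diff: producer prefix + "emitFlowObservabilityEvent".
  static final String EMIT_FLOW_OBSERVABILITY_EVENT =
      "GaaSJobObservabilityEventProducer.emitFlowObservabilityEvent";

  enum EventSchema { JOB, FLOW }

  static EventSchema chooseSchema(Properties props, String jobName) {
    // A missing key falls back to "false", so an empty config never opts in.
    boolean emitFlowEvent =
        Boolean.parseBoolean(props.getProperty(EMIT_FLOW_OBSERVABILITY_EVENT, "false"));
    boolean isFlowLevel = NA_KEY.equals(jobName);
    // Only a flow-level status with the flag enabled is converted to the flow schema;
    // otherwise the job schema is used (with "NA" in some fields for flow-level statuses).
    return (isFlowLevel && emitFlowEvent) ? EventSchema.FLOW : EventSchema.JOB;
  }

  public static void main(String[] args) {
    Properties empty = new Properties();
    System.out.println(chooseSchema(empty, NA_KEY));     // prints JOB

    Properties enabled = new Properties();
    enabled.setProperty(EMIT_FLOW_OBSERVABILITY_EVENT, "true");
    System.out.println(chooseSchema(enabled, NA_KEY));   // prints FLOW
    System.out.println(chooseSchema(enabled, "myJob"));  // prints JOB
  }
}
```

   Under this reading, a test that passes an empty `State()` would see a job-schema event for the flow-level status, and would need to set the flag explicitly to get a flow-schema event.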





Issue Time Tracking
-------------------

    Worklog Id:     (was: 922286)
    Time Spent: 20m  (was: 10m)

> GaaSObservabilityEvents should have a dedicated flow level event
> ----------------------------------------------------------------
>
>                 Key: GOBBLIN-2079
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2079
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> GaaSJobObservabilityEvents currently encapsulate both jobs and flows in GaaS.
> This is fine for single hop flows, but flows with multiple jobs encapsulated 
> in them now have a mix of job level events with the majority of metadata, and 
> flow level events which provide a better view to users when their flow fails 
> at any given point.
> Since the data in both events differs vastly with most metadata only having 
> contextual sense in the job level event, we should separate job and flow 
> level events to their own respective event types.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
