[ 
https://issues.apache.org/jira/browse/GOBBLIN-1764?focusedWorklogId=840115&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840115
 ]

ASF GitHub Bot logged work on GOBBLIN-1764:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jan/23 23:09
            Start Date: 18/Jan/23 23:09
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080655252


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -387,62 +460,95 @@ public void 
testProcessProgressingMessageWhenNoPreviousStatus() throws IOExcepti
     Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
   }
 
-  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
-  public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
-    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
+  @Test (dependsOnMethods = 
"testProcessProgressingMessageWhenNoPreviousStatus")
+  public void testJobMonitorCreatesGaaSObservabilityEvent() throws 
IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic6");
 
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
-        createJobOrchestratedEvent(1, 4),
-        createJobSLAKilledEvent(),
-        createJobOrchestratedEvent(2, 4),
-        createJobStartSLAKilledEvent(),
-        // Verify that kill event will not retry
-        createJobOrchestratedEvent(3, 4),
-        createJobCancelledEvent()
+        createJobSucceededEvent()
     ).forEach(event -> {
       context.submitEvent(event);
       kafkaReporter.report();
     });
-
     try {
       Thread.sleep(1000);
-    } catch(InterruptedException ex) {
+    } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty());
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer);
     jobStatusMonitor.buildMetricsContextAndMetrics();
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
-      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
-      this::convertMessageAndMetadataToDecodableKafkaRecord);
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
-    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+    // Only the COMPLETE event should create a GaaSObservabilityEvent
+    List<GaaSObservabilityEventExperimental> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
+    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    //Job orchestrated for retrying
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    jobStatusMonitor.shutDown();
+  }
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+  @Test (dependsOnMethods = "testJobMonitorCreatesGaaSObservabilityEvent")
+  public void testObservabilityEventSingleEmission() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobCancelledEvent(),
+        createJobSucceededEvent() // This event should be ignored
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);

Review Comment:
   Yeah I think I will definitely need to cut down on the boilerplate for 
future tests





Issue Time Tracking
-------------------

    Worklog Id:     (was: 840115)
    Time Spent: 5h  (was: 4h 50m)

> Emit GaaSObservabilityEvent
> ---------------------------
>
>                 Key: GOBBLIN-1764
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1764
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-service
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> GaaSObservabilityEvents are a new events that provides a job summary from 
> pipelines in GaaS. It differs from GobblinTrackingEvents as it runs once per 
> job pipeline, and it intended to be easily queryable and alert on.
> We want to emit this observability event from GaaS by deriving it from a 
> job's job status. Since this feature is Experimental and WIP, it is not 
> expected to fill out all of the fields immediately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to