[
https://issues.apache.org/jira/browse/GOBBLIN-1764?focusedWorklogId=840115&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840115
]
ASF GitHub Bot logged work on GOBBLIN-1764:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jan/23 23:09
Start Date: 18/Jan/23 23:09
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080655252
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -387,62 +460,95 @@ public void
testProcessProgressingMessageWhenNoPreviousStatus() throws IOExcepti
Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
}
- @Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
- public void testProcessMessageForCancelledAndKilledEvent() throws
IOException, ReflectiveOperationException {
- KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
+ @Test (dependsOnMethods =
"testProcessProgressingMessageWhenNoPreviousStatus")
+ public void testJobMonitorCreatesGaaSObservabilityEvent() throws
IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic6");
//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createFlowCompiledEvent(),
- createJobOrchestratedEvent(1, 4),
- createJobSLAKilledEvent(),
- createJobOrchestratedEvent(2, 4),
- createJobStartSLAKilledEvent(),
- // Verify that kill event will not retry
- createJobOrchestratedEvent(3, 4),
- createJobCancelledEvent()
+ createJobSucceededEvent()
).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
});
-
try {
Thread.sleep(1000);
- } catch(InterruptedException ex) {
+ } catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
-
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty());
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
+ ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer);
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
- this.kafkaTestHelper.getIteratorForTopic(TOPIC),
- this::convertMessageAndMetadataToDecodableKafkaRecord);
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
- state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
-
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
+ // Only the COMPLETE event should create a GaaSObservabilityEvent
+ List<GaaSObservabilityEventExperimental> emittedEvents =
mockEventProducer.getTestEmittedEvents();
+ Iterator<GaaSObservabilityEventExperimental> iterator =
emittedEvents.iterator();
+ GaaSObservabilityEventExperimental event1 = iterator.next();
+ Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+ Assert.assertEquals(event1.getFlowName(), this.flowName);
+ Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
- state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- //Job orchestrated for retrying
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ jobStatusMonitor.shutDown();
+ }
- state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+ @Test (dependsOnMethods = "testJobMonitorCreatesGaaSObservabilityEvent")
+ public void testObservabilityEventSingleEmission() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic7");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobCancelledEvent(),
+ createJobSucceededEvent() // This event should be ignored
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
+ ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
Review Comment:
Yeah, I think I will definitely need to cut down on the boilerplate for
future tests.
Issue Time Tracking
-------------------
Worklog Id: (was: 840115)
Time Spent: 5h (was: 4h 50m)
> Emit GaaSObservabilityEvent
> ---------------------------
>
> Key: GOBBLIN-1764
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1764
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 5h
> Remaining Estimate: 0h
>
> GaaSObservabilityEvents are a new type of event that provides a job summary
> from pipelines in GaaS. They differ from GobblinTrackingEvents in that they are
> emitted once per job pipeline, and they are intended to be easy to query and alert on.
> We want to emit this observability event from GaaS by deriving it from a
> job's job status. Since this feature is Experimental and WIP, it is not
> expected to fill out all of the fields immediately.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)