umustafi commented on code in PR #3962:
URL: https://github.com/apache/gobblin/pull/3962#discussion_r1695933619
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -713,6 +715,126 @@ public void testObservabilityEventSingleEmission() throws
IOException, Reflectiv
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+ public void testObservabilityEventFlowLevel() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic7");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobSucceededEvent(),
+ createFlowSucceededEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ State producerState = new State();
+
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
"true");
+ MockGaaSJobObservabilityEventProducer mockEventProducer = new
MockGaaSJobObservabilityEventProducer(producerState,
+ issueRepository, false);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer, mock(DagManagementStateStore.class));
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
Review Comment:
Is `getNextJobStatusState` used for both flow- and job-level events, with "NA"
as the default for a flow-level event? Is the NA value codified somewhere that
we can reference here?
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -713,6 +715,126 @@ public void testObservabilityEventSingleEmission() throws
IOException, Reflectiv
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+ public void testObservabilityEventFlowLevel() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic7");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobSucceededEvent(),
+ createFlowSucceededEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ State producerState = new State();
+
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
"true");
+ MockGaaSJobObservabilityEventProducer mockEventProducer = new
MockGaaSJobObservabilityEventProducer(producerState,
+ issueRepository, false);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer, mock(DagManagementStateStore.class));
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA",
"NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
+
+ // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+ List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedJobEvents();
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event1 = iterator.next();
+ Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+ Assert.assertEquals(event1.getFlowName(), this.flowName);
+ Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+ // Only the COMPLETE event should create a GaaSFlowObservabilityEvent
+ List<GaaSFlowObservabilityEvent> emittedFlowEvents =
mockEventProducer.getTestEmittedFlowEvents();
+ Assert.assertEquals(emittedFlowEvents.size(), 1);
+ Iterator<GaaSFlowObservabilityEvent> flowIterator =
emittedFlowEvents.iterator();
+ GaaSFlowObservabilityEvent flowEvent = flowIterator.next();
+ Assert.assertEquals(flowEvent.getFlowStatus(), FlowStatus.SUCCEEDED);
+ Assert.assertEquals(flowEvent.getFlowName(), this.flowName);
+ Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup);
+
+ jobStatusMonitor.shutDown();
+ }
+
+ @Test (dependsOnMethods = "testObservabilityEventFlowLevel")
+ public void testObservabilityEventFlowFailed() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic7");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobFailedEvent(),
+ createFlowFailedEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ State producerState = new State();
+
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
"true");
+ MockGaaSJobObservabilityEventProducer mockEventProducer = new
MockGaaSJobObservabilityEventProducer(producerState,
+ issueRepository, false);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer, mock(DagManagementStateStore.class));
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.FAILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA",
"NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.FAILED.name());
+
+ // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+ List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedJobEvents();
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event1 = iterator.next();
+ Assert.assertEquals(event1.getJobStatus(), JobStatus.EXECUTION_FAILURE);
+ Assert.assertEquals(event1.getFlowName(), this.flowName);
+ Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+ // Only the COMPLETE event should create a GaaSFlowObservabilityEvent
Review Comment:
Update the comment here and on line 817 to say "FAILED".
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -81,12 +85,14 @@ public abstract class GaaSJobObservabilityEventProducer
implements Closeable {
protected ObservableLongMeasurement jobStatusMetric;
protected MultiContextIssueRepository issueRepository;
protected boolean instrumentationEnabled;
+ protected boolean emitFlowObservabilityEvent;
ContextAwareMeter getIssuesFailedMeter;
public GaaSJobObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
this.state = state;
this.issueRepository = issueRepository;
this.instrumentationEnabled = instrumentationEnabled;
+ this.emitFlowObservabilityEvent =
this.state.getPropAsBoolean(EMIT_FLOW_OBSERVABILITY_EVENT, false);
Review Comment:
Do we not want these emitted by default? Are job-level events always emitted?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -123,7 +129,11 @@ private void setupMetrics(State state) {
public void emitObservabilityEvent(final State jobState) {
GaaSJobObservabilityEvent event = createGaaSObservabilityEvent(jobState);
- sendUnderlyingEvent(event);
+ if
(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD).equals(JobStatusRetriever.NA_KEY)
&& this.emitFlowObservabilityEvent) {
Review Comment:
Why do we only emit flow-level events in some cases? Is this for backwards
compatibility? What's the harm in emitting flow-level events going forward?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]