This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8702d9691 [GOBBLIN-1764] Emit observability event (#3623)
8702d9691 is described below
commit 8702d96914a3dd33551c801b80fe119bb5f7b672
Author: William Lo <[email protected]>
AuthorDate: Thu Jan 19 12:09:49 2023 -0800
[GOBBLIN-1764] Emit observability event (#3623)
* Add schemas for observability events in GaaS
* Address reviews
* Address reviews
* Clean up schema, address review for GaaSObservabilityEvent
* Clarify default types, add failure types
* Fix schema type array
* Modify proxy user field to be user identification
* Address final review
* Rename userIdentity to executionUserUrn
* WIP
* WIP
* WIP
* Create and populate GaaSObservabilityEvent
* Add tests
* cleanup
* Fix bug where state was not fetched properly
* Add additional tests
* Cleanup
* Address review and clean up
* Fix checkstyle
* Fix bug in monitor factory
* Fix checkstyles
* Add javadoc for mock classes
* Address reviews
* Remove unused imports
* fix test compilation for mock producer
* Remove stray newline
---
.../avro/GaaSObservabilityEventExperimental.avsc | 16 +-
gobblin-modules/gobblin-kafka-09/build.gradle | 1 +
.../runtime/KafkaAvroJobStatusMonitorTest.java | 186 ++++++++++++++++-----
.../monitoring/GaaSObservabilityEventProducer.java | 161 ++++++++++++++++++
.../monitoring/KafkaAvroJobStatusMonitor.java | 4 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 25 ++-
.../monitoring/KafkaJobStatusMonitorFactory.java | 16 +-
.../NoopGaaSObservabilityEventProducer.java | 44 +++++
.../monitoring/GaaSObservabilityProducerTest.java | 90 ++++++++++
.../service/monitoring/JobStatusRetrieverTest.java | 3 +-
.../MockGaaSObservabilityEventProducer.java | 53 ++++++
.../monitoring/MysqlJobStatusRetrieverTest.java | 4 +-
12 files changed, 547 insertions(+), 56 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
index 47c9be6c9..a9039dd90 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
@@ -53,7 +53,7 @@
"SUCCEEDED",
"COMPILATION_FAILURE",
"SUBMISSION_FAILURE",
- "EXEUCTION_FAILURE",
+ "EXECUTION_FAILURE",
"CANCELLED"
],
"doc": "Final job status for this job in the GaaS flow",
@@ -70,14 +70,20 @@
},
{
"name": "jobStartTime",
- "type": "long",
- "doc": "Start time of the job in millis since Epoch",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Start time of the job in millis since Epoch, null if the job was
never run",
"compliance": "NONE"
},
{
"name": "jobEndTime",
- "type": "long",
- "doc": "Finish time of the job in millis since Epoch",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Finish time of the job in millis since Epoch, null if the job
was never run",
"compliance": "NONE"
},
{
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle
b/gobblin-modules/gobblin-kafka-09/build.gradle
index e13f4deb2..9dd1604d9 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -50,6 +50,7 @@ dependencies {
runtimeOnly externalDependency.protobuf
testCompile project(":gobblin-service")
+ testCompile project(path: ":gobblin-service", configuration: "tests")
testCompile project(":gobblin-modules:gobblin-service-kafka")
testCompile project(path: ":gobblin-runtime", configuration: "tests")
testCompile project(path: ":gobblin-metastore", configuration:
"testFixtures")
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index fc5b87802..6f1069e15 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -58,18 +58,26 @@ import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.JobStatus;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
import org.apache.gobblin.metrics.kafka.Pusher;
+import
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.monitoring.GaaSObservabilityEventProducer;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import
org.apache.gobblin.service.monitoring.MockGaaSObservabilityEventProducer;
+import
org.apache.gobblin.service.monitoring.NoopGaaSObservabilityEventProducer;
+import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
import static org.mockito.Mockito.mock;
@@ -131,7 +139,8 @@ public class KafkaAvroJobStatusMonitorTest {
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ new NoopGaaSObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
@@ -189,7 +198,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -266,7 +275,8 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ new NoopGaaSObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -325,7 +335,8 @@ public class KafkaAvroJobStatusMonitorTest {
AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle = new
AtomicBoolean(false);
Config conf = ConfigFactory.empty().withValue(
KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." +
RETRY_MULTIPLIER,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle,
conf);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle,
conf,
+ new NoopGaaSObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
@@ -358,6 +369,66 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.shutDown();
}
+ /**
+ * Replays three orchestrate/kill cycles for one job: an SLA kill and a start-SLA kill each move the
+ * job to PENDING_RETRY, while an explicit cancel on attempt 3 of 4 ends in CANCELLED with
+ * SHOULD_RETRY=false despite a remaining attempt. Observability events are not inspected here
+ * (a no-op producer is used).
+ */
+ @Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
+ public void testProcessMessageForCancelledAndKilledEvent() throws
IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobOrchestratedEvent(1, 4),
+ createJobSLAKilledEvent(),
+ createJobOrchestratedEvent(2, 4),
+ createJobStartSLAKilledEvent(),
+ // Verify that kill event will not retry
+ createJobOrchestratedEvent(3, 4),
+ createJobCancelledEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+
+ // Give the reporter time to publish before the monitor starts consuming
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ // Received kill flow event, should not retry the flow even though there
is 1 pending attempt left
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
+
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(false));
+
+ jobStatusMonitor.shutDown();
+ }
+
@Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
public void testProcessProgressingMessageWhenNoPreviousStatus() throws
IOException, ReflectiveOperationException {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic5");
@@ -376,7 +447,8 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ new NoopGaaSObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
@@ -387,62 +459,95 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
}
- @Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
- public void testProcessMessageForCancelledAndKilledEvent() throws
IOException, ReflectiveOperationException {
- KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
+ /**
+ * Verifies that a job reaching COMPLETE produces exactly one GaaSObservabilityEvent, with status
+ * SUCCEEDED and the test flow's name and group.
+ */
+ @Test (dependsOnMethods =
"testProcessProgressingMessageWhenNoPreviousStatus")
+ public void testJobMonitorCreatesGaaSObservabilityEvent() throws
IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic6");
//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createFlowCompiledEvent(),
- createJobOrchestratedEvent(1, 4),
- createJobSLAKilledEvent(),
- createJobOrchestratedEvent(2, 4),
- createJobStartSLAKilledEvent(),
- // Verify that kill event will not retry
- createJobOrchestratedEvent(3, 4),
- createJobCancelledEvent()
+ createJobSucceededEvent()
).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
});
-
try {
Thread.sleep(1000);
- } catch(InterruptedException ex) {
+ } catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
-
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty());
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
+ ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer);
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
- this.kafkaTestHelper.getIteratorForTopic(TOPIC),
- this::convertMessageAndMetadataToDecodableKafkaRecord);
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
- state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
-
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
+ // Only the COMPLETE event should create a GaaSObservabilityEvent
+ List<GaaSObservabilityEventExperimental> emittedEvents =
mockEventProducer.getTestEmittedEvents();
+ // Pin the "only one" claim above; without this a spurious extra emission would go unnoticed
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSObservabilityEventExperimental> iterator =
emittedEvents.iterator();
+ GaaSObservabilityEventExperimental event1 = iterator.next();
+ Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+ Assert.assertEquals(event1.getFlowName(), this.flowName);
+ Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
- state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- //Job orchestrated for retrying
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ jobStatusMonitor.shutDown();
+ }
- state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+ /**
+ * Verifies that once a job is CANCELLED, a later JOB_SUCCEEDED event neither changes the stored
+ * status nor produces a second GaaSObservabilityEvent.
+ */
+ @Test (dependsOnMethods = "testJobMonitorCreatesGaaSObservabilityEvent")
+ public void testObservabilityEventSingleEmission() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic7");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobCancelledEvent(),
+ createJobSucceededEvent() // This event should be ignored
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
+ ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- //Job orchestrated for retrying
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- // Received kill flow event, should not retry the flow even though there
is 1 pending attempt left
+ // The trailing JOB_SUCCEEDED must not overwrite the final CANCELLED status
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
-
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(false));
+
+ // Only the CANCELLED event should create a GaaSObservabilityEvent; the later JOB_SUCCEEDED must not emit a second one
+ List<GaaSObservabilityEventExperimental> emittedEvents =
mockEventProducer.getTestEmittedEvents();
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSObservabilityEventExperimental> iterator =
emittedEvents.iterator();
+ GaaSObservabilityEventExperimental event1 = iterator.next();
+ Assert.assertEquals(event1.getJobStatus(), JobStatus.CANCELLED);
+ Assert.assertEquals(event1.getFlowName(), this.flowName);
+ Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
jobStatusMonitor.shutDown();
}
@@ -494,6 +599,10 @@ public class KafkaAvroJobStatusMonitorTest {
return createGTE(TimingEvent.LauncherTimings.JOB_SUCCEEDED,
Maps.newHashMap());
}
+ /**
+ * Builds a FLOW_SUCCEEDED tracking event with empty metadata.
+ * NOTE(review): no test in this change calls this helper; drop it if it stays unused.
+ */
+ private GobblinTrackingEvent createFlowSucceededEvent() {
+ String eventName = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
+ return createGTE(eventName, Maps.newHashMap());
+ }
+
private GobblinTrackingEvent createJobFailedEvent() {
return createGTE(TimingEvent.LauncherTimings.JOB_FAILED,
Maps.newHashMap());
}
@@ -527,13 +636,14 @@ public class KafkaAvroJobStatusMonitorTest {
return new GobblinTrackingEvent(timestamp, namespace, eventName, metadata);
}
- MockKafkaAvroJobStatusMonitor
createMockKafkaAvroJobStatusMonitor(AtomicBoolean
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig) throws
IOException, ReflectiveOperationException {
+ /**
+ * Builds a {@link MockKafkaAvroJobStatusMonitor} wired to the local test broker, zookeeper and state store dir.
+ * @param shouldThrowFakeExceptionInParseJobStatusToggle passed to the mock to dial whether parsing throws
+ * @param additionalConfig fallback config; the fixed test settings above take precedence over it
+ * @param eventProducer observability producer handed to the monitor (no-op or mock in these tests)
+ */
+ MockKafkaAvroJobStatusMonitor
createMockKafkaAvroJobStatusMonitor(AtomicBoolean
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
+ GaaSObservabilityEventProducer eventProducer) throws IOException,
ReflectiveOperationException {
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
.withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"))
.withFallback(additionalConfig);
- return new MockKafkaAvroJobStatusMonitor("test",config, 1,
shouldThrowFakeExceptionInParseJobStatusToggle);
+ return new MockKafkaAvroJobStatusMonitor("test",config, 1,
shouldThrowFakeExceptionInParseJobStatusToggle, eventProducer);
}
/**
* Create a dummy event to test if it is filtered out by the consumer.
@@ -589,9 +699,9 @@ public class KafkaAvroJobStatusMonitorTest {
* @param shouldThrowFakeExceptionInParseJobStatusToggle - pass (and
retain) to dial whether `parseJobStatus` throws
*/
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads,
- AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle)
+ AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle,
GaaSObservabilityEventProducer producer)
throws IOException, ReflectiveOperationException {
- super(topic, config, numThreads, mock(JobIssueEventHandler.class));
+ // Forward the observability producer to the real monitor so each test can choose a no-op or mock implementation
+ super(topic, config, numThreads, mock(JobIssueEventHandler.class),
producer);
shouldThrowFakeExceptionInParseJobStatus =
shouldThrowFakeExceptionInParseJobStatusToggle;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
new file mode 100644
index 000000000..0816613a9
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+
+/**
+ * Emits a {@link GaaSObservabilityEventExperimental} from within GaaS's job status monitor whenever a
+ * job in a flow reaches a final state.
+ *
+ * <p>This class is abstract: subclasses provide the transport through
+ * {@link #sendUnderlyingEvent(GaaSObservabilityEventExperimental)}, which should be backed by a system
+ * such as Kafka that supports at-least-once delivery.
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY =
+      GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
+      NoopGaaSObservabilityEventProducer.class.getName();
+  public static final String ISSUES_READ_FAILED_METRIC_NAME =
+      GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+
+  // Only initialized when instrumentation is enabled.
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  boolean instrumentationEnabled;
+  // Marked each time a job's issues cannot be read; only initialized when instrumentation is enabled.
+  ContextAwareMeter getIssuesFailedMeter;
+
+  /**
+   * @param state configuration; also used to build the metric context when instrumentation is enabled
+   * @param issueRepository repository queried for the issues recorded against each job
+   * @param instrumentationEnabled whether to create a metric context and the issue-read-failure meter
+   */
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository,
+      boolean instrumentationEnabled) {
+    this.state = state;
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
+    if (this.instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(state, getClass());
+      this.getIssuesFailedMeter = this.metricContext.contextAwareMeter(
+          MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ISSUES_READ_FAILED_METRIC_NAME));
+    }
+  }
+
+  /**
+   * Builds an observability event from a final job state and sends it via {@link #sendUnderlyingEvent}.
+   *
+   * @param jobState job status whose {@link JobStatusRetriever#EVENT_NAME_FIELD} names a final
+   *     {@link ExecutionStatus} (COMPLETE, FAILED or CANCELLED)
+   * @throws IllegalArgumentException if the event name is missing or is not a final execution status
+   */
+  public void emitObservabilityEvent(final State jobState) {
+    GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
+    sendUnderlyingEvent(event);
+  }
+
+  /**
+   * Emits the GaaSObservabilityEvent with the mechanism the child class is built upon, e.g. Kafka.
+   * @param event the fully built event to send
+   */
+  abstract protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+  /**
+   * Creates a GaaSObservabilityEvent from a final GaaS job pipeline state, i.e. the combination of the
+   * job's GTE-derived states applied in order.
+   * @param jobState final job state
+   * @return the built GaaSObservabilityEvent
+   */
+  private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(final State jobState) {
+    // Resolve the status first so a non-final state is rejected before querying the issue repository.
+    String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+    if (eventName == null) {
+      throw new IllegalArgumentException("Job state has no " + JobStatusRetriever.EVENT_NAME_FIELD
+          + "; cannot create a GaaSObservabilityEvent");
+    }
+    JobStatus status = convertExecutionStatusToJobStatus(jobState, ExecutionStatus.valueOf(eventName));
+
+    // Start/end times are nullable in the schema: they are absent when the job never ran.
+    Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME)
+        ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+    Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME)
+        ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+
+    // Issues are best-effort: a read failure is logged and counted, and the event is still built with a
+    // null issue list. NOTE(review): confirm the schema's issues field accepts null, otherwise build() rejects it.
+    List<Issue> issueList = null;
+    try {
+      issueList = getIssuesForJob(issueRepository, jobState);
+    } catch (Exception e) {
+      log.error("Could not fetch issues while creating GaaSObservabilityEvent due to ", e);
+      if (this.instrumentationEnabled) {
+        this.getIssuesFailedMeter.mark();
+      }
+    }
+
+    GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
+    builder.setTimestamp(System.currentTimeMillis())
+        .setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
+        .setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+        .setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+        .setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
+        // NOTE(review): executorUrl is read from METADATA_MESSAGE; confirm that field carries the executor URL.
+        .setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
+        .setJobStartTime(jobStartTime)
+        .setJobEndTime(jobEndTime)
+        .setIssues(issueList)
+        .setJobStatus(status)
+        // TODO: Populate the below fields in a separate PR
+        .setExecutionUserUrn(null)
+        .setExecutorId("")
+        .setLastFlowModificationTime(0)
+        .setFlowGraphEdgeId("")
+        .setJobOrchestratedTime(null); // TODO: Investigate why TimingEvent.JOB_ORCHESTRATED_TIME is never propagated to the JobStatus
+    return builder.build();
+  }
+
+  /**
+   * Maps a final {@link ExecutionStatus} onto the event schema's {@link JobStatus}. A FAILED job that
+   * recorded an end time is reported as an execution failure, otherwise as a submission failure.
+   *
+   * @throws IllegalArgumentException for non-final statuses, which have no {@link JobStatus} equivalent
+   */
+  private static JobStatus convertExecutionStatusToJobStatus(State jobState, ExecutionStatus executionStatus) {
+    switch (executionStatus) {
+      case FAILED:
+        // TODO: Separate failure cases to SUBMISSION FAILURE and COMPILATION FAILURE, investigate events to populate these fields
+        return jobState.contains(TimingEvent.JOB_END_TIME) ? JobStatus.EXECUTION_FAILURE : JobStatus.SUBMISSION_FAILURE;
+      case COMPLETE:
+        return JobStatus.SUCCEEDED;
+      case CANCELLED:
+        // TODO: If cancelled due to start SLA exceeded, consider grouping this as a submission failure?
+        return JobStatus.CANCELLED;
+      default:
+        // Fail fast rather than handing a null status to the event builder.
+        throw new IllegalArgumentException(
+            "No GaaSObservabilityEvent job status for non-final execution status " + executionStatus);
+    }
+  }
+
+  /**
+   * Reads the issues recorded under the job's troubleshooter context and converts them to the event's
+   * {@link Issue} type.
+   */
+  private static List<Issue> getIssuesForJob(MultiContextIssueRepository issueRepository, State jobState)
+      throws TroubleshooterException {
+    return issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream()
+        .map(issue -> new Issue(
+            // NOTE(review): epoch *seconds* here, while job start/end times are millis; confirm the Issue schema's unit.
+            issue.getTime().toEpochSecond(),
+            IssueSeverity.valueOf(issue.getSeverity().toString()),
+            issue.getCode(),
+            issue.getSummary(),
+            issue.getDetails(),
+            issue.getProperties()))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Closing the underlying transport producer is left to its cache; only the metric context is released here.
+    if (this.instrumentationEnabled) {
+      this.metricContext.close();
+    }
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 1be36f4c8..20328c36d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -65,9 +65,9 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
private Meter messageParseFailures;
public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
- JobIssueEventHandler jobIssueEventHandler)
+ JobIssueEventHandler jobIssueEventHandler,
GaaSObservabilityEventProducer observabilityEventProducer)
throws IOException, ReflectiveOperationException {
- super(topic, config, numThreads, jobIssueEventHandler);
+ super(topic, config, numThreads, jobIssueEventHandler,
observabilityEventProducer);
if (ConfigUtils.getBoolean(config,
ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new
KafkaAvroSchemaRegistryFactory().
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 1a19e755e..7d1654af3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -107,10 +107,12 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
private final JobIssueEventHandler jobIssueEventHandler;
-
private final Retryer<Void> persistJobStatusRetryer;
+ private final GaaSObservabilityEventProducer eventProducer;
+
- public KafkaJobStatusMonitor(String topic, Config config, int numThreads,
JobIssueEventHandler jobIssueEventHandler)
+ public KafkaJobStatusMonitor(String topic, Config config, int numThreads,
JobIssueEventHandler jobIssueEventHandler,
+ GaaSObservabilityEventProducer observabilityEventProducer)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
String stateStoreFactoryClass = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY,
FileContextBasedFsStateStoreFactory.class.getName());
@@ -136,6 +138,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
}));
+ this.eventProducer = observabilityEventProducer;
}
@Override
@@ -187,14 +190,14 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
org.apache.gobblin.configuration.State jobStatus =
parseJobStatus(gobblinTrackingEvent);
if (jobStatus != null) {
try (Timer.Context context =
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
- addJobStatusToStateStore(jobStatus, this.stateStore);
+ addJobStatusToStateStore(jobStatus, this.stateStore,
this.eventProducer);
}
}
return null;
});
} catch (ExecutionException ee) {
String msg = String.format("Failed to add job status to state store for
kafka offset %d", message.getOffset());
- log.warn(msg, ee.getCause());
+ log.warn(msg, ee);
// Throw RuntimeException to avoid advancing kafka offsets without
updating state store
throw new RuntimeException(msg, ee.getCause());
} catch (RetryException re) {
@@ -219,7 +222,8 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
* @throws IOException
*/
@VisibleForTesting
- static void addJobStatusToStateStore(org.apache.gobblin.configuration.State
jobStatus, StateStore stateStore)
+ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State
jobStatus, StateStore stateStore,
+ GaaSObservabilityEventProducer eventProducer)
throws IOException {
try {
if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
@@ -266,6 +270,9 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
modifyStateIfRetryRequired(jobStatus);
stateStore.put(storeName, tableName, jobStatus);
+ if (isNewStateTransitionToFinal(jobStatus, states)) {
+ eventProducer.emitObservabilityEvent(jobStatus);
+ }
} catch (Exception e) {
log.warn("Meet exception when adding jobStatus to state store at "
+ e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber(), e);
@@ -288,6 +295,14 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
}
+  /**
+   * Returns whether {@code currentState} moves the job into a finished status for the first time.
+   *
+   * @param currentState the job status being persisted; its {@link JobStatusRetriever#EVENT_NAME_FIELD} holds the status name
+   * @param prevStates states already known for this job (presumably read from the state store before this update —
+   *                   confirm at the call site); only the last element is compared against
+   * @return {@code true} iff the current status is in {@link FlowStatusGenerator#FINISHED_STATUSES} and either there is
+   *         no previous state, or the last previous state has no status or a non-finished one
+   */
+  static boolean isNewStateTransitionToFinal(org.apache.gobblin.configuration.State currentState,
+      List<org.apache.gobblin.configuration.State> prevStates) {
+    String currentStatus = currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+    // Treat an absent status explicitly (in both the current and previous state) instead of relying on
+    // Collection.contains(null), which is allowed to throw for collections that do not permit null elements.
+    if (currentStatus == null || !FlowStatusGenerator.FINISHED_STATUSES.contains(currentStatus)) {
+      return false;
+    }
+    if (prevStates.isEmpty()) {
+      return true;
+    }
+    String previousStatus = prevStates.get(prevStates.size() - 1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+    return previousStatus == null || !FlowStatusGenerator.FINISHED_STATUSES.contains(previousStatus);
+  }
+
/**
* Merge states based on precedence defined by {@link
#ORDERED_EXECUTION_STATUSES}.
* The state instance in the 1st argument reflects the more recent state of
a job
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 7f6fc51ad..d4129f1ea 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -31,6 +31,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -45,11 +46,16 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
private final Config config;
private final JobIssueEventHandler jobIssueEventHandler;
+ private final MultiContextIssueRepository issueRepository;
+ private final boolean instrumentationEnabled;
+  /**
+   * Creates the factory. {@code config} and {@code jobIssueEventHandler} must be non-null. {@code issueRepository}
+   * and {@code instrumentationEnabled} are stored as-is and later handed to the reflectively constructed
+   * {@link GaaSObservabilityEventProducer} when a job status monitor is created.
+   */
@Inject
- public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler jobIssueEventHandler) {
+  public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler jobIssueEventHandler,
+      MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
this.config = Objects.requireNonNull(config);
this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
}
private KafkaJobStatusMonitor createJobStatusMonitor()
@@ -76,8 +82,14 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
config.getValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE));
}
jobStatusConfig =
jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
+ Class observabilityEventProducerClassName =
Class.forName(ConfigUtils.getString(config,
+
GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY,
+
GaaSObservabilityEventProducer.DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS));
+ GaaSObservabilityEventProducer observabilityEventProducer =
(GaaSObservabilityEventProducer)
GobblinConstructorUtils.invokeLongestConstructor(
+ observabilityEventProducerClassName,
ConfigUtils.configToState(config), this.issueRepository,
this.instrumentationEnabled);
+
return (KafkaJobStatusMonitor) GobblinConstructorUtils
- .invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler);
+ .invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
new file mode 100644
index 000000000..f8b23bd62
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+
+
+/**
+ * The default {@link GaaSObservabilityEventProducer} used by the {@link KafkaJobStatusMonitor}.
+ * Every operation is a no-op: no {@link GaaSObservabilityEventExperimental} is created and nothing is emitted.
+ */
+public class NoopGaaSObservabilityEventProducer extends GaaSObservabilityEventProducer {
+
+  /**
+   * Three-argument constructor (state, issue repository, instrumentation flag) that forwards everything to the
+   * superclass; this is the shape the job status monitor factory supplies when it instantiates a producer reflectively.
+   */
+  public NoopGaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    super(state, issueRepository, instrumentationEnabled);
+  }
+
+  /**
+   * Convenience constructor for callers without a state or issue repository (e.g. tests):
+   * forwards a {@code null} state, a {@code null} repository and instrumentation disabled.
+   */
+  public NoopGaaSObservabilityEventProducer() {
+    this(null, null, false);
+  }
+
+  /** Intentionally ignores {@code jobState}; no event is built or emitted. */
+  @Override
+  public void emitObservabilityEvent(State jobState) {
+    // no-op
+  }
+
+  /** Intentionally drops {@code event}. */
+  @Override
+  protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event) {
+    // no-op
+  }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
new file mode 100644
index 000000000..18371e556
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.time.ZonedDateTime;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class GaaSObservabilityProducerTest {
+
+  // In-memory issue store shared with the mock producer; the reference is fixed, its contents are filled per test.
+  private final MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+
+  /**
+   * Emits one COMPLETE job status through {@link MockGaaSObservabilityEventProducer} and checks the captured event:
+   * flow/job identifiers, flow execution id, job status, executor url (fed via {@link TimingEvent#METADATA_MESSAGE})
+   * and the single issue registered for the job.
+   *
+   * <p>Annotated with TestNG's {@code @Test} (fully qualified) to match the {@code org.testng.Assert} assertions:
+   * a JUnit {@code @Test} is presumably not discovered when this module's tests run under TestNG, so the test
+   * would never execute. NOTE(review): the file-level {@code import org.junit.Test;} is now unused and can be removed.
+   */
+  @org.testng.annotations.Test
+  public void testCreateGaaSObservabilityEvent() throws Exception {
+    String flowGroup = "testFlowGroup1";
+    String flowName = "testFlowName1";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    MockGaaSObservabilityEventProducer producer = new MockGaaSObservabilityEventProducer(new State(), this.issueRepository);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    // Keep the execution id in sync with the one the issue was registered under above.
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowExecutionId);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+    gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
+    gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+
+    List<GaaSObservabilityEventExperimental> emittedEvents = producer.getTestEmittedEvents();
+
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSObservabilityEventExperimental> iterator = emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event = iterator.next();
+    Assert.assertEquals(event.getFlowGroup(), flowGroup);
+    Assert.assertEquals(event.getFlowName(), flowName);
+    Assert.assertEquals(event.getJobName(), jobName);
+    Assert.assertEquals(event.getFlowExecutionId(), Long.valueOf(flowExecutionId));
+    Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event.getExecutorUrl(), "hostName");
+    Assert.assertEquals(event.getIssues().size(), 1);
+  }
+
+  /** Builds an {@link Issue} with the given summary, code and severity, timestamped with the current time. */
+  private Issue createTestIssue(String summary, String code, IssueSeverity severity) {
+    return Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
+  }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index dc17d0466..00b2598c8 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -96,8 +96,7 @@ public abstract class JobStatusRetrieverTest {
properties.setProperty(TimingEvent.JOB_ORCHESTRATED_TIME,
String.valueOf(endTime));
}
State jobStatus = new State(properties);
-
- KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore());
+ KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore(), new
NoopGaaSObservabilityEventProducer());
}
static Properties createAttemptsProperties(int currGen, int currAttempts,
boolean shouldRetry) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
new file mode 100644
index 000000000..770276677
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+
+
+/**
+ * Test double for {@link GaaSObservabilityEventProducer}. Only {@code sendUnderlyingEvent} is overridden, so events
+ * are produced by the inherited logic but captured in memory instead of being sent; tests read them back through
+ * {@link #getTestEmittedEvents()}.
+ */
+public class MockGaaSObservabilityEventProducer extends GaaSObservabilityEventProducer {
+  /** Every event handed to {@link #sendUnderlyingEvent}, in arrival order. */
+  private final List<GaaSObservabilityEventExperimental> emittedEvents = new ArrayList<>();
+
+  /** Creates a capturing producer; instrumentation is always disabled. */
+  public MockGaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository) {
+    super(state, issueRepository, false);
+  }
+
+  /** Captures {@code event} rather than sending it anywhere. */
+  @Override
+  protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event) {
+    this.emittedEvents.add(event);
+  }
+
+  /**
+   * Exposes the captured events for assertions.
+   *
+   * @return an unmodifiable view of every event this producer would have emitted, oldest first
+   */
+  public List<GaaSObservabilityEventExperimental> getTestEmittedEvents() {
+    return Collections.unmodifiableList(this.emittedEvents);
+  }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index d226c8322..f26ebf59e 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -136,7 +136,7 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
State jobStatus = new State(properties);
- KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore());
+ KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore(), new
NoopGaaSObservabilityEventProducer());
Iterator<JobStatus>
jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId);
Assert.assertTrue(jobStatusIterator.hasNext());
@@ -158,7 +158,7 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
State jobStatus = new State(properties);
try {
- KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore());
+ KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore(), new
NoopGaaSObservabilityEventProducer());
} catch (IOException e) {
Assert.assertTrue(e.getCause().getCause().getMessage().contains("Data
too long"));
return;