This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 981ec4f310 [GOBBLIN-2079] Separate GaaSJobObservabilityEvents to both
flow and job level events (#3962)
981ec4f310 is described below
commit 981ec4f31027e69962984f1f63fb41a04f08dab8
Author: William Lo <[email protected]>
AuthorDate: Mon Jul 29 20:28:33 2024 -0400
[GOBBLIN-2079] Separate GaaSJobObservabilityEvents to both flow and job
level events (#3962)
* Adding flow-level events to GaaSObservability to better differentiate them from
job-level events
---
.../src/main/avro/GaaSFlowObservabilityEvent.avsc | 100 +++++++++++++++
.../runtime/KafkaAvroJobStatusMonitorTest.java | 140 ++++++++++++++++++++-
.../service/modules/orchestration/DagManager.java | 2 +
.../modules/orchestration/proc/DagProcUtils.java | 2 +
.../GaaSJobObservabilityEventProducer.java | 33 ++++-
.../NoopGaaSJobObservabilityEventProducer.java | 8 +-
.../GaaSJobObservabilityProducerTest.java | 53 +++++++-
.../MockGaaSJobObservabilityEventProducer.java | 32 +++--
8 files changed, 354 insertions(+), 16 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc
new file mode 100644
index 0000000000..36a755bfbd
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc
@@ -0,0 +1,100 @@
+{
+ "type": "record",
+ "name": "GaaSFlowObservabilityEvent",
+ "namespace": "org.apache.gobblin.metrics",
+ "doc": "An event schema for GaaS to emit after a flow is executed.",
+ "fields": [
+ {
+ "name": "eventTimestamp",
+ "type": "long",
+ "doc": "Time at which event was created in milliseconds from Unix Epoch"
+ },
+ {
+ "name": "flowGroup",
+ "type": "string",
+ "doc": "Flow group for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowName",
+ "type": "string",
+ "doc": "Flow name for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowExecutionId",
+ "type": "long",
+ "doc": "Flow execution id for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "lastFlowModificationTimestamp",
+ "type": "long",
+ "doc": "Timestamp in millis since Epoch when the flow config was last
modified"
+ },
+ {
+ "name": "sourceNode",
+ "type": "string",
+ "doc": "Source node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "destinationNode",
+ "type": "string",
+ "doc": "Destination node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowStatus",
+ "type": {
+ "type": "enum",
+ "name": "FlowStatus",
+ "symbols": [
+ "SUCCEEDED",
+ "COMPILATION_FAILURE",
+ "SUBMISSION_FAILURE",
+ "EXECUTION_FAILURE",
+ "CANCELLED"
+ ],
+ "doc": "Final flow status for the GaaS flow",
+ "compliance": "NONE"
+ }
+ },
+ {
+ "name": "effectiveUserUrn",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "User URN (if applicable) whose identity was used to run the
underlying Gobblin job e.g. myGroup",
+ "compliance": "NONE"
+ },
+ {
+ "name": "gaasId",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null,
+ "doc": "The deployment ID of GaaS that is sending the event (if multiple
GaaS instances are running)"
+ },
+ {
+ "name": "flowStartTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Start time of the flow - when the dag is initialized, in millis
since Epoch. Null if the flow was never run",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowEndTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Finish time of the flow in millis since Epoch, null if the flow
was never run",
+ "compliance": "NONE"
+ }
+ ]
+}
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index e20cfd97fb..0017629487 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -59,6 +59,8 @@ import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.FlowStatus;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.JobStatus;
@@ -653,7 +655,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
// Only the COMPLETE event should create a GaaSJobObservabilityEvent
- List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedEvents();
+ List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedJobEvents();
Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
GaaSJobObservabilityEvent event1 = iterator.next();
Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
@@ -702,7 +704,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
// Only the COMPLETE event should create a GaaSJobObservabilityEvent
- List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedEvents();
+ List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedJobEvents();
Assert.assertEquals(emittedEvents.size(), 1);
Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
GaaSJobObservabilityEvent event1 = iterator.next();
@@ -713,6 +715,126 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+ public void testObservabilityEventFlowLevel() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic7");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobSucceededEvent(),
+ createFlowSucceededEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ State producerState = new State();
+
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
"true");
+ MockGaaSJobObservabilityEventProducer mockEventProducer = new
MockGaaSJobObservabilityEventProducer(producerState,
+ issueRepository, false);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer, mock(DagManagementStateStore.class));
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA",
"NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
+
+ // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+ List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedJobEvents();
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event1 = iterator.next();
+ Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+ Assert.assertEquals(event1.getFlowName(), this.flowName);
+ Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+ // Only the COMPLETE event should create a GaaSFlowObservabilityEvent
+ List<GaaSFlowObservabilityEvent> emittedFlowEvents =
mockEventProducer.getTestEmittedFlowEvents();
+ Assert.assertEquals(emittedFlowEvents.size(), 1);
+ Iterator<GaaSFlowObservabilityEvent> flowIterator =
emittedFlowEvents.iterator();
+ GaaSFlowObservabilityEvent flowEvent = flowIterator.next();
+ Assert.assertEquals(flowEvent.getFlowStatus(), FlowStatus.SUCCEEDED);
+ Assert.assertEquals(flowEvent.getFlowName(), this.flowName);
+ Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup);
+
+ jobStatusMonitor.shutDown();
+ }
+
+ @Test (dependsOnMethods = "testObservabilityEventFlowLevel")
+ public void testObservabilityEventFlowFailed() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic7");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobFailedEvent(),
+ createFlowFailedEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ State producerState = new State();
+
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
"true");
+ MockGaaSJobObservabilityEventProducer mockEventProducer = new
MockGaaSJobObservabilityEventProducer(producerState,
+ issueRepository, false);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer, mock(DagManagementStateStore.class));
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.FAILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA",
"NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.FAILED.name());
+
+ // Only the FAILED event should create a GaaSJobObservabilityEvent
+ List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedJobEvents();
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event1 = iterator.next();
+ Assert.assertEquals(event1.getJobStatus(), JobStatus.EXECUTION_FAILURE);
+ Assert.assertEquals(event1.getFlowName(), this.flowName);
+ Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+ // Only the flow-level FAILED event should create a GaaSFlowObservabilityEvent
+ List<GaaSFlowObservabilityEvent> emittedFlowEvents =
mockEventProducer.getTestEmittedFlowEvents();
+ Assert.assertEquals(emittedFlowEvents.size(), 1);
+ Iterator<GaaSFlowObservabilityEvent> flowIterator =
emittedFlowEvents.iterator();
+ GaaSFlowObservabilityEvent flowEvent = flowIterator.next();
+ Assert.assertEquals(flowEvent.getFlowStatus(),
FlowStatus.EXECUTION_FAILURE);
+ Assert.assertEquals(flowEvent.getFlowName(), this.flowName);
+ Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup);
+
+ jobStatusMonitor.shutDown();
+ }
+
private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor
jobStatusMonitor, Iterator<DecodeableKafkaRecord<byte[], byte[]>>
recordIterator,
String jobGroup, String jobName) throws IOException {
jobStatusMonitor.processMessage(recordIterator.next());
@@ -761,7 +883,19 @@ public class KafkaAvroJobStatusMonitorTest {
}
private GobblinTrackingEvent createFlowSucceededEvent() {
- return createGTE(TimingEvent.FlowTimings.FLOW_SUCCEEDED,
Maps.newHashMap());
+ GobblinTrackingEvent event =
createGTE(TimingEvent.FlowTimings.FLOW_SUCCEEDED, Maps.newHashMap());
+ // Remove jobname and jobgroup: flow level events should not have them, so they
get populated with "NA"
+ event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ return event;
+ }
+
+ private GobblinTrackingEvent createFlowFailedEvent() {
+ GobblinTrackingEvent event =
createGTE(TimingEvent.FlowTimings.FLOW_FAILED, Maps.newHashMap());
+ // Remove jobname and jobgroup: flow level events should not have them, so they
get populated with "NA"
+ event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ return event;
}
private GobblinTrackingEvent createJobFailedEvent() {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index ee80cbb65b..e0992278b9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -77,6 +77,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.SerializationConstants;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -998,6 +999,7 @@ public class DagManager extends AbstractIdleService {
jobMetadata.put(TimingEvent.METADATA_MESSAGE,
producer.getExecutionLink(addSpecFuture, specExecutorUri));
// Add serialized job properties as part of the orchestrated job event
metadata
jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobSpec.getConfigAsProperties()));
+ jobMetadata.put(SerializationConstants.FLOW_START_TIME_KEY,
String.valueOf(dagNode.getValue().getFlowStartTime()));
jobOrchestrationTimer.stop(jobMetadata);
log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 3e0f91c393..b63c15caea 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -46,6 +46,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.SerializationConstants;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -125,6 +126,7 @@ public class DagProcUtils {
jobMetadata.put(TimingEvent.METADATA_MESSAGE,
producer.getExecutionLink(addSpecFuture, specExecutorUri));
// Add serialized job properties as part of the orchestrated job event
metadata
jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobSpec.getConfigAsProperties()));
+ jobMetadata.put(SerializationConstants.FLOW_START_TIME_KEY,
String.valueOf(dagNode.getValue().getFlowStartTime()));
jobOrchestrationTimer.stop(jobMetadata);
log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
index ab72a52f8a..4beafba29e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
@@ -38,6 +38,8 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.DatasetMetric;
+import org.apache.gobblin.metrics.FlowStatus;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.metrics.Issue;
import org.apache.gobblin.metrics.IssueSeverity;
@@ -57,6 +59,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.SerializationConstants;
import org.apache.gobblin.util.PropertiesUtils;
@@ -69,6 +72,7 @@ public abstract class GaaSJobObservabilityEventProducer
implements Closeable {
public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
"GaaSJobObservabilityEventProducer.";
public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY =
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
NoopGaaSJobObservabilityEventProducer.class.getName();
+ public static final String EMIT_FLOW_OBSERVABILITY_EVENT =
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "emitFlowLevelEvent";
public static final String ISSUES_READ_FAILED_METRIC_NAME =
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
public static final String GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME =
"jobSucceeded";
@@ -81,12 +85,14 @@ public abstract class GaaSJobObservabilityEventProducer
implements Closeable {
protected ObservableLongMeasurement jobStatusMetric;
protected MultiContextIssueRepository issueRepository;
protected boolean instrumentationEnabled;
+ protected boolean emitFlowObservabilityEvent;
ContextAwareMeter getIssuesFailedMeter;
public GaaSJobObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
this.state = state;
this.issueRepository = issueRepository;
this.instrumentationEnabled = instrumentationEnabled;
+ this.emitFlowObservabilityEvent =
this.state.getPropAsBoolean(EMIT_FLOW_OBSERVABILITY_EVENT, false);
if (this.instrumentationEnabled) {
this.metricContext = Instrumented.getMetricContext(state, getClass());
this.getIssuesFailedMeter =
this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -123,7 +129,11 @@ public abstract class GaaSJobObservabilityEventProducer
implements Closeable {
public void emitObservabilityEvent(final State jobState) {
GaaSJobObservabilityEvent event = createGaaSObservabilityEvent(jobState);
- sendUnderlyingEvent(event);
+ if
(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD).equals(JobStatusRetriever.NA_KEY)
&& this.emitFlowObservabilityEvent) {
+ sendFlowLevelEvent(convertJobEventToFlowEvent(event, jobState));
+ } else {
+ sendJobLevelEvent(event);
+ }
this.eventCollector.add(event);
}
@@ -142,7 +152,9 @@ public abstract class GaaSJobObservabilityEventProducer
implements Closeable {
* Emits the GaaSJobObservabilityEvent with the mechanism that the child
class is built upon e.g. Kafka
* @param event
*/
- abstract protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event);
+ abstract protected void sendJobLevelEvent(GaaSJobObservabilityEvent event);
+
+ abstract protected void sendFlowLevelEvent(GaaSFlowObservabilityEvent event);
/**
* Creates a GaaSJobObservabilityEvent which is derived from a final GaaS
job pipeline state, which is combination of GTE job states in an ordered fashion
@@ -244,6 +256,23 @@ public abstract class GaaSJobObservabilityEventProducer
implements Closeable {
)).collect(Collectors.toList());
}
+ private GaaSFlowObservabilityEvent
convertJobEventToFlowEvent(GaaSJobObservabilityEvent jobEvent, State jobState) {
+ GaaSFlowObservabilityEvent.Builder builder =
GaaSFlowObservabilityEvent.newBuilder();
+ builder.setEventTimestamp(jobEvent.getEventTimestamp())
+ .setGaasId(jobEvent.getGaasId())
+ .setFlowName(jobEvent.getFlowName())
+ .setFlowGroup(jobEvent.getFlowGroup())
+ .setFlowExecutionId(jobEvent.getFlowExecutionId())
+
.setLastFlowModificationTimestamp(jobEvent.getLastFlowModificationTimestamp())
+ .setSourceNode(jobEvent.getSourceNode())
+ .setDestinationNode(jobEvent.getDestinationNode())
+ .setEffectiveUserUrn(jobEvent.getEffectiveUserUrn())
+ .setFlowStatus(FlowStatus.valueOf(jobEvent.getJobStatus().toString()))
+
.setFlowStartTimestamp(jobState.getPropAsLong(SerializationConstants.FLOW_START_TIME_KEY,
0))
+ .setFlowEndTimestamp(jobEvent.getJobEndTimestamp());
+ return builder.build();
+ }
+
@Override
public void close() throws IOException {
// producer close will handle by the cache
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
index a0eeb88e17..de0db5a01d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.monitoring;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
@@ -33,12 +34,15 @@ public class NoopGaaSJobObservabilityEventProducer extends
GaaSJobObservabilityE
}
public NoopGaaSJobObservabilityEventProducer() {
- super(null, null, false);
+ super(new State(), null, false);
}
@Override
public void emitObservabilityEvent(State jobState) {}
@Override
- protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event) {}
+ protected void sendJobLevelEvent(GaaSJobObservabilityEvent event) {}
+
+ @Override
+ protected void sendFlowLevelEvent(GaaSFlowObservabilityEvent event) {}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
index 2aaf7ac261..2cbde03039 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
@@ -38,6 +38,8 @@ import io.opentelemetry.sdk.metrics.data.MetricData;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.FlowStatus;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.metrics.JobStatus;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -55,6 +57,7 @@ import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.SerializationConstants;
import org.apache.gobblin.util.PropertiesUtils;
@@ -107,7 +110,7 @@ public class GaaSJobObservabilityProducerTest {
jobStatusProps.putAll(gteEventMetadata);
producer.emitObservabilityEvent(new State(jobStatusProps));
- List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedEvents();
+ List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedJobEvents();
Assert.assertEquals(emittedEvents.size(), 1);
Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
@@ -171,7 +174,7 @@ public class GaaSJobObservabilityProducerTest {
jobStatusProps.putAll(gteEventMetadata);
producer.emitObservabilityEvent(new State(jobStatusProps));
- List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedEvents();
+ List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedJobEvents();
Assert.assertEquals(emittedEvents.size(), 1);
Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
@@ -195,6 +198,52 @@ public class GaaSJobObservabilityProducerTest {
serializer.serializeRecord(event);
}
+ @Test
+ public void testCreateGaaSObservabilityFlowEvent() throws Exception {
+ String flowGroup = "testFlowGroup3";
+ String flowName = "testFlowName3";
+ String jobName = JobStatusRetriever.NA_KEY;
+ String flowExecutionId = "1";
+ this.issueRepository.put(
+ TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
+ createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+ );
+ State producerState = new State();
+
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
"true");
+ MockGaaSJobObservabilityEventProducer
+ producer = new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, false);
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+ gteEventMetadata.put(SerializationConstants.FLOW_START_TIME_KEY, "1");
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ List<GaaSFlowObservabilityEvent> emittedEvents =
producer.getTestEmittedFlowEvents();
+
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSFlowObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSFlowObservabilityEvent event = iterator.next();
+ Assert.assertEquals(event.getFlowGroup(), flowGroup);
+ Assert.assertEquals(event.getFlowName(), flowName);
+ Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
+ Assert.assertEquals(event.getFlowStatus(), FlowStatus.SUCCEEDED);
+ Assert.assertNull(event.getEffectiveUserUrn());
+ Assert.assertEquals(event.getFlowStartTimestamp(), Long.valueOf(1));
+
+ AvroSerializer<GaaSFlowObservabilityEvent> serializer = new
AvroBinarySerializer<>(
+ GaaSFlowObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
+ );
+ serializer.serializeRecord(event);
+ }
+
@Test
public void testEnableMetrics() throws Exception {
String flowGroup = "testFlowGroup2";
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
index 528edc2735..5d251fce08 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
@@ -33,7 +34,9 @@ import
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
* Tests can use a getter to fetch a read-only version of the events that were
emitted
*/
public class MockGaaSJobObservabilityEventProducer extends
GaaSJobObservabilityEventProducer {
- private List<GaaSJobObservabilityEvent> emittedEvents = new ArrayList<>();
+ private List<GaaSJobObservabilityEvent> emittedJobEvents = new ArrayList<>();
+
+ private List<GaaSFlowObservabilityEvent> emittedFlowEvents = new
ArrayList<>();
public MockGaaSJobObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
super(state, issueRepository, instrumentationEnabled);
@@ -44,19 +47,34 @@ public class MockGaaSJobObservabilityEventProducer extends
GaaSJobObservabilityE
return InMemoryOpenTelemetryMetrics.getInstance(state);
}
@Override
- protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event) {
- emittedEvents.add(event);
+ protected void sendJobLevelEvent(GaaSJobObservabilityEvent event) {
+ emittedJobEvents.add(event);
+ }
+
+ @Override
+ protected void sendFlowLevelEvent(GaaSFlowObservabilityEvent event) {
+ emittedFlowEvents.add(event);
+ }
+
+ /**
+ * Returns the job level events that the mock producer has written
+ * This should only be used as a read-only object for emitted
GaaSJobObservabilityEvents
+ * @return list of events that would have been emitted
+ */
+ public List<GaaSJobObservabilityEvent> getTestEmittedJobEvents() {
+ return Collections.unmodifiableList(this.emittedJobEvents);
}
/**
- * Returns the events that the mock producer has written
- * This should only be used as a read-only object for emitted
GaaSObservabilityEvents
+ * Returns the flow level events that the mock producer has written
+ * This should only be used as a read-only object for emitted
GaaSFlowObservabilityEvents
* @return list of events that would have been emitted
*/
- public List<GaaSJobObservabilityEvent> getTestEmittedEvents() {
- return Collections.unmodifiableList(this.emittedEvents);
+ public List<GaaSFlowObservabilityEvent> getTestEmittedFlowEvents() {
+ return Collections.unmodifiableList(this.emittedFlowEvents);
}
+
public InMemoryOpenTelemetryMetrics getOpentelemetryMetrics() {
return (InMemoryOpenTelemetryMetrics) this.opentelemetryMetrics;
}