This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7a11e72ac Graduate GaaSObservabilityEventExperimental (#3940)
7a11e72ac is described below
commit 7a11e72ac4a25325f216b0badc693e8a1677282d
Author: William Lo <[email protected]>
AuthorDate: Tue May 7 14:20:43 2024 -0400
Graduate GaaSObservabilityEventExperimental (#3940)
Update schema and rename event
---
...imental.avsc => GaaSJobObservabilityEvent.avsc} | 62 ++++++++++-----
.../avro/GaaSObservabilityEventExperimental.avsc | 4 +-
.../runtime/KafkaAvroJobStatusMonitorTest.java | 50 ++++++------
.../modules/flowgraph/BaseFlowGraphHelper.java | 2 +-
.../service/modules/orchestration/DagManager.java | 3 +-
...java => GaaSJobObservabilityEventProducer.java} | 91 ++++++++++++++--------
.../monitoring/KafkaAvroJobStatusMonitor.java | 2 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 4 +-
.../monitoring/KafkaJobStatusMonitorFactory.java | 6 +-
... => NoopGaaSJobObservabilityEventProducer.java} | 10 +--
....java => GaaSJobObservabilityProducerTest.java} | 87 ++++++++++++---------
... => MockGaaSJobObservabilityEventProducer.java} | 14 ++--
12 files changed, 196 insertions(+), 139 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc
similarity index 78%
copy from gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
copy to gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc
index 887d322f7..1f3bf0a09 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc
@@ -1,13 +1,13 @@
{
"type": "record",
- "name": "GaaSObservabilityEventExperimental",
+ "name": "GaaSJobObservabilityEvent",
"namespace": "org.apache.gobblin.metrics",
- "doc": "An experimental format for GaaS to emit events during and after a
job is executed.",
+ "doc": "An event schema for GaaS to emit during and after a job is
executed.",
"fields": [
{
- "name": "timestamp",
+ "name": "eventTimestamp",
"type": "long",
- "doc": "Time at which event was created in millis"
+ "doc": "Time at which event was created in milliseconds from Unix Epoch"
},
{
"name": "flowGroup",
@@ -28,14 +28,26 @@
"compliance": "NONE"
},
{
- "name": "lastFlowModificationTime",
+ "name": "lastFlowModificationTimestamp",
"type": "long",
"doc": "Timestamp in millis since Epoch when the flow config was last
modified"
},
{
- "name": "flowGraphEdgeId",
+ "name": "sourceNode",
"type": "string",
- "doc": "Flow edge id, in format <src_node>_<dest_node>_<edge_id>",
+ "doc": "Source node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "destinationNode",
+ "type": "string",
+ "doc": "Destination node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowEdgeId",
+ "type": "string",
+ "doc": "Flow edge id, excluding the sourceNode and destinationNode",
"compliance": "NONE"
},
{
@@ -61,7 +73,7 @@
}
},
{
- "name": "jobOrchestratedTime",
+ "name": "jobOrchestratedTimestamp",
"type": [
"null",
"long"
@@ -69,7 +81,7 @@
"doc": "Timestamp when the job was successfully sent to the job
executor, null if it was unable to be sent."
},
{
- "name": "jobStartTime",
+ "name": "jobStartTimestamp",
"type": [
"null",
"long"
@@ -78,7 +90,7 @@
"compliance": "NONE"
},
{
- "name": "jobEndTime",
+ "name": "jobEndTimestamp",
"type": [
"null",
"long"
@@ -87,7 +99,7 @@
"compliance": "NONE"
},
{
- "name": "jobPlanningPhaseStartTime",
+ "name": "jobPlanningStartTimestamp",
"type": [
"null",
"long"
@@ -97,7 +109,7 @@
"default": null
},
{
- "name": "jobPlanningPhaseEndTime",
+ "name": "jobPlanningEndTimestamp",
"type": [
"null",
"long"
@@ -107,12 +119,12 @@
"default": null
},
{
- "name": "executionUserUrn",
+ "name": "effectiveUserUrn",
"type": [
"null",
"string"
],
- "doc": "User URN (if applicable) that runs the underlying Gobblin job",
+ "doc": "User URN (if applicable) whose identity was used to run the
underlying Gobblin job e.g. myGroup",
"compliance": "NONE"
},
{
@@ -127,7 +139,16 @@
{
"name": "executorId",
"type": "string",
- "doc": "The ID of the spec executor that ran or would have ran the job",
+ "doc": "The ID of the spec executor that ran or would have run the job",
+ "compliance": "NONE"
+ },
+ {
+ "name": "executorUrn",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "Unique URN of the execution across platforms that describes the
relevant ids for each job on each platform e.g.
urn:apache:gobblin:exec:azkaban:92937153:temporal:NestingExecWorkUnits_1714593600113",
"compliance": "NONE"
},
{
@@ -137,7 +158,7 @@
"string"
],
"default": null,
- "doc": "The instance of GaaS that is sending the event (if multiple GaaS
instances are running)"
+ "doc": "The deployment ID of GaaS that is sending the event (if multiple
GaaS instances are running)"
},
{
"name":"jobProperties",
@@ -162,7 +183,7 @@
{
"name": "timestamp",
"type": "long",
- "doc": "Time when the issue occurred"
+ "doc": "Milliseconds from UNIX epoch when the issue occurred"
},
{
"name": "severity",
@@ -208,7 +229,7 @@
]
},
{
- "name": "datasetsWritten",
+ "name": "datasetsMetrics",
"type": [
"null",
{
@@ -221,7 +242,7 @@
{
"name": "datasetUrn",
"type": "string",
- "doc": "URN of the dataset"
+ "doc": "URN of the dataset, empty string for unnamed datasets"
},
{
"name": "bytesWritten",
@@ -236,7 +257,7 @@
{
"name": "successfullyCommitted",
"type": "boolean",
- "doc": "Whether the dataset was successfully committed by
Gobblin and fully successful, useful when users configure pipelines to allow
for partial failures or non-atomic writes"
+ "doc": "Whether the dataset was successfully committed by
Gobblin and fully successful, versus when users configure partial failures or
non-atomic writes"
}
]
}
@@ -247,3 +268,4 @@
]
}
+
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
index 887d322f7..127d90694 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
@@ -2,7 +2,7 @@
"type": "record",
"name": "GaaSObservabilityEventExperimental",
"namespace": "org.apache.gobblin.metrics",
- "doc": "An experimental format for GaaS to emit events during and after a
job is executed.",
+ "doc": "The deprecated format for GaaS to emit events during and after a job
is executed.",
"fields": [
{
"name": "timestamp",
@@ -127,7 +127,7 @@
{
"name": "executorId",
"type": "string",
- "doc": "The ID of the spec executor that ran or would have ran the job",
+ "doc": "The ID of the spec executor that ran or would have run the job",
"compliance": "NONE"
},
{
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 94980017c..68e208e9c 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -58,7 +58,7 @@ import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.JobStatus;
import org.apache.gobblin.metrics.MetricContext;
@@ -72,12 +72,12 @@ import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
-import org.apache.gobblin.service.monitoring.GaaSObservabilityEventProducer;
+import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
-import
org.apache.gobblin.service.monitoring.MockGaaSObservabilityEventProducer;
-import
org.apache.gobblin.service.monitoring.NoopGaaSObservabilityEventProducer;
+import
org.apache.gobblin.service.monitoring.MockGaaSJobObservabilityEventProducer;
+import
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer;
import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
@@ -143,7 +143,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
- new NoopGaaSObservabilityEventProducer());
+ new NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
@@ -201,7 +201,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -279,7 +279,7 @@ public class KafkaAvroJobStatusMonitorTest {
}
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
- new NoopGaaSObservabilityEventProducer());
+ new
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -339,7 +339,7 @@ public class KafkaAvroJobStatusMonitorTest {
Config conf = ConfigFactory.empty().withValue(
KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." +
RETRY_MULTIPLIER,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle,
conf,
- new NoopGaaSObservabilityEventProducer());
+ new
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
@@ -397,7 +397,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
@@ -456,7 +456,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
@@ -507,7 +507,7 @@ public class KafkaAvroJobStatusMonitorTest {
}
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
- new NoopGaaSObservabilityEventProducer());
+ new
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
@@ -537,7 +537,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
- MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
+ MockGaaSJobObservabilityEventProducer mockEventProducer = new
MockGaaSJobObservabilityEventProducer(
ConfigUtils.configToState(ConfigFactory.empty()), issueRepository,
false);
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
mockEventProducer);
@@ -553,15 +553,15 @@ public class KafkaAvroJobStatusMonitorTest {
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
- // Only the COMPLETE event should create a GaaSObservabilityEvent
- List<GaaSObservabilityEventExperimental> emittedEvents =
mockEventProducer.getTestEmittedEvents();
- Iterator<GaaSObservabilityEventExperimental> iterator =
emittedEvents.iterator();
- GaaSObservabilityEventExperimental event1 = iterator.next();
+ // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+ List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedEvents();
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event1 = iterator.next();
Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
Assert.assertEquals(event1.getFlowName(), this.flowName);
Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
- Assert.assertEquals(event1.getJobPlanningPhaseStartTime(),
Long.valueOf(2));
- Assert.assertEquals(event1.getJobPlanningPhaseEndTime(), Long.valueOf(3));
+ Assert.assertEquals(event1.getJobPlanningStartTimestamp(),
Long.valueOf(2));
+ Assert.assertEquals(event1.getJobPlanningEndTimestamp(), Long.valueOf(3));
jobStatusMonitor.shutDown();
}
@@ -584,7 +584,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
- MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
+
org.apache.gobblin.service.monitoring.MockGaaSJobObservabilityEventProducer
mockEventProducer = new
org.apache.gobblin.service.monitoring.MockGaaSJobObservabilityEventProducer(
ConfigUtils.configToState(ConfigFactory.empty()), issueRepository,
false);
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
mockEventProducer);
@@ -602,11 +602,11 @@ public class KafkaAvroJobStatusMonitorTest {
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
- // Only the COMPLETE event should create a GaaSObservabilityEvent
- List<GaaSObservabilityEventExperimental> emittedEvents =
mockEventProducer.getTestEmittedEvents();
+ // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+ List<GaaSJobObservabilityEvent> emittedEvents =
mockEventProducer.getTestEmittedEvents();
Assert.assertEquals(emittedEvents.size(), 1);
- Iterator<GaaSObservabilityEventExperimental> iterator =
emittedEvents.iterator();
- GaaSObservabilityEventExperimental event1 = iterator.next();
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event1 = iterator.next();
Assert.assertEquals(event1.getJobStatus(), JobStatus.CANCELLED);
Assert.assertEquals(event1.getFlowName(), this.flowName);
Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
@@ -714,7 +714,7 @@ public class KafkaAvroJobStatusMonitorTest {
}
MockKafkaAvroJobStatusMonitor
createMockKafkaAvroJobStatusMonitor(AtomicBoolean
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
- GaaSObservabilityEventProducer eventProducer) throws IOException,
ReflectiveOperationException {
+ GaaSJobObservabilityEventProducer eventProducer) throws IOException,
ReflectiveOperationException {
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
@@ -776,7 +776,7 @@ public class KafkaAvroJobStatusMonitorTest {
* @param shouldThrowFakeExceptionInParseJobStatusToggle - pass (and
retain) to dial whether `parseJobStatus` throws
*/
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads,
- AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle,
GaaSObservabilityEventProducer producer)
+ AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle,
GaaSJobObservabilityEventProducer producer)
throws IOException, ReflectiveOperationException {
super(topic, config, numThreads, mock(JobIssueEventHandler.class),
producer, mysqlDagActionStore);
shouldThrowFakeExceptionInParseJobStatus =
shouldThrowFakeExceptionInParseJobStatusToggle;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
index 3d33b2f1c..0f435bf53 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
@@ -72,7 +72,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
public class BaseFlowGraphHelper {
private static final int NODE_FILE_DEPTH = 3;
private static final int EDGE_FILE_DEPTH = 4;
- private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
+ public static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
final String baseDirectory;
private final Config emptyConfig = ConfigFactory.empty();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 218fe8128..ce942cd7a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -83,6 +83,7 @@ import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import static org.apache.gobblin.service.ExecutionStatus.*;
@@ -1003,7 +1004,7 @@ public class DagManager extends AbstractIdleService {
jobMetadata.put(TimingEvent.METADATA_MESSAGE,
producer.getExecutionLink(addSpecFuture, specExecutorUri));
// Add serialized job properties as part of the orchestrated job event
metadata
- jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
dagNode.getValue().toString());
+ jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobSpec.getConfigAsProperties()));
jobOrchestrationTimer.stop(jobMetadata);
log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
similarity index 70%
rename from gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
rename to gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
index 0966e458d..bd5c4eb3d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
@@ -22,8 +22,11 @@ import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+
import com.codahale.metrics.MetricRegistry;
import com.google.gson.reflect.TypeToken;
@@ -35,7 +38,7 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.DatasetMetric;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.metrics.Issue;
import org.apache.gobblin.metrics.IssueSeverity;
import org.apache.gobblin.metrics.JobStatus;
@@ -51,34 +54,36 @@ import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.PropertiesUtils;
/**
* A class embedded within GaaS running in the JobStatusMonitor which emits
GaaSObservabilityEvents after each job in a flow
- * This is an abstract class, we need a sub system like Kakfa, which support
at least once delivery, to emit the event
+ * This is an abstract class, we need a sub system like Kafka, which support
at least once delivery, to emit the event
*/
@Slf4j
-public abstract class GaaSObservabilityEventProducer implements Closeable {
- public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
"GaaSObservabilityEventProducer.";
- public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
- public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
NoopGaaSObservabilityEventProducer.class.getName();
- public static final String ISSUES_READ_FAILED_METRIC_NAME =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
- public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
+public abstract class GaaSJobObservabilityEventProducer implements Closeable {
+ public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
"GaaSJobObservabilityEventProducer.";
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY =
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+ public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
NoopGaaSJobObservabilityEventProducer.class.getName();
+ public static final String ISSUES_READ_FAILED_METRIC_NAME =
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+ public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
public static final String GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME =
"jobSucceeded";
protected MetricContext metricContext;
protected State state;
- List<GaaSObservabilityEventExperimental> eventCollector = new ArrayList<>();
+ List<GaaSJobObservabilityEvent> eventCollector = new ArrayList<>();
protected OpenTelemetryMetricsBase opentelemetryMetrics;
protected ObservableLongMeasurement jobStatusMetric;
protected MultiContextIssueRepository issueRepository;
protected boolean instrumentationEnabled;
ContextAwareMeter getIssuesFailedMeter;
- public GaaSObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+ public GaaSJobObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
this.state = state;
this.issueRepository = issueRepository;
this.instrumentationEnabled = instrumentationEnabled;
@@ -104,7 +109,7 @@ public abstract class GaaSObservabilityEventProducer implements Closeable {
.buildObserver();
this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
.batchCallback(() -> {
- for (GaaSObservabilityEventExperimental event :
this.eventCollector) {
+ for (GaaSJobObservabilityEvent event : this.eventCollector) {
Attributes tags = getEventAttributes(event);
int status = event.getJobStatus() == JobStatus.SUCCEEDED ? 1 : 0;
this.jobStatusMetric.record(status, tags);
@@ -117,77 +122,95 @@ public abstract class GaaSObservabilityEventProducer implements Closeable {
}
public void emitObservabilityEvent(final State jobState) {
- GaaSObservabilityEventExperimental event =
createGaaSObservabilityEvent(jobState);
+ GaaSJobObservabilityEvent event = createGaaSObservabilityEvent(jobState);
sendUnderlyingEvent(event);
this.eventCollector.add(event);
}
- public Attributes getEventAttributes(GaaSObservabilityEventExperimental
event) {
+ public Attributes getEventAttributes(GaaSJobObservabilityEvent event) {
Attributes tags =
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
event.getFlowName())
.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
event.getFlowGroup())
.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName())
.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
event.getFlowExecutionId())
.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
event.getExecutorId())
- .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
event.getFlowGraphEdgeId())
+ .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
event.getFlowEdgeId())
.build();
return tags;
}
/**
- * Emits the GaaSObservabilityEvent with the mechanism that the child class
is built upon e.g. Kafka
+ * Emits the GaaSJobObservabilityEvent with the mechanism that the child
class is built upon e.g. Kafka
* @param event
*/
- abstract protected void
sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+ abstract protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event);
/**
- * Creates a GaaSObservabilityEvent which is derived from a final GaaS job
pipeline state, which is combination of GTE job states in an ordered fashion
+ * Creates a GaaSJobObservabilityEvent which is derived from a final GaaS
job pipeline state, which is combination of GTE job states in an ordered fashion
* @param jobState
- * @return GaaSObservabilityEvent
+ * @return GaaSJobObservabilityEvent
*/
- private GaaSObservabilityEventExperimental
createGaaSObservabilityEvent(final State jobState) {
+ private GaaSJobObservabilityEvent createGaaSObservabilityEvent(final State
jobState) {
Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ?
jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ?
jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
Long jobOrchestratedTime =
jobState.contains(TimingEvent.JOB_ORCHESTRATED_TIME) ?
jobState.getPropAsLong(TimingEvent.JOB_ORCHESTRATED_TIME) : null;
Long jobPlanningPhaseStartTime =
jobState.contains(TimingEvent.WORKUNIT_PLAN_START_TIME) ?
jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_START_TIME) : null;
Long jobPlanningPhaseEndTime =
jobState.contains(TimingEvent.WORKUNIT_PLAN_END_TIME) ?
jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_END_TIME) : null;
+ String flowGroup =
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String flowName =
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ Properties jobProperties = new Properties();
+ try {
+ jobProperties =
PropertiesUtils.deserialize(jobState.getProp(JobExecutionPlan.JOB_PROPS_KEY,
""));
+ } catch (IOException e) {
+ log.error("Could not deserialize job properties for flowGroup {}
flowName {} while creating GaaSJobObservabilityEvent due to ", flowGroup,
flowName, e);
+ }
+
+ String fullFlowEdge =
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "");
+ // Parse the flow edge from edge id that is stored in format
sourceNode_destinationNode_flowEdgeId
+ String edgeId = StringUtils.substringAfter(
+ StringUtils.substringAfter(fullFlowEdge,
jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY,
"")),
+ BaseFlowGraphHelper.FLOW_EDGE_LABEL_JOINER_CHAR);
+
Type datasetTaskSummaryType = new
TypeToken<ArrayList<DatasetTaskSummary>>(){}.getType();
List<DatasetTaskSummary> datasetTaskSummaries =
jobState.contains(TimingEvent.DATASET_TASK_SUMMARIES) ?
GsonUtils.GSON_WITH_DATE_HANDLING.fromJson(jobState.getProp(TimingEvent.DATASET_TASK_SUMMARIES),
datasetTaskSummaryType) : null;
List<DatasetMetric> datasetMetrics = datasetTaskSummaries != null ?
datasetTaskSummaries.stream().map(
DatasetTaskSummary::toDatasetMetric).collect(Collectors.toList()) :
null;
- GaaSObservabilityEventExperimental.Builder builder =
GaaSObservabilityEventExperimental.newBuilder();
+ GaaSJobObservabilityEvent.Builder builder =
GaaSJobObservabilityEvent.newBuilder();
List<Issue> issueList = null;
try {
issueList = getIssuesForJob(issueRepository, jobState);
} catch (Exception e) {
// If issues cannot be fetched, increment metric but continue to try to
emit the event
- log.error("Could not fetch issues while creating GaaSObservabilityEvent
due to ", e);
+ log.error("Could not fetch issues while creating
GaaSJobObservabilityEvent due to ", e);
if (this.instrumentationEnabled) {
this.getIssuesFailedMeter.mark();
}
}
JobStatus status = convertExecutionStatusTojobState(jobState,
ExecutionStatus.valueOf(jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD)));
- builder.setTimestamp(System.currentTimeMillis())
-
.setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
-
.setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
-
.setFlowGraphEdgeId(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
""))
+ builder.setEventTimestamp(System.currentTimeMillis())
+ .setFlowName(flowName)
+ .setFlowGroup(flowGroup)
.setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
-
.setLastFlowModificationTime(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
0))
+
.setLastFlowModificationTimestamp(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
0))
.setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
.setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
.setExecutorId(jobState.getProp(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
""))
- .setJobStartTime(jobStartTime)
- .setJobEndTime(jobEndTime)
- .setJobOrchestratedTime(jobOrchestratedTime)
- .setJobPlanningPhaseStartTime(jobPlanningPhaseStartTime)
- .setJobPlanningPhaseEndTime(jobPlanningPhaseEndTime)
+ .setJobStartTimestamp(jobStartTime)
+ .setJobEndTimestamp(jobEndTime)
+ .setJobOrchestratedTimestamp(jobOrchestratedTime)
+ .setJobPlanningStartTimestamp(jobPlanningPhaseStartTime)
+ .setJobPlanningEndTimestamp(jobPlanningPhaseEndTime)
.setIssues(issueList)
.setJobStatus(status)
-
.setExecutionUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null))
- .setDatasetsWritten(datasetMetrics)
+
.setEffectiveUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null))
+ .setDatasetsMetrics(datasetMetrics)
.setGaasId(this.state.getProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME,
null))
- .setJobProperties(jobState.getProp(JobExecutionPlan.JOB_PROPS_KEY,
null));
+
.setJobProperties(GsonUtils.GSON_WITH_DATE_HANDLING.newBuilder().create().toJson(jobProperties))
+
.setSourceNode(jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY,
""))
+
.setDestinationNode(jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY,
""))
+ .setFlowEdgeId(!edgeId.isEmpty() ? edgeId : fullFlowEdge)
+ .setExecutorUrn(null); //TODO: Fill with information from job execution
return builder.build();
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 4aced6afb..eb405ba60 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -66,7 +66,7 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
private Meter messageParseFailures;
public KafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads, JobIssueEventHandler jobIssueEventHandler,
- GaaSObservabilityEventProducer observabilityEventProducer,
DagActionStore dagActionStore)
+ GaaSJobObservabilityEventProducer observabilityEventProducer,
DagActionStore dagActionStore)
throws IOException, ReflectiveOperationException {
super(topic, config, numThreads, jobIssueEventHandler,
observabilityEventProducer, dagActionStore);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index ab6d62f93..c72502340 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -115,12 +115,12 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
private final JobIssueEventHandler jobIssueEventHandler;
private final Retryer<Void> persistJobStatusRetryer;
- private final GaaSObservabilityEventProducer eventProducer;
+ private final GaaSJobObservabilityEventProducer eventProducer;
private final DagActionStore dagActionStore;
private final boolean dagProcEngineEnabled;
public KafkaJobStatusMonitor(String topic, Config config, int numThreads,
JobIssueEventHandler jobIssueEventHandler,
- GaaSObservabilityEventProducer observabilityEventProducer,
DagActionStore dagActionStore)
+ GaaSJobObservabilityEventProducer observabilityEventProducer,
DagActionStore dagActionStore)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
String stateStoreFactoryClass = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY,
FileContextBasedFsStateStoreFactory.class.getName());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index ecc759333..a1698e57a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -97,9 +97,9 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
}
jobStatusConfig =
jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
Class observabilityEventProducerClassName =
Class.forName(ConfigUtils.getString(config,
-
GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY,
-
GaaSObservabilityEventProducer.DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS));
- GaaSObservabilityEventProducer observabilityEventProducer =
(GaaSObservabilityEventProducer)
GobblinConstructorUtils.invokeLongestConstructor(
+
GaaSJobObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY,
+
GaaSJobObservabilityEventProducer.DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS));
+ GaaSJobObservabilityEventProducer observabilityEventProducer =
(GaaSJobObservabilityEventProducer)
GobblinConstructorUtils.invokeLongestConstructor(
observabilityEventProducerClassName,
ConfigUtils.configToState(config), this.issueRepository,
this.instrumentationEnabled);
return (KafkaJobStatusMonitor) GobblinConstructorUtils
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
similarity index 75%
rename from
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
index f8b23bd62..a0eeb88e1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSJobObservabilityEventProducer.java
@@ -18,7 +18,7 @@
package org.apache.gobblin.service.monitoring;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
@@ -26,13 +26,13 @@ import
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
* The default producer for emitting GaaS Observability Events in the
KafkaJobStatusMonitor
* This class does no work and will not create or emit any events
*/
-public class NoopGaaSObservabilityEventProducer extends
GaaSObservabilityEventProducer {
+public class NoopGaaSJobObservabilityEventProducer extends
GaaSJobObservabilityEventProducer {
- public NoopGaaSObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+ public NoopGaaSJobObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
super(state, issueRepository, instrumentationEnabled);
}
- public NoopGaaSObservabilityEventProducer() {
+ public NoopGaaSJobObservabilityEventProducer() {
super(null, null, false);
}
@@ -40,5 +40,5 @@ public class NoopGaaSObservabilityEventProducer extends
GaaSObservabilityEventPr
public void emitObservabilityEvent(State jobState) {}
@Override
- protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event)
{}
+ protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event) {}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
similarity index 80%
rename from
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
rename to
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
index d2fdbd7b7..1f3da5205 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
@@ -37,7 +37,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.metrics.JobStatus;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
@@ -54,9 +54,10 @@ import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.PropertiesUtils;
-public class GaaSObservabilityProducerTest {
+public class GaaSJobObservabilityProducerTest {
private MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
@@ -75,17 +76,22 @@ public class GaaSObservabilityProducerTest {
DatasetTaskSummary dataset2 = new DatasetTaskSummary("/testFolder2", 1000,
10000, false);
summaries.add(dataset1);
summaries.add(dataset2);
+ Properties jobProps = new Properties();
+ jobProps.setProperty("flow.executionId", "1681242538558");
+ jobProps.setProperty("user.to.proxy", "newUser");
+ jobProps.setProperty("gobblin.flow.sourceIdentifier", "sourceNode");
+ jobProps.setProperty("gobblin.flow.destinationIdentifier",
"destinationNode");
State state = new State();
state.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME,
"testCluster");
- MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(state, this.issueRepository, false);
+ MockGaaSJobObservabilityEventProducer producer = new
MockGaaSJobObservabilityEventProducer(state, this.issueRepository, false);
Map<String, String> gteEventMetadata = Maps.newHashMap();
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
flowExecutionId);
gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"sourceNode_destinationNode_flowEdge");
gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
gteEventMetadata.put(AzkabanProjectConfig.USER_TO_PROXY, "azkabanUser");
gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
@@ -95,16 +101,16 @@ public class GaaSObservabilityProducerTest {
gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
"20");
gteEventMetadata.put(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(summaries));
- gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
"{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
+ gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobProps));
Properties jobStatusProps = new Properties();
jobStatusProps.putAll(gteEventMetadata);
producer.emitObservabilityEvent(new State(jobStatusProps));
- List<GaaSObservabilityEventExperimental> emittedEvents =
producer.getTestEmittedEvents();
+ List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedEvents();
Assert.assertEquals(emittedEvents.size(), 1);
- Iterator<GaaSObservabilityEventExperimental> iterator =
emittedEvents.iterator();
- GaaSObservabilityEventExperimental event = iterator.next();
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event = iterator.next();
Assert.assertEquals(event.getFlowGroup(), flowGroup);
Assert.assertEquals(event.getFlowName(), flowName);
Assert.assertEquals(event.getJobName(), jobName);
@@ -112,26 +118,28 @@ public class GaaSObservabilityProducerTest {
Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
Assert.assertEquals(event.getExecutorUrl(), "hostName");
Assert.assertEquals(event.getIssues().size(), 1);
- Assert.assertEquals(event.getFlowGraphEdgeId(), "flowEdge");
+ Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
+ Assert.assertEquals(event.getSourceNode(), "sourceNode");
+ Assert.assertEquals(event.getDestinationNode(), "destinationNode");
Assert.assertEquals(event.getExecutorId(), "specExecutor");
- Assert.assertEquals(event.getExecutionUserUrn(), "azkabanUser");
- Assert.assertEquals(event.getJobOrchestratedTime(), Long.valueOf(1));
- Assert.assertEquals(event.getLastFlowModificationTime(), Long.valueOf(20));
- Assert.assertEquals(event.getJobStartTime(), Long.valueOf(20));
- Assert.assertEquals(event.getJobEndTime(), Long.valueOf(100));
- Assert.assertEquals(event.getDatasetsWritten().size(), 2);
- Assert.assertEquals(event.getDatasetsWritten().get(0).getDatasetUrn(),
dataset1.getDatasetUrn());
-
Assert.assertEquals(event.getDatasetsWritten().get(0).getEntitiesWritten(),
Long.valueOf(dataset1.getRecordsWritten()));
- Assert.assertEquals(event.getDatasetsWritten().get(0).getBytesWritten(),
Long.valueOf(dataset1.getBytesWritten()));
-
Assert.assertEquals(event.getDatasetsWritten().get(0).getSuccessfullyCommitted(),
Boolean.valueOf(dataset1.isSuccessfullyCommitted()));
- Assert.assertEquals(event.getDatasetsWritten().get(1).getDatasetUrn(),
dataset2.getDatasetUrn());
-
Assert.assertEquals(event.getDatasetsWritten().get(1).getEntitiesWritten(),
Long.valueOf(dataset2.getRecordsWritten()));
- Assert.assertEquals(event.getDatasetsWritten().get(1).getBytesWritten(),
Long.valueOf(dataset2.getBytesWritten()));
-
Assert.assertEquals(event.getDatasetsWritten().get(1).getSuccessfullyCommitted(),
Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
- Assert.assertEquals(event.getJobProperties(),
"{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
+ Assert.assertEquals(event.getEffectiveUserUrn(), "azkabanUser");
+ Assert.assertEquals(event.getJobOrchestratedTimestamp(), Long.valueOf(1));
+ Assert.assertEquals(event.getLastFlowModificationTimestamp(),
Long.valueOf(20));
+ Assert.assertEquals(event.getJobStartTimestamp(), Long.valueOf(20));
+ Assert.assertEquals(event.getJobEndTimestamp(), Long.valueOf(100));
+ Assert.assertEquals(event.getDatasetsMetrics().size(), 2);
+ Assert.assertEquals(event.getDatasetsMetrics().get(0).getDatasetUrn(),
dataset1.getDatasetUrn());
+
Assert.assertEquals(event.getDatasetsMetrics().get(0).getEntitiesWritten(),
Long.valueOf(dataset1.getRecordsWritten()));
+ Assert.assertEquals(event.getDatasetsMetrics().get(0).getBytesWritten(),
Long.valueOf(dataset1.getBytesWritten()));
+
Assert.assertEquals(event.getDatasetsMetrics().get(0).getSuccessfullyCommitted(),
Boolean.valueOf(dataset1.isSuccessfullyCommitted()));
+ Assert.assertEquals(event.getDatasetsMetrics().get(1).getDatasetUrn(),
dataset2.getDatasetUrn());
+
Assert.assertEquals(event.getDatasetsMetrics().get(1).getEntitiesWritten(),
Long.valueOf(dataset2.getRecordsWritten()));
+ Assert.assertEquals(event.getDatasetsMetrics().get(1).getBytesWritten(),
Long.valueOf(dataset2.getBytesWritten()));
+
Assert.assertEquals(event.getDatasetsMetrics().get(1).getSuccessfullyCommitted(),
Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
+ Assert.assertEquals(event.getJobProperties(),
"{\"gobblin.flow.sourceIdentifier\":\"sourceNode\",\"gobblin.flow.destinationIdentifier\":\"destinationNode\",\"user.to.proxy\":\"newUser\",\"flow.executionId\":\"1681242538558\"}");
Assert.assertEquals(event.getGaasId(), "testCluster");
- AvroSerializer<GaaSObservabilityEventExperimental> serializer = new
AvroBinarySerializer<>(
- GaaSObservabilityEventExperimental.SCHEMA$, new
NoopSchemaVersionWriter()
+ AvroSerializer<GaaSJobObservabilityEvent> serializer = new
AvroBinarySerializer<>(
+ GaaSJobObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
);
serializer.serializeRecord(event);
}
@@ -146,7 +154,8 @@ public class GaaSObservabilityProducerTest {
TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
);
- MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(new State(), this.issueRepository, false);
+ MockGaaSJobObservabilityEventProducer
+ producer = new MockGaaSJobObservabilityEventProducer(new State(),
this.issueRepository, false);
Map<String, String> gteEventMetadata = Maps.newHashMap();
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
@@ -161,26 +170,26 @@ public class GaaSObservabilityProducerTest {
jobStatusProps.putAll(gteEventMetadata);
producer.emitObservabilityEvent(new State(jobStatusProps));
- List<GaaSObservabilityEventExperimental> emittedEvents =
producer.getTestEmittedEvents();
+ List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedEvents();
Assert.assertEquals(emittedEvents.size(), 1);
- Iterator<GaaSObservabilityEventExperimental> iterator =
emittedEvents.iterator();
- GaaSObservabilityEventExperimental event = iterator.next();
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event = iterator.next();
Assert.assertEquals(event.getFlowGroup(), flowGroup);
Assert.assertEquals(event.getFlowName(), flowName);
Assert.assertEquals(event.getJobName(), jobName);
Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
Assert.assertEquals(event.getJobStatus(), JobStatus.CANCELLED);
Assert.assertEquals(event.getIssues().size(), 1);
- Assert.assertEquals(event.getFlowGraphEdgeId(), "flowEdge");
+ Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
Assert.assertEquals(event.getExecutorId(), "specExecutor");
- Assert.assertEquals(event.getJobOrchestratedTime(), null);
- Assert.assertEquals(event.getJobStartTime(), null);
- Assert.assertEquals(event.getExecutionUserUrn(), null);
+ Assert.assertEquals(event.getJobOrchestratedTimestamp(), null);
+ Assert.assertEquals(event.getJobStartTimestamp(), null);
+ Assert.assertEquals(event.getEffectiveUserUrn(), null);
Assert.assertEquals(event.getExecutorUrl(), null);
- AvroSerializer<GaaSObservabilityEventExperimental> serializer = new
AvroBinarySerializer<>(
- GaaSObservabilityEventExperimental.SCHEMA$, new
NoopSchemaVersionWriter()
+ AvroSerializer<GaaSJobObservabilityEvent> serializer = new
AvroBinarySerializer<>(
+ GaaSJobObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
);
serializer.serializeRecord(event);
}
@@ -199,7 +208,8 @@ public class GaaSObservabilityProducerTest {
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT,
"http://localhost:5000");
- MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(producerState, this.issueRepository, true);
+ MockGaaSJobObservabilityEventProducer
+ producer = new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true);
Map<String, String> gteEventMetadata = Maps.newHashMap();
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
@@ -231,7 +241,8 @@ public class GaaSObservabilityProducerTest {
State producerState = new State();
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
- MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(producerState, this.issueRepository, true);
+ MockGaaSJobObservabilityEventProducer
+ producer = new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true);
Map<String, String> gteEventMetadata = Maps.newHashMap();
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
similarity index 75%
rename from
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
rename to
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
index eb9269535..528edc273 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSJobObservabilityEventProducer.java
@@ -22,20 +22,20 @@ import java.util.Collections;
import java.util.List;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.GaaSJobObservabilityEvent;
import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
/**
- * An extension of GaaSObservabilityEventProducer which creates the events and
stores them in a list
+ * An extension of GaaSJobObservabilityEventProducer which creates the events
and stores them in a list
* Tests can use a getter to fetch a read-only version of the events that were
emitted
*/
-public class MockGaaSObservabilityEventProducer extends
GaaSObservabilityEventProducer {
- private List<GaaSObservabilityEventExperimental> emittedEvents = new
ArrayList<>();
+public class MockGaaSJobObservabilityEventProducer extends
GaaSJobObservabilityEventProducer {
+ private List<GaaSJobObservabilityEvent> emittedEvents = new ArrayList<>();
- public MockGaaSObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+ public MockGaaSJobObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
super(state, issueRepository, instrumentationEnabled);
}
@@ -44,7 +44,7 @@ public class MockGaaSObservabilityEventProducer extends
GaaSObservabilityEventPr
return InMemoryOpenTelemetryMetrics.getInstance(state);
}
@Override
- protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event)
{
+ protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event) {
emittedEvents.add(event);
}
@@ -53,7 +53,7 @@ public class MockGaaSObservabilityEventProducer extends
GaaSObservabilityEventPr
* This should only be used as a read-only object for emitted
GaaSObservabilityEvents
* @return list of events that would have been emitted
*/
- public List<GaaSObservabilityEventExperimental> getTestEmittedEvents() {
+ public List<GaaSJobObservabilityEvent> getTestEmittedEvents() {
return Collections.unmodifiableList(this.emittedEvents);
}