This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9640ef0d6 [GOBBLIN-1816] Add job properties and GaaS instance ID to
observability event (#3676)
9640ef0d6 is described below
commit 9640ef0d6e84b39841535d94b58f4998de97b58b
Author: William Lo <[email protected]>
AuthorDate: Thu Apr 20 13:17:08 2023 -0400
[GOBBLIN-1816] Add job properties and GaaS instance ID to observability
event (#3676)
* Add job properties and GaaS instance ID to observability event
* Fix conflicts
---
.../org/apache/gobblin/service/ServiceConfigKeys.java | 2 +-
.../main/avro/GaaSObservabilityEventExperimental.avsc | 16 ++++++++++++++++
.../service/modules/orchestration/DagManager.java | 3 ++-
.../gobblin/service/modules/spec/JobExecutionPlan.java | 1 +
.../monitoring/GaaSObservabilityEventProducer.java | 6 +++++-
.../monitoring/GaaSObservabilityProducerTest.java | 10 ++++++++--
6 files changed, 33 insertions(+), 5 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index d93eefbe2..ef4323538 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -32,7 +32,7 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
-
+ public static final String GOBBLIN_SERVICE_INSTANCE_NAME =
GOBBLIN_SERVICE_PREFIX + "instance.name";
public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY
= GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
index 596922c0e..605e961db 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
@@ -130,6 +130,22 @@
"doc": "The ID of the spec executor that ran or would have ran the job",
"compliance": "NONE"
},
+ {
+ "name": "gaasId",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "The instance of GaaS that is sending the event (if multiple GaaS instances are running)"
+ },
+ {
+ "name":"jobProperties",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "The job properties GaaS sends to the job executor. This is a JSON string of the job properties"
+ },
{
"name": "issues",
"type": [
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 6251ac1d2..f47cbde50 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -1029,7 +1029,8 @@ public class DagManager extends AbstractIdleService {
addSpecFuture.get();
jobMetadata.put(TimingEvent.METADATA_MESSAGE,
producer.getExecutionLink(addSpecFuture, specExecutorUri));
-
+ // Add serialized job properties as part of the orchestrated job event metadata
+ jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
dagNode.getValue().toString());
if (jobOrchestrationTimer != null) {
jobOrchestrationTimer.stop(jobMetadata);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index beccb1a13..8a6f5e1d4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -60,6 +60,7 @@ import static
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts",
"jobFuture", "flowStartTime"})
public class JobExecutionPlan {
public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
+ public static final String JOB_PROPS_KEY = "job.props";
private static final int MAX_JOB_NAME_LENGTH = 255;
private final JobSpec jobSpec;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index 2b55e27f0..2b81378f6 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -46,7 +46,9 @@ import
org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
/**
@@ -134,7 +136,9 @@ public abstract class GaaSObservabilityEventProducer
implements Closeable {
.setIssues(issueList)
.setJobStatus(status)
.setExecutionUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null))
- .setDatasetsWritten(datasetMetrics);
+ .setDatasetsWritten(datasetMetrics)
+ .setGaasId(this.state.getProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, null))
+ .setJobProperties(jobState.getProp(JobExecutionPlan.JOB_PROPS_KEY,
null));
return builder.build();
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index f60561992..96383e6e4 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -44,7 +44,9 @@ import
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
public class GaaSObservabilityProducerTest {
@@ -67,7 +69,9 @@ public class GaaSObservabilityProducerTest {
summaries.add(dataset1);
summaries.add(dataset2);
- MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(new State(), this.issueRepository);
+ State state = new State();
+ state.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME,
"testCluster");
+ MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(state, this.issueRepository);
Map<String, String> gteEventMetadata = Maps.newHashMap();
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
@@ -84,6 +88,7 @@ public class GaaSObservabilityProducerTest {
gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
"20");
gteEventMetadata.put(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(summaries));
+ gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
"{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
Properties jobStatusProps = new Properties();
jobStatusProps.putAll(gteEventMetadata);
producer.emitObservabilityEvent(new State(jobStatusProps));
@@ -116,7 +121,8 @@ public class GaaSObservabilityProducerTest {
Assert.assertEquals(event.getDatasetsWritten().get(1).getEntitiesWritten(),
Long.valueOf(dataset2.getRecordsWritten()));
Assert.assertEquals(event.getDatasetsWritten().get(1).getBytesWritten(),
Long.valueOf(dataset2.getBytesWritten()));
Assert.assertEquals(event.getDatasetsWritten().get(1).getSuccessfullyCommitted(),
Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
-
+ Assert.assertEquals(event.getJobProperties(),
"{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
+ Assert.assertEquals(event.getGaasId(), "testCluster");
AvroSerializer<GaaSObservabilityEventExperimental> serializer = new
AvroBinarySerializer<>(
GaaSObservabilityEventExperimental.SCHEMA$, new
NoopSchemaVersionWriter()
);