This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9640ef0d6 [GOBBLIN-1816] Add job properties and GaaS instance ID to observability event (#3676)
9640ef0d6 is described below

commit 9640ef0d6e84b39841535d94b58f4998de97b58b
Author: William Lo <[email protected]>
AuthorDate: Thu Apr 20 13:17:08 2023 -0400

    [GOBBLIN-1816] Add job properties and GaaS instance ID to observability event (#3676)
    
    * Add job properties and GaaS instance ID to observability event
    
    * Fix conflicts
---
 .../org/apache/gobblin/service/ServiceConfigKeys.java    |  2 +-
 .../main/avro/GaaSObservabilityEventExperimental.avsc    | 16 ++++++++++++++++
 .../service/modules/orchestration/DagManager.java        |  3 ++-
 .../gobblin/service/modules/spec/JobExecutionPlan.java   |  1 +
 .../monitoring/GaaSObservabilityEventProducer.java       |  6 +++++-
 .../monitoring/GaaSObservabilityProducerTest.java        | 10 ++++++++--
 6 files changed, 33 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index d93eefbe2..ef4323538 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -32,7 +32,7 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
   public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
   public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
-
+  public static final String GOBBLIN_SERVICE_INSTANCE_NAME = 
GOBBLIN_SERVICE_PREFIX + "instance.name";
 
   public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
   public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY 
= GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
index 596922c0e..605e961db 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc
@@ -130,6 +130,22 @@
       "doc": "The ID of the spec executor that ran or would have ran the job",
       "compliance": "NONE"
     },
+    {
+      "name": "gaasId",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "The instance of GaaS that is sending the event (if multiple GaaS 
instances are running)"
+    },
+    {
+      "name":"jobProperties",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "The job properties GaaS sends to the job executor. This is a 
JSON string of the job properties"
+    },
     {
       "name": "issues",
       "type": [
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 6251ac1d2..f47cbde50 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -1029,7 +1029,8 @@ public class DagManager extends AbstractIdleService {
         addSpecFuture.get();
 
         jobMetadata.put(TimingEvent.METADATA_MESSAGE, 
producer.getExecutionLink(addSpecFuture, specExecutorUri));
-
+        // Add serialized job properties as part of the orchestrated job event 
metadata
+        jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
dagNode.getValue().toString());
         if (jobOrchestrationTimer != null) {
           jobOrchestrationTimer.stop(jobMetadata);
         }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index beccb1a13..8a6f5e1d4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -60,6 +60,7 @@ import static 
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
 @EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts", 
"jobFuture", "flowStartTime"})
 public class JobExecutionPlan {
   public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
+  public static final String JOB_PROPS_KEY = "job.props";
   private static final int MAX_JOB_NAME_LENGTH = 255;
 
   private final JobSpec jobSpec;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index 2b55e27f0..2b81378f6 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -46,7 +46,9 @@ import 
org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
 import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
 import org.apache.gobblin.runtime.util.GsonUtils;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
 
 /**
@@ -134,7 +136,9 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
         .setIssues(issueList)
         .setJobStatus(status)
         
.setExecutionUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null))
-        .setDatasetsWritten(datasetMetrics);
+        .setDatasetsWritten(datasetMetrics)
+        
.setGaasId(this.state.getProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, 
null))
+        .setJobProperties(jobState.getProp(JobExecutionPlan.JOB_PROPS_KEY, 
null));
     return builder.build();
   }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index f60561992..96383e6e4 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -44,7 +44,9 @@ import 
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
 import org.apache.gobblin.runtime.util.GsonUtils;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
 
 public class GaaSObservabilityProducerTest {
@@ -67,7 +69,9 @@ public class GaaSObservabilityProducerTest {
     summaries.add(dataset1);
     summaries.add(dataset2);
 
-    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(new State(), this.issueRepository);
+    State state = new State();
+    state.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, 
"testCluster");
+    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(state, this.issueRepository);
     Map<String, String> gteEventMetadata = Maps.newHashMap();
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
@@ -84,6 +88,7 @@ public class GaaSObservabilityProducerTest {
     gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
     
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
 "20");
     gteEventMetadata.put(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(summaries));
+    gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
"{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
     Properties jobStatusProps = new Properties();
     jobStatusProps.putAll(gteEventMetadata);
     producer.emitObservabilityEvent(new State(jobStatusProps));
@@ -116,7 +121,8 @@ public class GaaSObservabilityProducerTest {
     
Assert.assertEquals(event.getDatasetsWritten().get(1).getEntitiesWritten(), 
Long.valueOf(dataset2.getRecordsWritten()));
     Assert.assertEquals(event.getDatasetsWritten().get(1).getBytesWritten(), 
Long.valueOf(dataset2.getBytesWritten()));
     
Assert.assertEquals(event.getDatasetsWritten().get(1).getSuccessfullyCommitted(),
 Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
-
+    Assert.assertEquals(event.getJobProperties(), 
"{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
+    Assert.assertEquals(event.getGaasId(), "testCluster");
     AvroSerializer<GaaSObservabilityEventExperimental> serializer = new 
AvroBinarySerializer<>(
         GaaSObservabilityEventExperimental.SCHEMA$, new 
NoopSchemaVersionWriter()
     );

Reply via email to