phet commented on code in PR #3940:
URL: https://github.com/apache/gobblin/pull/3940#discussion_r1589828726


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -137,26 +138,29 @@ public Attributes 
getEventAttributes(GaaSObservabilityEventExperimental event) {
    * Emits the GaaSObservabilityEvent with the mechanism that the child class 
is built upon e.g. Kafka
    * @param event
    */
-  abstract protected void 
sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+  abstract protected void sendUnderlyingEvent(GaaSObservabilityEvent event);
 
   /**
    * Creates a GaaSObservabilityEvent which is derived from a final GaaS job 
pipeline state, which is combination of GTE job states in an ordered fashion
    * @param jobState
    * @return GaaSObservabilityEvent
    */
-  private GaaSObservabilityEventExperimental 
createGaaSObservabilityEvent(final State jobState) {
+  private GaaSObservabilityEvent createGaaSObservabilityEvent(final State 
jobState) {
     Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? 
jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
     Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? 
jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
     Long jobOrchestratedTime = 
jobState.contains(TimingEvent.JOB_ORCHESTRATED_TIME) ? 
jobState.getPropAsLong(TimingEvent.JOB_ORCHESTRATED_TIME) : null;
     Long jobPlanningPhaseStartTime = 
jobState.contains(TimingEvent.WORKUNIT_PLAN_START_TIME) ? 
jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_START_TIME) : null;
     Long jobPlanningPhaseEndTime = 
jobState.contains(TimingEvent.WORKUNIT_PLAN_END_TIME) ? 
jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_END_TIME) : null;
+    String edgeName = 
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "");
+    String[] edgeNameParts = 
edgeName.split(BaseFlowGraphHelper.FLOW_EDGE_LABEL_JOINER_CHAR);

Review Comment:
   Unfortunately, this format
   ```
   <src_node>_<dest_node>_<edge_id>
   ```
   is not always correctly parsable, because some node names contain `_`.
   
   Is there any way to obtain the constituent parts before they are joined into this (not-necessarily-reversible) form?



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEvent.avsc:
##########
@@ -0,0 +1,271 @@
+{
+  "type": "record",
+  "name": "GaaSObservabilityEvent",
+  "namespace": "org.apache.gobblin.metrics",
+  "doc": "An event schema for GaaS to emit during and after a job is 
executed.",
+  "fields": [
+    {
+      "name": "eventTimestamp",
+      "type": "long",
+      "doc": "Time at which event was created in milliseconds from Unix Epoch"
+    },
+    {
+      "name": "flowGroup",
+      "type": "string",
+      "doc": "Flow group for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowName",
+      "type": "string",
+      "doc": "Flow name for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowExecutionId",
+      "type": "long",
+      "doc": "Flow execution id for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "lastFlowModificationTimestamp",
+      "type": "long",
+      "doc": "Timestamp in millis since Epoch when the flow config was last 
modified"
+    },
+    {
+      "name": "sourceNode",
+      "type": "string",
+      "doc": "Source node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "destinationNode",
+      "type": "string",
+      "doc": "Destination node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowEdgeId",
+      "type": "string",
+      "doc": "Flow edge id, excluding the sourceNode and destinationNode",
+      "compliance": "NONE"
+    },
+    {
+      "name": "jobName",
+      "type": "string",
+      "doc": "The name of the Gobblin job, found in the job template. One edge 
can contain multiple jobs",
+      "compliance": "NONE"
+    },
+    {
+      "name": "jobStatus",
+      "type": {
+        "type": "enum",
+        "name": "JobStatus",
+        "symbols": [
+          "SUCCEEDED",
+          "COMPILATION_FAILURE",
+          "SUBMISSION_FAILURE",
+          "EXECUTION_FAILURE",
+          "CANCELLED"
+        ],
+        "doc": "Final job status for this job in the GaaS flow",
+        "compliance": "NONE"
+      }
+    },
+    {
+      "name": "jobOrchestratedTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Timestamp when the job was successfully sent to the job 
executor, null if it was unable to be sent."
+    },
+    {
+      "name": "jobStartTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Start time of the job in millis since Epoch, null if the job was 
never run",
+      "compliance": "NONE"
+    },
+    {
+      "name": "jobEndTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Finish time of the job in millis since Epoch, null if the job 
was never run",
+      "compliance": "NONE"
+    },
+    {
+      "name": "jobPlanningStartTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Start time of the workunit planning phase in millis since Epoch, 
null if the job was never run or fails to reach this phase",
+      "compliance": "NONE",
+      "default": null
+    },
+    {
+      "name": "jobPlanningEndTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "End time of the workunit planning phase in millis since Epoch, 
null if the job was never run or fails to reach this phase",
+      "compliance": "NONE",
+      "default": null
+    },
+    {
+      "name": "effectiveUserUrn",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "User URN (if applicable) whose identity was used to run the 
underlying Gobblin job",
+      "compliance": "NONE"
+    },
+    {
+      "name": "executorUrl",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "Link to where the job ran, currently limited to Azkaban, if it 
was executed",
+      "compliance": "NONE"
+    },
+    {
+      "name": "executorId",
+      "type": "string",
+      "doc": "The ID of the spec executor that ran or would have ran the job",

Review Comment:
   "...that ran or would have *run* the job" (past participle)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -117,44 +118,47 @@ private void setupMetrics(State state) {
   }
 
   public void emitObservabilityEvent(final State jobState) {
-    GaaSObservabilityEventExperimental event = 
createGaaSObservabilityEvent(jobState);
+    GaaSObservabilityEvent event = createGaaSObservabilityEvent(jobState);
     sendUnderlyingEvent(event);
     this.eventCollector.add(event);
   }
 
-  public Attributes getEventAttributes(GaaSObservabilityEventExperimental 
event) {
+  public Attributes getEventAttributes(GaaSObservabilityEvent event) {
     Attributes tags = 
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
event.getFlowName())
         .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
event.getFlowGroup())
         .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName())
         .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
event.getFlowExecutionId())
-        .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
event.getExecutorId()).put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
event.getFlowGraphEdgeId()).build();
+        .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
event.getExecutorId()).put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
event.getFlowEdgeId()).build();

Review Comment:
   Does this change semantics? The value emitted under `FLOW_EDGE_FIELD` may differ in nature, since the new `flowEdgeId` excludes the source and destination nodes.



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEvent.avsc:
##########
@@ -0,0 +1,271 @@
+{
+  "type": "record",
+  "name": "GaaSObservabilityEvent",
+  "namespace": "org.apache.gobblin.metrics",
+  "doc": "An event schema for GaaS to emit during and after a job is 
executed.",
+  "fields": [
+    {
+      "name": "eventTimestamp",
+      "type": "long",
+      "doc": "Time at which event was created in milliseconds from Unix Epoch"
+    },
+    {
+      "name": "flowGroup",
+      "type": "string",
+      "doc": "Flow group for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowName",
+      "type": "string",
+      "doc": "Flow name for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowExecutionId",
+      "type": "long",
+      "doc": "Flow execution id for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "lastFlowModificationTimestamp",
+      "type": "long",
+      "doc": "Timestamp in millis since Epoch when the flow config was last 
modified"
+    },
+    {
+      "name": "sourceNode",
+      "type": "string",
+      "doc": "Source node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "destinationNode",
+      "type": "string",
+      "doc": "Destination node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowEdgeId",
+      "type": "string",
+      "doc": "Flow edge id, excluding the sourceNode and destinationNode",
+      "compliance": "NONE"
+    },
+    {
+      "name": "jobName",
+      "type": "string",
+      "doc": "The name of the Gobblin job, found in the job template. One edge 
can contain multiple jobs",
+      "compliance": "NONE"
+    },
+    {
+      "name": "jobStatus",
+      "type": {
+        "type": "enum",
+        "name": "JobStatus",
+        "symbols": [
+          "SUCCEEDED",
+          "COMPILATION_FAILURE",
+          "SUBMISSION_FAILURE",
+          "EXECUTION_FAILURE",
+          "CANCELLED"
+        ],
+        "doc": "Final job status for this job in the GaaS flow",
+        "compliance": "NONE"
+      }
+    },
+    {
+      "name": "jobOrchestratedTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Timestamp when the job was successfully sent to the job 
executor, null if it was unable to be sent."
+    },
+    {
+      "name": "jobStartTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Start time of the job in millis since Epoch, null if the job was 
never run",
+      "compliance": "NONE"
+    },
+    {
+      "name": "jobEndTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Finish time of the job in millis since Epoch, null if the job 
was never run",
+      "compliance": "NONE"
+    },
+    {
+      "name": "jobPlanningStartTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Start time of the workunit planning phase in millis since Epoch, 
null if the job was never run or fails to reach this phase",
+      "compliance": "NONE",
+      "default": null
+    },
+    {
+      "name": "jobPlanningEndTimestamp",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "End time of the workunit planning phase in millis since Epoch, 
null if the job was never run or fails to reach this phase",
+      "compliance": "NONE",
+      "default": null
+    },
+    {
+      "name": "effectiveUserUrn",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "User URN (if applicable) whose identity was used to run the 
underlying Gobblin job",

Review Comment:
   Should we define the URN format, or at least give an example of it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to