phet commented on code in PR #3940:
URL: https://github.com/apache/gobblin/pull/3940#discussion_r1589828726
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -137,26 +138,29 @@ public Attributes
getEventAttributes(GaaSObservabilityEventExperimental event) {
* Emits the GaaSObservabilityEvent with the mechanism that the child class
is built upon e.g. Kafka
* @param event
*/
- abstract protected void
sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+ abstract protected void sendUnderlyingEvent(GaaSObservabilityEvent event);
/**
* Creates a GaaSObservabilityEvent which is derived from a final GaaS job
pipeline state, which is combination of GTE job states in an ordered fashion
* @param jobState
* @return GaaSObservabilityEvent
*/
- private GaaSObservabilityEventExperimental
createGaaSObservabilityEvent(final State jobState) {
+ private GaaSObservabilityEvent createGaaSObservabilityEvent(final State
jobState) {
Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ?
jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ?
jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
Long jobOrchestratedTime =
jobState.contains(TimingEvent.JOB_ORCHESTRATED_TIME) ?
jobState.getPropAsLong(TimingEvent.JOB_ORCHESTRATED_TIME) : null;
Long jobPlanningPhaseStartTime =
jobState.contains(TimingEvent.WORKUNIT_PLAN_START_TIME) ?
jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_START_TIME) : null;
Long jobPlanningPhaseEndTime =
jobState.contains(TimingEvent.WORKUNIT_PLAN_END_TIME) ?
jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_END_TIME) : null;
+ String edgeName =
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "");
+ String[] edgeNameParts =
edgeName.split(BaseFlowGraphHelper.FLOW_EDGE_LABEL_JOINER_CHAR);
Review Comment:
Unfortunately, this format
```
<src_node>_<dest_node>_<edge_id>
```
is not always correctly parsable, because some node names themselves contain `_`.
Is there any way to obtain the constituent parts before they have been
joined into this (not necessarily reversible) form?
##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEvent.avsc:
##########
@@ -0,0 +1,271 @@
+{
+ "type": "record",
+ "name": "GaaSObservabilityEvent",
+ "namespace": "org.apache.gobblin.metrics",
+ "doc": "An event schema for GaaS to emit during and after a job is
executed.",
+ "fields": [
+ {
+ "name": "eventTimestamp",
+ "type": "long",
+ "doc": "Time at which event was created in milliseconds from Unix Epoch"
+ },
+ {
+ "name": "flowGroup",
+ "type": "string",
+ "doc": "Flow group for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowName",
+ "type": "string",
+ "doc": "Flow name for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowExecutionId",
+ "type": "long",
+ "doc": "Flow execution id for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "lastFlowModificationTimestamp",
+ "type": "long",
+ "doc": "Timestamp in millis since Epoch when the flow config was last
modified"
+ },
+ {
+ "name": "sourceNode",
+ "type": "string",
+ "doc": "Source node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "destinationNode",
+ "type": "string",
+ "doc": "Destination node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowEdgeId",
+ "type": "string",
+ "doc": "Flow edge id, excluding the sourceNode and destinationNode",
+ "compliance": "NONE"
+ },
+ {
+ "name": "jobName",
+ "type": "string",
+ "doc": "The name of the Gobblin job, found in the job template. One edge
can contain multiple jobs",
+ "compliance": "NONE"
+ },
+ {
+ "name": "jobStatus",
+ "type": {
+ "type": "enum",
+ "name": "JobStatus",
+ "symbols": [
+ "SUCCEEDED",
+ "COMPILATION_FAILURE",
+ "SUBMISSION_FAILURE",
+ "EXECUTION_FAILURE",
+ "CANCELLED"
+ ],
+ "doc": "Final job status for this job in the GaaS flow",
+ "compliance": "NONE"
+ }
+ },
+ {
+ "name": "jobOrchestratedTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Timestamp when the job was successfully sent to the job
executor, null if it was unable to be sent."
+ },
+ {
+ "name": "jobStartTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Start time of the job in millis since Epoch, null if the job was
never run",
+ "compliance": "NONE"
+ },
+ {
+ "name": "jobEndTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Finish time of the job in millis since Epoch, null if the job
was never run",
+ "compliance": "NONE"
+ },
+ {
+ "name": "jobPlanningStartTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Start time of the workunit planning phase in millis since Epoch,
null if the job was never run or fails to reach this phase",
+ "compliance": "NONE",
+ "default": null
+ },
+ {
+ "name": "jobPlanningEndTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "End time of the workunit planning phase in millis since Epoch,
null if the job was never run or fails to reach this phase",
+ "compliance": "NONE",
+ "default": null
+ },
+ {
+ "name": "effectiveUserUrn",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "User URN (if applicable) whose identity was used to run the
underlying Gobblin job",
+ "compliance": "NONE"
+ },
+ {
+ "name": "executorUrl",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "Link to where the job ran, currently limited to Azkaban, if it
was executed",
+ "compliance": "NONE"
+ },
+ {
+ "name": "executorId",
+ "type": "string",
+ "doc": "The ID of the spec executor that ran or would have ran the job",
Review Comment:
Nit: "...that ran or would have *run* the job" — use the past participle "run", not "ran".
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -117,44 +118,47 @@ private void setupMetrics(State state) {
}
public void emitObservabilityEvent(final State jobState) {
- GaaSObservabilityEventExperimental event =
createGaaSObservabilityEvent(jobState);
+ GaaSObservabilityEvent event = createGaaSObservabilityEvent(jobState);
sendUnderlyingEvent(event);
this.eventCollector.add(event);
}
- public Attributes getEventAttributes(GaaSObservabilityEventExperimental
event) {
+ public Attributes getEventAttributes(GaaSObservabilityEvent event) {
Attributes tags =
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
event.getFlowName())
.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
event.getFlowGroup())
.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName())
.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
event.getFlowExecutionId())
- .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
event.getExecutorId()).put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
event.getFlowGraphEdgeId()).build();
+ .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
event.getExecutorId()).put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
event.getFlowEdgeId()).build();
Review Comment:
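Does this change semantics? The value attached under `FLOW_EDGE_FIELD` may differ in nature. Per the new schema, `flowEdgeId` excludes the source and destination nodes, so it may not match what `getFlowGraphEdgeId()` previously returned.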
##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEvent.avsc:
##########
@@ -0,0 +1,271 @@
+{
+ "type": "record",
+ "name": "GaaSObservabilityEvent",
+ "namespace": "org.apache.gobblin.metrics",
+ "doc": "An event schema for GaaS to emit during and after a job is
executed.",
+ "fields": [
+ {
+ "name": "eventTimestamp",
+ "type": "long",
+ "doc": "Time at which event was created in milliseconds from Unix Epoch"
+ },
+ {
+ "name": "flowGroup",
+ "type": "string",
+ "doc": "Flow group for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowName",
+ "type": "string",
+ "doc": "Flow name for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowExecutionId",
+ "type": "long",
+ "doc": "Flow execution id for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "lastFlowModificationTimestamp",
+ "type": "long",
+ "doc": "Timestamp in millis since Epoch when the flow config was last
modified"
+ },
+ {
+ "name": "sourceNode",
+ "type": "string",
+ "doc": "Source node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "destinationNode",
+ "type": "string",
+ "doc": "Destination node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowEdgeId",
+ "type": "string",
+ "doc": "Flow edge id, excluding the sourceNode and destinationNode",
+ "compliance": "NONE"
+ },
+ {
+ "name": "jobName",
+ "type": "string",
+ "doc": "The name of the Gobblin job, found in the job template. One edge
can contain multiple jobs",
+ "compliance": "NONE"
+ },
+ {
+ "name": "jobStatus",
+ "type": {
+ "type": "enum",
+ "name": "JobStatus",
+ "symbols": [
+ "SUCCEEDED",
+ "COMPILATION_FAILURE",
+ "SUBMISSION_FAILURE",
+ "EXECUTION_FAILURE",
+ "CANCELLED"
+ ],
+ "doc": "Final job status for this job in the GaaS flow",
+ "compliance": "NONE"
+ }
+ },
+ {
+ "name": "jobOrchestratedTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Timestamp when the job was successfully sent to the job
executor, null if it was unable to be sent."
+ },
+ {
+ "name": "jobStartTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Start time of the job in millis since Epoch, null if the job was
never run",
+ "compliance": "NONE"
+ },
+ {
+ "name": "jobEndTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Finish time of the job in millis since Epoch, null if the job
was never run",
+ "compliance": "NONE"
+ },
+ {
+ "name": "jobPlanningStartTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Start time of the workunit planning phase in millis since Epoch,
null if the job was never run or fails to reach this phase",
+ "compliance": "NONE",
+ "default": null
+ },
+ {
+ "name": "jobPlanningEndTimestamp",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "End time of the workunit planning phase in millis since Epoch,
null if the job was never run or fails to reach this phase",
+ "compliance": "NONE",
+ "default": null
+ },
+ {
+ "name": "effectiveUserUrn",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "User URN (if applicable) whose identity was used to run the
underlying Gobblin job",
Review Comment:
Should we define the expected URN format, or at least give an example of it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]