This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 354c78def [GOBBLIN-2060] Change jobStatus opentelemetry metric to
jobSucceeded to better reflect metric values (#3941)
354c78def is described below
commit 354c78def0ae8d276393e373e039cb72f2827a48
Author: William Lo <[email protected]>
AuthorDate: Thu May 2 17:14:45 2024 -0400
[GOBBLIN-2060] Change jobStatus opentelemetry metric to jobSucceeded to
better reflect metric values (#3941)
* Clean up logs to be debug level; a succeeded jobStatus should be recorded as 1
instead of 0
* Change name of jobStatus metric to jobSucceeded to properly reflect the metric's meaning
---
.../service/monitoring/GaaSObservabilityEventProducer.java | 12 +++++++-----
.../service/monitoring/GaaSObservabilityProducerTest.java | 13 ++++++++++++-
2 files changed, 19 insertions(+), 6 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index e5be9f51a..0966e458d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -66,7 +66,7 @@ public abstract class GaaSObservabilityEventProducer
implements Closeable {
public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
NoopGaaSObservabilityEventProducer.class.getName();
public static final String ISSUES_READ_FAILED_METRIC_NAME =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
- public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME =
"jobStatus";
+ public static final String GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME =
"jobSucceeded";
protected MetricContext metricContext;
protected State state;
@@ -99,17 +99,17 @@ public abstract class GaaSObservabilityEventProducer
implements Closeable {
this.opentelemetryMetrics = getOpentelemetryMetrics(state);
if (this.opentelemetryMetrics != null) {
this.jobStatusMetric =
this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
- .gaugeBuilder(GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME)
+ .gaugeBuilder(GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME)
.ofLongs()
.buildObserver();
this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
.batchCallback(() -> {
for (GaaSObservabilityEventExperimental event :
this.eventCollector) {
Attributes tags = getEventAttributes(event);
- int status = event.getJobStatus() != JobStatus.SUCCEEDED ? 1 : 0;
+ int status = event.getJobStatus() == JobStatus.SUCCEEDED ? 1 : 0;
this.jobStatusMetric.record(status, tags);
}
- log.info("Submitted {} job status events",
this.eventCollector.size());
+ log.debug("Submitted {} job status events",
this.eventCollector.size());
// Empty the list of events as they are all emitted at this point.
this.eventCollector.clear();
}, this.jobStatusMetric);
@@ -127,7 +127,9 @@ public abstract class GaaSObservabilityEventProducer
implements Closeable {
.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
event.getFlowGroup())
.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName())
.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
event.getFlowExecutionId())
- .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
event.getExecutorId()).put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
event.getFlowGraphEdgeId()).build();
+ .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
event.getExecutorId())
+ .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
event.getFlowGraphEdgeId())
+ .build();
return tags;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index 09bd573da..d2fdbd7b7 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -262,7 +262,7 @@ public class GaaSObservabilityProducerTest {
// Check number of meters
Assert.assertEquals(metrics.size(), 1);
Map<String, MetricData > metricsByName =
metrics.stream().collect(Collectors.toMap(metric -> metric.getName(),
metricData -> metricData));
- MetricData jobStatusMetric = metricsByName.get("jobStatus");
+ MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
// Check the attributes of the metrics
List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
Assert.assertEquals(datapoints.size(), 2);
@@ -274,6 +274,17 @@ public class GaaSObservabilityProducerTest {
// Check common string tag
Assert.assertEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
flowGroup);
Assert.assertEquals(datapoints.get(1).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
flowGroup);
+ datapoints.forEach(point -> {
+ if
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(1L))
{
+ Assert.assertEquals(point.getValue(), 0); // Cancelled job should show
up as a 0
+ } else if
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(2L))
{
+ Assert.assertEquals(point.getValue(), 1L); // Completed job should
show up as a 1
+ }
+
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowName")),
flowName);
+
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("jobName")),
jobName);
+
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowEdge")),
"flowEdge");
+
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("specExecutor")),
"specExecutor");
+ });
}
private Issue createTestIssue(String summary, String code, IssueSeverity
severity) {