This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 354c78def [GOBBLIN-2060] Change jobStatus opentelemetry metric to 
flowSucceeded to better reflect metric values (#3941)
354c78def is described below

commit 354c78def0ae8d276393e373e039cb72f2827a48
Author: William Lo <[email protected]>
AuthorDate: Thu May 2 17:14:45 2024 -0400

    [GOBBLIN-2060] Change jobStatus opentelemetry metric to flowSucceeded to 
better reflect metric values (#3941)
    
    * Clean up logs to be debug level; a succeeded jobStatus should be marked 
as 1 instead of 0
    
    * Change name of jobStatus metric to jobSucceeded to properly reflect what the metric measures
---
 .../service/monitoring/GaaSObservabilityEventProducer.java  | 12 +++++++-----
 .../service/monitoring/GaaSObservabilityProducerTest.java   | 13 ++++++++++++-
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index e5be9f51a..0966e458d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -66,7 +66,7 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
   public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = 
NoopGaaSObservabilityEventProducer.class.getName();
   public static final String ISSUES_READ_FAILED_METRIC_NAME =  
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
   public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
-  public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME = 
"jobStatus";
+  public static final String GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME = 
"jobSucceeded";
 
   protected MetricContext metricContext;
   protected State state;
@@ -99,17 +99,17 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
     this.opentelemetryMetrics = getOpentelemetryMetrics(state);
     if (this.opentelemetryMetrics != null) {
       this.jobStatusMetric = 
this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
-          .gaugeBuilder(GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME)
+          .gaugeBuilder(GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME)
           .ofLongs()
           .buildObserver();
       this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
           .batchCallback(() -> {
             for (GaaSObservabilityEventExperimental event : 
this.eventCollector) {
               Attributes tags = getEventAttributes(event);
-              int status = event.getJobStatus() != JobStatus.SUCCEEDED ? 1 : 0;
+              int status = event.getJobStatus() == JobStatus.SUCCEEDED ? 1 : 0;
               this.jobStatusMetric.record(status, tags);
             }
-            log.info("Submitted {} job status events", 
this.eventCollector.size());
+            log.debug("Submitted {} job status events", 
this.eventCollector.size());
             // Empty the list of events as they are all emitted at this point.
             this.eventCollector.clear();
           }, this.jobStatusMetric);
@@ -127,7 +127,9 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
         .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
event.getFlowGroup())
         .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName())
         .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
event.getFlowExecutionId())
-        .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
event.getExecutorId()).put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
event.getFlowGraphEdgeId()).build();
+        .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
event.getExecutorId())
+        .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
event.getFlowGraphEdgeId())
+        .build();
     return tags;
   }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index 09bd573da..d2fdbd7b7 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -262,7 +262,7 @@ public class GaaSObservabilityProducerTest {
     // Check number of meters
     Assert.assertEquals(metrics.size(), 1);
     Map<String, MetricData > metricsByName = 
metrics.stream().collect(Collectors.toMap(metric -> metric.getName(), 
metricData -> metricData));
-    MetricData jobStatusMetric = metricsByName.get("jobStatus");
+    MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
     // Check the attributes of the metrics
     List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
     Assert.assertEquals(datapoints.size(), 2);
@@ -274,6 +274,17 @@ public class GaaSObservabilityProducerTest {
     // Check common string tag
     
Assert.assertEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
 flowGroup);
     
Assert.assertEquals(datapoints.get(1).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
 flowGroup);
+    datapoints.forEach(point -> {
+      if 
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(1L))
 {
+        Assert.assertEquals(point.getValue(), 0); // Cancelled job should show 
up as a 0
+      } else if 
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(2L))
 {
+        Assert.assertEquals(point.getValue(), 1L); // Completed job should 
show up as a 1
+      }
+      
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowName")),
 flowName);
+      
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("jobName")),
 jobName);
+      
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowEdge")),
 "flowEdge");
+      
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("specExecutor")),
 "specExecutor");
+    });
   }
 
   private Issue createTestIssue(String summary, String code, IssueSeverity 
severity) {

Reply via email to