This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 058d23de7 [GOBBLIN-2018] Add Opentelemetry metrics for GaaS 
observability (#3881)
058d23de7 is described below

commit 058d23de7a3df4caf6750144f418f29366faf56c
Author: William Lo <[email protected]>
AuthorDate: Wed Mar 20 14:56:45 2024 -0400

    [GOBBLIN-2018] Add Opentelemetry metrics for GaaS observability (#3881)
    
    Add opentelemetry multi dimensional metrics to GaaS observability event 
emitter
---
 .../gobblin/configuration/ConfigurationKeys.java   |  14 +++
 gobblin-metrics-libs/gobblin-metrics/build.gradle  |   5 +-
 .../metrics/InMemoryOpenTelemetryMetrics.java      |  59 ++++++++++++
 .../gobblin/metrics/OpenTelemetryMetrics.java      |  90 ++++++++++++++++++
 .../gobblin/metrics/OpenTelemetryMetricsBase.java  |  57 +++++++++++
 .../runtime/KafkaAvroJobStatusMonitorTest.java     |   4 +-
 .../monitoring/GaaSObservabilityEventProducer.java |  47 ++++++++++
 .../monitoring/GaaSObservabilityProducerTest.java  | 104 ++++++++++++++++++++-
 .../MockGaaSObservabilityEventProducer.java        |  14 ++-
 .../org/apache/gobblin/util/PropertiesUtils.java   |   3 +-
 gradle/scripts/defaultBuildProperties.gradle       |   3 +-
 gradle/scripts/dependencyDefinitions.gradle        |   4 +
 12 files changed, 394 insertions(+), 10 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index aa9bbd50e..d7204df8f 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -904,6 +904,20 @@ public class ConfigurationKeys {
   //Custom-reporting
   public static final String METRICS_CUSTOM_BUILDERS = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.custom.builders";
 
+  // Opentelemetry based metrics reporting
+  public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemtry.metrics.enabled";
+
+  public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.configs";
+  public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED 
= false;
+
+  public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.endpoint";
+
+  public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS =
+      METRICS_CONFIGURATIONS_PREFIX + 
"reporting.opentelemetry.interval.millis";
+
   /**
    * Rest server configuration properties.
    */
diff --git a/gobblin-metrics-libs/gobblin-metrics/build.gradle 
b/gobblin-metrics-libs/gobblin-metrics/build.gradle
index 0b211f8da..be731ff1b 100644
--- a/gobblin-metrics-libs/gobblin-metrics/build.gradle
+++ b/gobblin-metrics-libs/gobblin-metrics/build.gradle
@@ -33,7 +33,10 @@ dependencies {
   compile externalDependency.commonsLang3
   compile externalDependency.typesafeConfig
   compile externalDependency.findBugsAnnotations
-
+  compile externalDependency.opentelemetryApi
+  compile externalDependency.opentelemetrySdk
+  compile externalDependency.opentelemetryExporterOtlp
+  compile externalDependency.opentelemetrySdkTesting
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/InMemoryOpenTelemetryMetrics.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/InMemoryOpenTelemetryMetrics.java
new file mode 100644
index 000000000..60f95e571
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/InMemoryOpenTelemetryMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A stub for OpenTelemetryMetrics that uses in-memory metric reader for 
testing purposes
+ */
+public class InMemoryOpenTelemetryMetrics extends OpenTelemetryMetricsBase {
+  public InMemoryMetricReader metricReader;
+  public InMemoryOpenTelemetryMetrics(State state) {
+    super(state);
+  }
+
+  @Override
+  void initialize(State state) {
+    this.metricReader = InMemoryMetricReader.create();
+    SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+        .registerMetricReader(this.metricReader)
+        .build();
+    this.openTelemetry = 
OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
+  }
+
+  MetricExporter initializeMetricExporter(State state) {
+    // Metrics are never exported if the metric reader is in-memory
+    return null;
+  }
+
+  /**
+   * Returns a new instance of {@link InMemoryOpenTelemetryMetrics} as it does 
not need to be globally shared
+   * @param state
+   * @return
+   */
+  public static InMemoryOpenTelemetryMetrics getInstance(State state) {
+    return new InMemoryOpenTelemetryMetrics(state);
+  }
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
new file mode 100644
index 000000000..2809e0e64
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
+import 
io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.PropertiesUtils;
+/**
+ * A metrics reporter wrapper that uses the OpenTelemetry standard to emit 
metrics
+ * Currently separated from the legacy codehale metrics as we need to maintain 
backwards compatibility, but eventually
+ * can replace the old metrics system with tighter integrations once it's 
stable
+ */
+
+public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {
+
+  private static OpenTelemetryMetrics GLOBAL_INSTANCE;
+  private static final Long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS = 
10000L;
+  private OpenTelemetryMetrics(State state) {
+    super(state);
+  }
+
+  @Override
+  protected MetricExporter initializeMetricExporter(State state) {
+    OtlpHttpMetricExporterBuilder httpExporterBuilder = 
OtlpHttpMetricExporter.builder();
+    
httpExporterBuilder.setEndpoint(state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT));
+    return httpExporterBuilder.build();
+  }
+
+  public static OpenTelemetryMetrics getInstance(State state) {
+    if 
(state.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+        ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED) && 
GLOBAL_INSTANCE == null) {
+      GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
+    }
+    return GLOBAL_INSTANCE;
+  }
+
+  @Override
+  protected void initialize(State state) {
+    Properties metricProps = 
PropertiesUtils.extractPropertiesWithPrefix(state.getProperties(), Optional.of(
+        ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS));
+    AttributesBuilder attributesBuilder = Attributes.builder();
+    for (String key : metricProps.stringPropertyNames()) {
+      attributesBuilder.put(AttributeKey.stringKey(key), 
metricProps.getProperty(key));
+    }
+    Resource metricsResource = 
Resource.getDefault().merge(Resource.create(attributesBuilder.build()));
+
+    SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+        .setResource(metricsResource)
+        .registerMetricReader(
+            PeriodicMetricReader.builder(this.metricExporter)
+                .setInterval(Duration.ofMillis(
+                    
state.getPropAsLong(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS,
+                        DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS)))
+                .build())
+        .build();
+
+    this.openTelemetry = 
OpenTelemetrySdk.builder().setMeterProvider(meterProvider).buildAndRegisterGlobal();
+  }
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetricsBase.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetricsBase.java
new file mode 100644
index 000000000..42642af4c
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetricsBase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.io.IOException;
+
+import com.google.common.io.Closer;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+
+import org.apache.gobblin.configuration.State;
+
+
+public abstract class OpenTelemetryMetricsBase implements AutoCloseable {
+  protected MetricExporter metricExporter;
+
+  protected OpenTelemetry openTelemetry;
+
+  Closer closer;
+
+  public OpenTelemetryMetricsBase(State state) {
+    this.closer = Closer.create();
+    this.metricExporter = initializeMetricExporter(state);
+    this.closer.register(this.metricExporter);
+    initialize(state);
+  }
+
+  abstract MetricExporter initializeMetricExporter(State state);
+  abstract void initialize(State state);
+
+  public Meter getMeter(String groupName) {
+    return this.openTelemetry.getMeterProvider().get(groupName);
+  }
+
+  public void close() throws IOException {
+    if (this.closer != null) {
+      this.closer.close();
+    }
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index e63969b86..d9102a1be 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -535,7 +535,7 @@ public class KafkaAvroJobStatusMonitorTest {
     }
     MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
     MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
-        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository, 
false);
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
         mockEventProducer);
     jobStatusMonitor.buildMetricsContextAndMetrics();
@@ -582,7 +582,7 @@ public class KafkaAvroJobStatusMonitorTest {
     }
     MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
     MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
-        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository, 
false);
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
         mockEventProducer);
     jobStatusMonitor.buildMetricsContextAndMetrics();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index 2b81378f6..d8f5e4371 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -27,6 +27,8 @@ import java.util.stream.Collectors;
 import com.codahale.metrics.MetricRegistry;
 import com.google.gson.reflect.TypeToken;
 
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
@@ -38,6 +40,8 @@ import org.apache.gobblin.metrics.Issue;
 import org.apache.gobblin.metrics.IssueSeverity;
 import org.apache.gobblin.metrics.JobStatus;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.DatasetTaskSummary;
@@ -61,9 +65,16 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
   public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
   public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = 
NoopGaaSObservabilityEventProducer.class.getName();
   public static final String ISSUES_READ_FAILED_METRIC_NAME =  
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+  public static final String GAAS_OBSERVABILITY_METRICS_PREFIX = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics.";
+  public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME = 
"gaas.observability.jobStatus";
+  public static final String GAAS_OBSERVABILITY_GROUP_NAME = 
GAAS_OBSERVABILITY_METRICS_PREFIX + "groupName";
 
   protected MetricContext metricContext;
   protected State state;
+
+  List<GaaSObservabilityEventExperimental> eventCollector = new ArrayList<>();
+  protected OpenTelemetryMetricsBase opentelemetryMetrics;
+  protected ObservableLongMeasurement jobStatusMetric;
   protected MultiContextIssueRepository issueRepository;
   protected boolean instrumentationEnabled;
   ContextAwareMeter getIssuesFailedMeter;
@@ -76,12 +87,48 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
       this.metricContext = Instrumented.getMetricContext(state, getClass());
       this.getIssuesFailedMeter = 
this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
           ISSUES_READ_FAILED_METRIC_NAME));
+      setupMetrics(state);
+    }
+  }
+
+  protected OpenTelemetryMetricsBase getOpentelemetryMetrics(State state) {
+    return OpenTelemetryMetrics.getInstance(state);
+  }
+
+
+  private void setupMetrics(State state) {
+    this.opentelemetryMetrics = getOpentelemetryMetrics(state);
+    if (this.opentelemetryMetrics != null) {
+      this.jobStatusMetric = 
this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
+          .gaugeBuilder(GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME)
+          .ofLongs()
+          .buildObserver();
+      
this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
+          .batchCallback(() -> {
+            for (GaaSObservabilityEventExperimental event : 
this.eventCollector) {
+              Attributes tags = getEventAttributes(event);
+              int status = event.getJobStatus() != JobStatus.SUCCEEDED ? 1 : 0;
+              this.jobStatusMetric.record(status, tags);
+            }
+            // Empty the list of events as they are all emitted at this point.
+            this.eventCollector.clear();
+          }, this.jobStatusMetric);
     }
   }
 
   public void emitObservabilityEvent(final State jobState) {
     GaaSObservabilityEventExperimental event = 
createGaaSObservabilityEvent(jobState);
     sendUnderlyingEvent(event);
+    this.eventCollector.add(event);
+  }
+
+  public Attributes getEventAttributes(GaaSObservabilityEventExperimental 
event) {
+    Attributes tags = 
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
event.getFlowName())
+        .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
event.getFlowGroup())
+        .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName())
+        .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
event.getFlowExecutionId())
+        .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
event.getExecutorId()).put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
event.getFlowGraphEdgeId()).build();
+    return tags;
   }
 
   /**
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index 96383e6e4..eb29d3f94 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -19,16 +19,23 @@ package org.apache.gobblin.service.monitoring;
 
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
-import org.testng.annotations.Test;
 import org.testng.Assert;
+import org.testng.annotations.Test;
 
 import com.google.common.collect.Maps;
 
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
 import org.apache.gobblin.metrics.JobStatus;
@@ -71,7 +78,7 @@ public class GaaSObservabilityProducerTest {
 
     State state = new State();
     state.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, 
"testCluster");
-    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(state, this.issueRepository);
+    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(state, this.issueRepository, false);
     Map<String, String> gteEventMetadata = Maps.newHashMap();
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
@@ -139,7 +146,7 @@ public class GaaSObservabilityProducerTest {
         TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
         createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
     );
-    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(new State(), this.issueRepository);
+    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(new State(), this.issueRepository, false);
     Map<String, String> gteEventMetadata = Maps.newHashMap();
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
     gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
@@ -178,6 +185,97 @@ public class GaaSObservabilityProducerTest {
     serializer.serializeRecord(event);
   }
 
+  @Test
+  public void testEnableMetrics() throws Exception {
+    String flowGroup = "testFlowGroup2";
+    String flowName = "testFlowName2";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    State producerState = new State();
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT,
 "http://localhost:5000";);
+
+    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(producerState, this.issueRepository, true);
+
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+
+    // Ensure that this doesn't throw due to NPE
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+  }
+
+  @Test
+  public void testMockProduceMetrics() throws Exception {
+    String flowGroup = "testFlowGroup2";
+    String flowName = "testFlowName2";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    State producerState = new State();
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+
+    MockGaaSObservabilityEventProducer producer = new 
MockGaaSObservabilityEventProducer(producerState, this.issueRepository, true);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
+
+    Map<String, String> gteEventMetadata2 = Maps.newHashMap();
+    gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+    gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+    
gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"2");
+    gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+    gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+    gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+    gteEventMetadata2.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+    gteEventMetadata2.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+
+    Properties jobStatusProps = new Properties();
+    Properties jobStatusProps2 = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);    // Ensure that this doesn't 
throw due to NPE
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+    jobStatusProps2.putAll(gteEventMetadata2);
+    producer.emitObservabilityEvent(new State(jobStatusProps2));
+    Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+    // Check number of meters
+    Assert.assertEquals(metrics.size(), 1);
+    Map<String, MetricData > metricsByName = 
metrics.stream().collect(Collectors.toMap(metric -> metric.getName(), 
metricData -> metricData));
+    MetricData jobStatusMetric = 
metricsByName.get("gaas.observability.jobStatus");
+    // Check the attributes of the metrics
+    List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+    Assert.assertEquals(datapoints.size(), 2);
+    // Check that the values are different for the two events (order not 
guaranteed for the same collection event)
+    Assert.assertNotEquals(datapoints.get(0).getValue(), 
datapoints.get(1).getValue());
+    
Assert.assertNotEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")),
+        
datapoints.get(1).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")));
+
+    // Check common string tag
+    
Assert.assertEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
 flowGroup);
+    
Assert.assertEquals(datapoints.get(1).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
 flowGroup);
+  }
+
   private Issue createTestIssue(String summary, String code, IssueSeverity 
severity) {
     return 
Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
index 770276677..eb9269535 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
@@ -23,6 +23,8 @@ import java.util.List;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 
 
@@ -33,10 +35,14 @@ import 
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 public class MockGaaSObservabilityEventProducer extends 
GaaSObservabilityEventProducer {
   private List<GaaSObservabilityEventExperimental> emittedEvents = new 
ArrayList<>();
 
-  public MockGaaSObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository) {
-    super(state, issueRepository, false);
+  public MockGaaSObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    super(state, issueRepository, instrumentationEnabled);
   }
 
+  @Override
+  protected OpenTelemetryMetricsBase getOpentelemetryMetrics(State state) {
+    return InMemoryOpenTelemetryMetrics.getInstance(state);
+  }
   @Override
   protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event) 
{
     emittedEvents.add(event);
@@ -50,4 +56,8 @@ public class MockGaaSObservabilityEventProducer extends 
GaaSObservabilityEventPr
   public List<GaaSObservabilityEventExperimental> getTestEmittedEvents() {
     return Collections.unmodifiableList(this.emittedEvents);
   }
+
+  public InMemoryOpenTelemetryMetrics getOpentelemetryMetrics() {
+    return (InMemoryOpenTelemetryMetrics) this.opentelemetryMetrics;
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 0aaed3729..dd103ab12 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -83,7 +83,8 @@ public class PropertiesUtils {
   /** @throws {@link NullPointerException} when `key` not in `properties` */
   public static String getRequiredPropRaw(Properties properties, String key, 
Optional<String> desc) {
     String value = properties.getProperty(key);
-    Preconditions.checkNotNull(value, "'" + key + "' must be set" + 
desc.transform(s -> " (to " + desc + ")").or(""));
+    Preconditions.checkNotNull(value, "'" + key + "' must be set" + 
desc.transform
+        (s -> " (to " + desc + ")").or(""));
     return value;
   }
 
diff --git a/gradle/scripts/defaultBuildProperties.gradle 
b/gradle/scripts/defaultBuildProperties.gradle
index a2668660c..c3ff18b54 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -41,7 +41,7 @@ def BuildProperties BUILD_PROPERTIES = new 
BuildProperties(project)
     .register(new BuildProperty("publishToMaven", false, "Enable publishing of 
artifacts to a central Maven repository"))
     .register(new BuildProperty("publishToNexus", false, "Enable publishing of 
artifacts to Nexus"))
     .register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce 
dependencies version"))
-
+    .register(new BuildProperty("openTelemetryVersion", "1.29.0", 
"OpenTelemetry dependencies version"))
 task buildProperties(description: 'Lists main properties that can be used to 
customize the build') {
   doLast {
     BUILD_PROPERTIES.printHelp();
@@ -73,5 +73,6 @@ BUILD_PROPERTIES.ensureDefined('kafka09Version')
 BUILD_PROPERTIES.ensureDefined('kafka1Version')
 BUILD_PROPERTIES.ensureDefined('pegasusVersion')
 BUILD_PROPERTIES.ensureDefined('salesforceVersion')
+BUILD_PROPERTIES.ensureDefined('openTelemetryVersion')
 
 ext.buildProperties = BUILD_PROPERTIES
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index 4d962818d..cecba15fd 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -116,6 +116,10 @@ ext.externalDependency = [
     "metricsJvm": "io.dropwizard.metrics:metrics-jvm:" + 
dropwizardMetricsVersion,
     "metricsGraphite": "io.dropwizard.metrics:metrics-graphite:" + 
dropwizardMetricsVersion,
     "metricsJmx": "io.dropwizard.metrics:metrics-jmx:" + 
dropwizardMetricsVersion,
+    "opentelemetryApi": "io.opentelemetry:opentelemetry-api:" + 
openTelemetryVersion,
+    "opentelemetrySdk": "io.opentelemetry:opentelemetry-sdk:" + 
openTelemetryVersion,
+    "opentelemetryExporterOtlp": 
"io.opentelemetry:opentelemetry-exporter-otlp:" + openTelemetryVersion,
+    "opentelemetrySdkTesting": "io.opentelemetry:opentelemetry-sdk-testing:" + 
openTelemetryVersion,
     "jsch": "com.jcraft:jsch:0.1.54",
     "jdo2": "javax.jdo:jdo2-api:2.1",
     "azkaban": "com.linkedin.azkaban:azkaban:2.5.0",

Reply via email to