This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new df9a0c1e5 [GOBBLIN-2053] Add header and fix prefix configs 
opentelemetry (#3933)
df9a0c1e5 is described below

commit df9a0c1e54d957d43509790cc0adc08cf01e6558
Author: William Lo <[email protected]>
AuthorDate: Thu Apr 25 11:31:15 2024 -0400

    [GOBBLIN-2053] Add header and fix prefix configs opentelemetry (#3933)
    
    Add header and fix prefix configs opentelemetry
---
 .../gobblin/configuration/ConfigurationKeys.java   | 16 +++---
 .../data/management/dataset/DatasetUtils.java      |  2 +-
 .../gobblin/metrics/OpenTelemetryMetrics.java      | 34 +++++++++++-
 .../gobblin/metrics/OpenTelemetryMetricsTest.java  | 64 ++++++++++++++++++++++
 .../monitoring/GaaSObservabilityEventProducer.java | 10 ++--
 .../monitoring/GaaSObservabilityProducerTest.java  |  2 +-
 .../org/apache/gobblin/util/PropertiesUtils.java   |  2 +-
 .../apache/gobblin/util/PropertiesUtilsTest.java   |  6 +-
 8 files changed, 114 insertions(+), 22 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 6f4b100b1..93acece84 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -905,18 +905,18 @@ public class ConfigurationKeys {
   public static final String METRICS_CUSTOM_BUILDERS = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.custom.builders";
 
   // Opentelemetry based metrics reporting
-  public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED =
-      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemtry.metrics.enabled";
+  public static final String METRICS_REPORTING_OPENTELEMETRY_PREFIX = 
"metrics.reporting.opentelemetry.";
+  public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "enabled";
 
-  public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS =
-      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.configs";
+  public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "configs.";
   public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED 
= false;
 
-  public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT =
-      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.endpoint";
+  public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
 
-  public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS =
-      METRICS_CONFIGURATIONS_PREFIX + 
"reporting.opentelemetry.interval.millis";
+  // Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON 
String with string keys and values
+  public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
+
+  public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS = 
METRICS_CONFIGURATIONS_PREFIX + "interval.millis";
 
   /**
    * Rest server configuration properties.
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
index 37e789980..4f1038f00 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
@@ -123,7 +123,7 @@ public class DatasetUtils {
     try {
       Class<?> pathFilterClass = 
Class.forName(props.getProperty(PATH_FILTER_KEY));
       return (PathFilter) 
GobblinConstructorUtils.invokeLongestConstructor(pathFilterClass,
-          
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(props, 
CONFIGURATION_KEY_PREFIX));
+          PropertiesUtils.extractChildProperties(props, 
CONFIGURATION_KEY_PREFIX));
     } catch (ReflectiveOperationException exception) {
       throw new RuntimeException(exception);
     }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
index 2809e0e64..1693f8f42 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
@@ -18,9 +18,12 @@
 package org.apache.gobblin.metrics;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
-import com.google.common.base.Optional;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.common.Attributes;
@@ -32,6 +35,7 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider;
 import io.opentelemetry.sdk.metrics.export.MetricExporter;
 import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
 import io.opentelemetry.sdk.resources.Resource;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -40,20 +44,32 @@ import org.apache.gobblin.util.PropertiesUtils;
  * A metrics reporter wrapper that uses the OpenTelemetry standard to emit 
metrics
  * Currently separated from the legacy codehale metrics as we need to maintain 
backwards compatibility, but eventually
  * can replace the old metrics system with tighter integrations once it's 
stable
+ * Defaults to using the HTTP exporter where it expects an endpoint and 
optional headers in JSON string format
  */
 
+@Slf4j
 public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {
 
   private static OpenTelemetryMetrics GLOBAL_INSTANCE;
   private static final Long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS = 
10000L;
+
   private OpenTelemetryMetrics(State state) {
     super(state);
   }
 
   @Override
   protected MetricExporter initializeMetricExporter(State state) {
+    
Preconditions.checkArgument(state.contains(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT),
+        "OpenTelemetry endpoint must be provided");
     OtlpHttpMetricExporterBuilder httpExporterBuilder = 
OtlpHttpMetricExporter.builder();
     
httpExporterBuilder.setEndpoint(state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT));
+
+    if 
(state.contains(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HEADERS)) {
+      Map<String, String> headers = 
parseHttpHeaders(state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HEADERS));
+      for (Map.Entry<String, String> header : headers.entrySet()) {
+        httpExporterBuilder.addHeader(header.getKey(), header.getValue());
+      }
+    }
     return httpExporterBuilder.build();
   }
 
@@ -67,8 +83,9 @@ public class OpenTelemetryMetrics extends 
OpenTelemetryMetricsBase {
 
   @Override
   protected void initialize(State state) {
-    Properties metricProps = 
PropertiesUtils.extractPropertiesWithPrefix(state.getProperties(), Optional.of(
-        ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS));
+    log.info("Initializing OpenTelemetry metrics");
+    Properties metricProps = 
PropertiesUtils.extractChildProperties(state.getProperties(),
+        ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX);
     AttributesBuilder attributesBuilder = Attributes.builder();
     for (String key : metricProps.stringPropertyNames()) {
       attributesBuilder.put(AttributeKey.stringKey(key), 
metricProps.getProperty(key));
@@ -87,4 +104,15 @@ public class OpenTelemetryMetrics extends 
OpenTelemetryMetricsBase {
 
     this.openTelemetry = 
OpenTelemetrySdk.builder().setMeterProvider(meterProvider).buildAndRegisterGlobal();
   }
+
+  protected static Map<String, String> parseHttpHeaders(String headersString) {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      return mapper.readValue(headersString, HashMap.class);
+    } catch (Exception e) {
+      String errMsg = "Failed to parse headers: " + headersString;
+      log.error(errMsg, e);
+      throw new RuntimeException(errMsg);
+    }
+  }
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/OpenTelemetryMetricsTest.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/OpenTelemetryMetricsTest.java
new file mode 100644
index 000000000..cc3da325f
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/OpenTelemetryMetricsTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+
+public class OpenTelemetryMetricsTest  {
+
+  @Test
+  public void testInitializeOpenTelemetryFailsWithoutEndpoint() {
+    State opentelemetryState = new State();
+    
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    Assert.assertThrows(IllegalArgumentException.class, () -> {
+      OpenTelemetryMetrics.getInstance(opentelemetryState);
+    });
+  }
+
+  @Test
+  public void testInitializeOpenTelemetrySucceedsWithEndpoint() {
+    State opentelemetryState = new State();
+    
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT,
 "http://localhost:4317";);
+    // Should not throw an exception
+    OpenTelemetryMetrics.getInstance(opentelemetryState);
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testHeadersParseCorrectly() {
+    Map<String, String> headers = OpenTelemetryMetrics.parseHttpHeaders(
+        
"{\"Content-Type\":\"application/x-protobuf\",\"headerTag\":\"tag1:value1,tag2:value2\"}");
+    Assert.assertEquals(headers.size(), 2);
+    Assert.assertEquals(headers.get("Content-Type"), "application/x-protobuf");
+    Assert.assertEquals(headers.get("headerTag"), "tag1:value1,tag2:value2");
+  }
+
+  @Test
+  void testHeadersParseNull() {
+    Map<String, String> headers = OpenTelemetryMetrics.parseHttpHeaders("{}");
+    Assert.assertEquals(headers.size(), 0);
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index d8f5e4371..e5be9f51a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -65,9 +65,8 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
   public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
   public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = 
NoopGaaSObservabilityEventProducer.class.getName();
   public static final String ISSUES_READ_FAILED_METRIC_NAME =  
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
-  public static final String GAAS_OBSERVABILITY_METRICS_PREFIX = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics.";
-  public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME = 
"gaas.observability.jobStatus";
-  public static final String GAAS_OBSERVABILITY_GROUP_NAME = 
GAAS_OBSERVABILITY_METRICS_PREFIX + "groupName";
+  public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
+  public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME = 
"jobStatus";
 
   protected MetricContext metricContext;
   protected State state;
@@ -99,17 +98,18 @@ public abstract class GaaSObservabilityEventProducer 
implements Closeable {
   private void setupMetrics(State state) {
     this.opentelemetryMetrics = getOpentelemetryMetrics(state);
     if (this.opentelemetryMetrics != null) {
-      this.jobStatusMetric = 
this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
+      this.jobStatusMetric = 
this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
           .gaugeBuilder(GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME)
           .ofLongs()
           .buildObserver();
-      
this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
+      this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
           .batchCallback(() -> {
             for (GaaSObservabilityEventExperimental event : 
this.eventCollector) {
               Attributes tags = getEventAttributes(event);
               int status = event.getJobStatus() != JobStatus.SUCCEEDED ? 1 : 0;
               this.jobStatusMetric.record(status, tags);
             }
+            log.info("Submitted {} job status events", 
this.eventCollector.size());
             // Empty the list of events as they are all emitted at this point.
             this.eventCollector.clear();
           }, this.jobStatusMetric);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index eb29d3f94..09bd573da 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -262,7 +262,7 @@ public class GaaSObservabilityProducerTest {
     // Check number of meters
     Assert.assertEquals(metrics.size(), 1);
     Map<String, MetricData > metricsByName = 
metrics.stream().collect(Collectors.toMap(metric -> metric.getName(), 
metricData -> metricData));
-    MetricData jobStatusMetric = 
metricsByName.get("gaas.observability.jobStatus");
+    MetricData jobStatusMetric = metricsByName.get("jobStatus");
     // Check the attributes of the metrics
     List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
     Assert.assertEquals(datapoints.size(), 2);
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index dd103ab12..ab4ad5c9d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -157,7 +157,7 @@ public class PropertiesUtils {
    * @param prefix of keys to be extracted
    * @return a {@link Properties} instance
    */
-  public static Properties 
extractPropertiesWithPrefixAfterRemovingPrefix(Properties properties, String 
prefix) {
+  public static Properties extractChildProperties(Properties properties, 
String prefix) {
     Preconditions.checkNotNull(properties);
     Preconditions.checkNotNull(prefix);
 
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
index f4470a86e..42291f4ca 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
@@ -63,19 +63,19 @@ public class PropertiesUtilsTest {
     properties.setProperty("k2.kk", "v3");
 
     // First prefix
-    Properties extractedPropertiesK1 = 
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties, 
"k1.");
+    Properties extractedPropertiesK1 = 
PropertiesUtils.extractChildProperties(properties, "k1.");
     Assert.assertEquals(extractedPropertiesK1.getProperty("kk1"), "v1");
     Assert.assertEquals(extractedPropertiesK1.getProperty("kk2"), "v2");
     Assert.assertTrue(!extractedPropertiesK1.containsKey("k2.kk"));
 
     // Second prefix
-    Properties extractedPropertiesK2 = 
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties, 
"k2");
+    Properties extractedPropertiesK2 = 
PropertiesUtils.extractChildProperties(properties, "k2");
     Assert.assertTrue(!extractedPropertiesK2.containsKey("k1.kk1"));
     Assert.assertTrue(!extractedPropertiesK2.containsKey("k1.kk2"));
     Assert.assertEquals(extractedPropertiesK2.getProperty(".kk"), "v3");
 
     // Missing prefix
-    Properties extractedPropertiesK3 = 
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties, 
"k3");
+    Properties extractedPropertiesK3 = 
PropertiesUtils.extractChildProperties(properties, "k3");
     Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
     Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
     Assert.assertTrue(!extractedPropertiesK3.containsKey("k2.kk"));

Reply via email to