This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new df9a0c1e5 [GOBBLIN-2053] Add header and fix prefix configs opentelemetry (#3933)
df9a0c1e5 is described below
commit df9a0c1e54d957d43509790cc0adc08cf01e6558
Author: William Lo <[email protected]>
AuthorDate: Thu Apr 25 11:31:15 2024 -0400
[GOBBLIN-2053] Add header and fix prefix configs opentelemetry (#3933)
Add HTTP header support and fix the config key prefixes for OpenTelemetry metrics reporting.
---
.../gobblin/configuration/ConfigurationKeys.java | 16 +++---
.../data/management/dataset/DatasetUtils.java | 2 +-
.../gobblin/metrics/OpenTelemetryMetrics.java | 34 +++++++++++-
.../gobblin/metrics/OpenTelemetryMetricsTest.java | 64 ++++++++++++++++++++++
.../monitoring/GaaSObservabilityEventProducer.java | 10 ++--
.../monitoring/GaaSObservabilityProducerTest.java | 2 +-
.../org/apache/gobblin/util/PropertiesUtils.java | 2 +-
.../apache/gobblin/util/PropertiesUtilsTest.java | 6 +-
8 files changed, 114 insertions(+), 22 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 6f4b100b1..93acece84 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -905,18 +905,18 @@ public class ConfigurationKeys {
public static final String METRICS_CUSTOM_BUILDERS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.custom.builders";
// Opentelemetry based metrics reporting
- public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED =
- METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemtry.metrics.enabled";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_PREFIX =
"metrics.reporting.opentelemetry.";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "enabled";
- public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS =
- METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.configs";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "configs.";
public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED
= false;
- public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT =
- METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.endpoint";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
- public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS =
- METRICS_CONFIGURATIONS_PREFIX +
"reporting.opentelemetry.interval.millis";
+ // Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON
String with string keys and values
+ public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
+
+ public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS =
METRICS_CONFIGURATIONS_PREFIX + "interval.millis";
/**
* Rest server configuration properties.
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
index 37e789980..4f1038f00 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
@@ -123,7 +123,7 @@ public class DatasetUtils {
try {
Class<?> pathFilterClass =
Class.forName(props.getProperty(PATH_FILTER_KEY));
return (PathFilter)
GobblinConstructorUtils.invokeLongestConstructor(pathFilterClass,
-
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(props,
CONFIGURATION_KEY_PREFIX));
+ PropertiesUtils.extractChildProperties(props,
CONFIGURATION_KEY_PREFIX));
} catch (ReflectiveOperationException exception) {
throw new RuntimeException(exception);
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
index 2809e0e64..1693f8f42 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
@@ -18,9 +18,12 @@
package org.apache.gobblin.metrics;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
-import com.google.common.base.Optional;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
@@ -32,6 +35,7 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -40,20 +44,32 @@ import org.apache.gobblin.util.PropertiesUtils;
* A metrics reporter wrapper that uses the OpenTelemetry standard to emit
metrics
* Currently separated from the legacy codehale metrics as we need to maintain
backwards compatibility, but eventually
* can replace the old metrics system with tighter integrations once it's
stable
+ * Defaults to using the HTTP exporter where it expects an endpoint and
optional headers in JSON string format
*/
+@Slf4j
public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {
private static OpenTelemetryMetrics GLOBAL_INSTANCE;
private static final Long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS =
10000L;
+
private OpenTelemetryMetrics(State state) {
super(state);
}
@Override
protected MetricExporter initializeMetricExporter(State state) {
+
Preconditions.checkArgument(state.contains(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT),
+ "OpenTelemetry endpoint must be provided");
OtlpHttpMetricExporterBuilder httpExporterBuilder =
OtlpHttpMetricExporter.builder();
httpExporterBuilder.setEndpoint(state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT));
+
+ if
(state.contains(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HEADERS)) {
+ Map<String, String> headers =
parseHttpHeaders(state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HEADERS));
+ for (Map.Entry<String, String> header : headers.entrySet()) {
+ httpExporterBuilder.addHeader(header.getKey(), header.getValue());
+ }
+ }
return httpExporterBuilder.build();
}
@@ -67,8 +83,9 @@ public class OpenTelemetryMetrics extends
OpenTelemetryMetricsBase {
@Override
protected void initialize(State state) {
- Properties metricProps =
PropertiesUtils.extractPropertiesWithPrefix(state.getProperties(), Optional.of(
- ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS));
+ log.info("Initializing OpenTelemetry metrics");
+ Properties metricProps =
PropertiesUtils.extractChildProperties(state.getProperties(),
+ ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX);
AttributesBuilder attributesBuilder = Attributes.builder();
for (String key : metricProps.stringPropertyNames()) {
attributesBuilder.put(AttributeKey.stringKey(key),
metricProps.getProperty(key));
@@ -87,4 +104,15 @@ public class OpenTelemetryMetrics extends
OpenTelemetryMetricsBase {
this.openTelemetry =
OpenTelemetrySdk.builder().setMeterProvider(meterProvider).buildAndRegisterGlobal();
}
+
+ protected static Map<String, String> parseHttpHeaders(String headersString) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(headersString, HashMap.class);
+ } catch (Exception e) {
+ String errMsg = "Failed to parse headers: " + headersString;
+ log.error(errMsg, e);
+ throw new RuntimeException(errMsg);
+ }
+ }
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/OpenTelemetryMetricsTest.java
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/OpenTelemetryMetricsTest.java
new file mode 100644
index 000000000..cc3da325f
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/OpenTelemetryMetricsTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+
+public class OpenTelemetryMetricsTest {
+
+ @Test
+ public void testInitializeOpenTelemetryFailsWithoutEndpoint() {
+ State opentelemetryState = new State();
+
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ OpenTelemetryMetrics.getInstance(opentelemetryState);
+ });
+ }
+
+ @Test
+ public void testInitializeOpenTelemetrySucceedsWithEndpoint() {
+ State opentelemetryState = new State();
+
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
+
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT,
"http://localhost:4317");
+ // Should not throw an exception
+ OpenTelemetryMetrics.getInstance(opentelemetryState);
+ Assert.assertTrue(true);
+ }
+
+ @Test
+ public void testHeadersParseCorrectly() {
+ Map<String, String> headers = OpenTelemetryMetrics.parseHttpHeaders(
+
"{\"Content-Type\":\"application/x-protobuf\",\"headerTag\":\"tag1:value1,tag2:value2\"}");
+ Assert.assertEquals(headers.size(), 2);
+ Assert.assertEquals(headers.get("Content-Type"), "application/x-protobuf");
+ Assert.assertEquals(headers.get("headerTag"), "tag1:value1,tag2:value2");
+ }
+
+ @Test
+ void testHeadersParseNull() {
+ Map<String, String> headers = OpenTelemetryMetrics.parseHttpHeaders("{}");
+ Assert.assertEquals(headers.size(), 0);
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index d8f5e4371..e5be9f51a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -65,9 +65,8 @@ public abstract class GaaSObservabilityEventProducer
implements Closeable {
public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
NoopGaaSObservabilityEventProducer.class.getName();
public static final String ISSUES_READ_FAILED_METRIC_NAME =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
- public static final String GAAS_OBSERVABILITY_METRICS_PREFIX =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics.";
- public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME =
"gaas.observability.jobStatus";
- public static final String GAAS_OBSERVABILITY_GROUP_NAME =
GAAS_OBSERVABILITY_METRICS_PREFIX + "groupName";
+ public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
+ public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME =
"jobStatus";
protected MetricContext metricContext;
protected State state;
@@ -99,17 +98,18 @@ public abstract class GaaSObservabilityEventProducer
implements Closeable {
private void setupMetrics(State state) {
this.opentelemetryMetrics = getOpentelemetryMetrics(state);
if (this.opentelemetryMetrics != null) {
- this.jobStatusMetric =
this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
+ this.jobStatusMetric =
this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
.gaugeBuilder(GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME)
.ofLongs()
.buildObserver();
-
this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
+ this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
.batchCallback(() -> {
for (GaaSObservabilityEventExperimental event :
this.eventCollector) {
Attributes tags = getEventAttributes(event);
int status = event.getJobStatus() != JobStatus.SUCCEEDED ? 1 : 0;
this.jobStatusMetric.record(status, tags);
}
+ log.info("Submitted {} job status events",
this.eventCollector.size());
// Empty the list of events as they are all emitted at this point.
this.eventCollector.clear();
}, this.jobStatusMetric);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index eb29d3f94..09bd573da 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -262,7 +262,7 @@ public class GaaSObservabilityProducerTest {
// Check number of meters
Assert.assertEquals(metrics.size(), 1);
Map<String, MetricData > metricsByName =
metrics.stream().collect(Collectors.toMap(metric -> metric.getName(),
metricData -> metricData));
- MetricData jobStatusMetric =
metricsByName.get("gaas.observability.jobStatus");
+ MetricData jobStatusMetric = metricsByName.get("jobStatus");
// Check the attributes of the metrics
List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
Assert.assertEquals(datapoints.size(), 2);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index dd103ab12..ab4ad5c9d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -157,7 +157,7 @@ public class PropertiesUtils {
* @param prefix of keys to be extracted
* @return a {@link Properties} instance
*/
- public static Properties
extractPropertiesWithPrefixAfterRemovingPrefix(Properties properties, String
prefix) {
+ public static Properties extractChildProperties(Properties properties,
String prefix) {
Preconditions.checkNotNull(properties);
Preconditions.checkNotNull(prefix);
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
index f4470a86e..42291f4ca 100644
---
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
@@ -63,19 +63,19 @@ public class PropertiesUtilsTest {
properties.setProperty("k2.kk", "v3");
// First prefix
- Properties extractedPropertiesK1 =
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties,
"k1.");
+ Properties extractedPropertiesK1 =
PropertiesUtils.extractChildProperties(properties, "k1.");
Assert.assertEquals(extractedPropertiesK1.getProperty("kk1"), "v1");
Assert.assertEquals(extractedPropertiesK1.getProperty("kk2"), "v2");
Assert.assertTrue(!extractedPropertiesK1.containsKey("k2.kk"));
// Second prefix
- Properties extractedPropertiesK2 =
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties,
"k2");
+ Properties extractedPropertiesK2 =
PropertiesUtils.extractChildProperties(properties, "k2");
Assert.assertTrue(!extractedPropertiesK2.containsKey("k1.kk1"));
Assert.assertTrue(!extractedPropertiesK2.containsKey("k1.kk2"));
Assert.assertEquals(extractedPropertiesK2.getProperty(".kk"), "v3");
// Missing prefix
- Properties extractedPropertiesK3 =
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties,
"k3");
+ Properties extractedPropertiesK3 =
PropertiesUtils.extractChildProperties(properties, "k3");
Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
Assert.assertTrue(!extractedPropertiesK3.containsKey("k2.kk"));