This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 058d23de7 [GOBBLIN-2018] Add Opentelemetry metrics for GaaS
observability (#3881)
058d23de7 is described below
commit 058d23de7a3df4caf6750144f418f29366faf56c
Author: William Lo <[email protected]>
AuthorDate: Wed Mar 20 14:56:45 2024 -0400
[GOBBLIN-2018] Add Opentelemetry metrics for GaaS observability (#3881)
Add opentelemetry multi dimensional metrics to GaaS observability event
emitter
---
.../gobblin/configuration/ConfigurationKeys.java | 14 +++
gobblin-metrics-libs/gobblin-metrics/build.gradle | 5 +-
.../metrics/InMemoryOpenTelemetryMetrics.java | 59 ++++++++++++
.../gobblin/metrics/OpenTelemetryMetrics.java | 90 ++++++++++++++++++
.../gobblin/metrics/OpenTelemetryMetricsBase.java | 57 +++++++++++
.../runtime/KafkaAvroJobStatusMonitorTest.java | 4 +-
.../monitoring/GaaSObservabilityEventProducer.java | 47 ++++++++++
.../monitoring/GaaSObservabilityProducerTest.java | 104 ++++++++++++++++++++-
.../MockGaaSObservabilityEventProducer.java | 14 ++-
.../org/apache/gobblin/util/PropertiesUtils.java | 3 +-
gradle/scripts/defaultBuildProperties.gradle | 3 +-
gradle/scripts/dependencyDefinitions.gradle | 4 +
12 files changed, 394 insertions(+), 10 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index aa9bbd50e..d7204df8f 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -904,6 +904,20 @@ public class ConfigurationKeys {
//Custom-reporting
public static final String METRICS_CUSTOM_BUILDERS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.custom.builders";
+  // OpenTelemetry-based metrics reporting
+  /** Boolean switch that turns OpenTelemetry metrics reporting on. */
+  public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.metrics.enabled";
+  /** Value used when {@link #METRICS_REPORTING_OPENTELEMETRY_ENABLED} is not set. */
+  public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED = false;
+
+  /** Prefix of the properties that are attached as string attributes to the reported OpenTelemetry resource. */
+  public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.configs";
+
+  /** Endpoint handed to the OTLP/HTTP metric exporter. */
+  public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.endpoint";
+
+  /** Interval, in milliseconds, between two periodic metric exports. */
+  public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.interval.millis";
+
/**
* Rest server configuration properties.
*/
diff --git a/gobblin-metrics-libs/gobblin-metrics/build.gradle
b/gobblin-metrics-libs/gobblin-metrics/build.gradle
index 0b211f8da..be731ff1b 100644
--- a/gobblin-metrics-libs/gobblin-metrics/build.gradle
+++ b/gobblin-metrics-libs/gobblin-metrics/build.gradle
@@ -33,7 +33,10 @@ dependencies {
compile externalDependency.commonsLang3
compile externalDependency.typesafeConfig
compile externalDependency.findBugsAnnotations
-
+ compile externalDependency.opentelemetryApi
+ compile externalDependency.opentelemetrySdk
+ compile externalDependency.opentelemetryExporterOtlp
+ compile externalDependency.opentelemetrySdkTesting
testCompile externalDependency.testng
testCompile externalDependency.mockito
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/InMemoryOpenTelemetryMetrics.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/InMemoryOpenTelemetryMetrics.java
new file mode 100644
index 000000000..60f95e571
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/InMemoryOpenTelemetryMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A stub for {@link OpenTelemetryMetrics} that collects metrics with an {@link InMemoryMetricReader} instead of
+ * exporting them, so that tests can inspect what was recorded.
+ */
+public class InMemoryOpenTelemetryMetrics extends OpenTelemetryMetricsBase {
+  /**
+   * Reader holding the metrics recorded through this instance.
+   * It is assigned in {@link #initialize(State)}, which the superclass constructor invokes, so it can neither be
+   * {@code final} nor carry a field initializer (the initializer would run after, and overwrite, that assignment).
+   */
+  public InMemoryMetricReader metricReader;
+
+  public InMemoryOpenTelemetryMetrics(State state) {
+    super(state);
+  }
+
+  /**
+   * Creates the in-memory reader and an SDK instance bound to it. Unlike {@link OpenTelemetryMetrics}, the SDK is
+   * built without being registered globally.
+   */
+  @Override
+  void initialize(State state) {
+    this.metricReader = InMemoryMetricReader.create();
+    SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+        .registerMetricReader(this.metricReader)
+        .build();
+    this.openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
+  }
+
+  /**
+   * @return always {@code null}; metrics are never exported if the metric reader is in-memory
+   */
+  @Override
+  MetricExporter initializeMetricExporter(State state) {
+    return null;
+  }
+
+  /**
+   * Returns a new instance of {@link InMemoryOpenTelemetryMetrics} as it does not need to be globally shared.
+   *
+   * @param state configuration handed to the constructor
+   * @return a freshly constructed, independent instance
+   */
+  public static InMemoryOpenTelemetryMetrics getInstance(State state) {
+    return new InMemoryOpenTelemetryMetrics(state);
+  }
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
new file mode 100644
index 000000000..2809e0e64
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
+import
io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.PropertiesUtils;
+/**
+ * A metrics reporter wrapper that uses the OpenTelemetry standard to emit metrics.
+ * Currently separated from the legacy Codahale metrics as we need to maintain backwards compatibility, but
+ * eventually can replace the old metrics system with tighter integrations once it's stable.
+ */
+public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {
+
+  private static final long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS = 10000L;
+
+  // Guarded by the class lock taken in getInstance.
+  private static OpenTelemetryMetrics GLOBAL_INSTANCE;
+
+  private OpenTelemetryMetrics(State state) {
+    super(state);
+  }
+
+  /**
+   * Builds the OTLP/HTTP exporter pointed at
+   * {@link ConfigurationKeys#METRICS_REPORTING_OPENTELEMETRY_ENDPOINT}.
+   *
+   * @throws NullPointerException if the endpoint property is not set
+   */
+  @Override
+  protected MetricExporter initializeMetricExporter(State state) {
+    String endpoint = state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT);
+    if (endpoint == null) {
+      // Fail with a message that names the missing key instead of an anonymous NPE from the exporter builder.
+      throw new NullPointerException("'" + ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT
+          + "' must be set when OpenTelemetry metrics reporting is enabled");
+    }
+    OtlpHttpMetricExporterBuilder httpExporterBuilder = OtlpHttpMetricExporter.builder();
+    httpExporterBuilder.setEndpoint(endpoint);
+    return httpExporterBuilder.build();
+  }
+
+  /**
+   * Returns the process-wide instance, creating it on first use when
+   * {@link ConfigurationKeys#METRICS_REPORTING_OPENTELEMETRY_ENABLED} is true in {@code state}.
+   *
+   * <p>Synchronized because construction registers the SDK globally (see {@link #initialize(State)}); two threads
+   * racing through an unsynchronized null check would both attempt that registration.
+   *
+   * @param state configuration; only consulted for construction when no instance exists yet
+   * @return the shared instance, or {@code null} if none has been created and reporting is not enabled in
+   *         {@code state}
+   */
+  public static synchronized OpenTelemetryMetrics getInstance(State state) {
+    boolean enabled = state.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+        ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+    if (enabled && GLOBAL_INSTANCE == null) {
+      GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
+    }
+    return GLOBAL_INSTANCE;
+  }
+
+  /**
+   * Builds the SDK: a resource carrying the configured attributes, and a periodic reader that pushes to the
+   * exporter created by {@link #initializeMetricExporter(State)}. The resulting SDK is registered globally.
+   */
+  @Override
+  protected void initialize(State state) {
+    // Each property extracted for the "configs" prefix becomes a string attribute of the reported resource.
+    Properties metricProps = PropertiesUtils.extractPropertiesWithPrefix(state.getProperties(),
+        Optional.of(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS));
+    AttributesBuilder attributesBuilder = Attributes.builder();
+    for (String key : metricProps.stringPropertyNames()) {
+      attributesBuilder.put(AttributeKey.stringKey(key), metricProps.getProperty(key));
+    }
+    Resource metricsResource = Resource.getDefault().merge(Resource.create(attributesBuilder.build()));
+
+    long intervalMillis = state.getPropAsLong(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS,
+        DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS);
+    SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+        .setResource(metricsResource)
+        .registerMetricReader(PeriodicMetricReader.builder(this.metricExporter)
+            .setInterval(Duration.ofMillis(intervalMillis))
+            .build())
+        .build();
+
+    this.openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).buildAndRegisterGlobal();
+  }
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetricsBase.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetricsBase.java
new file mode 100644
index 000000000..42642af4c
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetricsBase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.io.IOException;
+
+import com.google.common.io.Closer;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * Common skeleton of the OpenTelemetry metrics wrappers: owns the {@link MetricExporter} and the
+ * {@link OpenTelemetry} instance produced by the subclass and hands out {@link Meter}s from it.
+ *
+ * <p>NOTE: the constructor calls the overridable hooks {@link #initializeMetricExporter(State)} and
+ * {@link #initialize(State)}. They therefore run before the subclass' own field initializers and constructor
+ * body, so implementations must only rely on the {@code state} argument and on fields of this class.
+ */
+public abstract class OpenTelemetryMetricsBase implements AutoCloseable {
+  /** Exporter returned by {@link #initializeMetricExporter(State)}; may be {@code null}. */
+  protected MetricExporter metricExporter;
+
+  /** Must be assigned by {@link #initialize(State)}; {@link #getMeter(String)} dereferences it. */
+  protected OpenTelemetry openTelemetry;
+
+  Closer closer;
+
+  public OpenTelemetryMetricsBase(State state) {
+    this.closer = Closer.create();
+    // The exporter is created first because initialize() may wire it into a metric reader.
+    this.metricExporter = initializeMetricExporter(state);
+    if (this.metricExporter != null) {
+      // Subclasses that never export (e.g. an in-memory reader) return null; there is nothing to close then.
+      this.closer.register(this.metricExporter);
+    }
+    initialize(state);
+  }
+
+  /**
+   * Creates the exporter metrics are shipped with.
+   *
+   * @return the exporter, or {@code null} when the implementation does not export
+   */
+  abstract MetricExporter initializeMetricExporter(State state);
+
+  /** Builds the SDK and stores it in {@link #openTelemetry}. */
+  abstract void initialize(State state);
+
+  /**
+   * @param groupName name passed to the meter provider to obtain the meter
+   * @return the {@link Meter} registered under {@code groupName}
+   */
+  public Meter getMeter(String groupName) {
+    return this.openTelemetry.getMeterProvider().get(groupName);
+  }
+
+  /**
+   * Closes the resources registered with the internal {@link Closer} (the exporter, when there is one).
+   * NOTE(review): the SDK held in {@link #openTelemetry} is not registered with the closer here - confirm whether
+   * it needs an explicit shutdown as well.
+   */
+  @Override
+  public void close() throws IOException {
+    if (this.closer != null) {
+      this.closer.close();
+    }
+  }
+}
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index e63969b86..d9102a1be 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -535,7 +535,7 @@ public class KafkaAvroJobStatusMonitorTest {
}
MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
- ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+ ConfigUtils.configToState(ConfigFactory.empty()), issueRepository,
false);
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
mockEventProducer);
jobStatusMonitor.buildMetricsContextAndMetrics();
@@ -582,7 +582,7 @@ public class KafkaAvroJobStatusMonitorTest {
}
MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
- ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+ ConfigUtils.configToState(ConfigFactory.empty()), issueRepository,
false);
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
mockEventProducer);
jobStatusMonitor.buildMetricsContextAndMetrics();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
index 2b81378f6..d8f5e4371 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java
@@ -27,6 +27,8 @@ import java.util.stream.Collectors;
import com.codahale.metrics.MetricRegistry;
import com.google.gson.reflect.TypeToken;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
@@ -38,6 +40,8 @@ import org.apache.gobblin.metrics.Issue;
import org.apache.gobblin.metrics.IssueSeverity;
import org.apache.gobblin.metrics.JobStatus;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.DatasetTaskSummary;
@@ -61,9 +65,16 @@ public abstract class GaaSObservabilityEventProducer
implements Closeable {
public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
NoopGaaSObservabilityEventProducer.class.getName();
public static final String ISSUES_READ_FAILED_METRIC_NAME =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+ public static final String GAAS_OBSERVABILITY_METRICS_PREFIX =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics.";
+ public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME =
"gaas.observability.jobStatus";
+ public static final String GAAS_OBSERVABILITY_GROUP_NAME =
GAAS_OBSERVABILITY_METRICS_PREFIX + "groupName";
protected MetricContext metricContext;
protected State state;
+
+ List<GaaSObservabilityEventExperimental> eventCollector = new ArrayList<>();
+ protected OpenTelemetryMetricsBase opentelemetryMetrics;
+ protected ObservableLongMeasurement jobStatusMetric;
protected MultiContextIssueRepository issueRepository;
protected boolean instrumentationEnabled;
ContextAwareMeter getIssuesFailedMeter;
@@ -76,12 +87,48 @@ public abstract class GaaSObservabilityEventProducer
implements Closeable {
this.metricContext = Instrumented.getMetricContext(state, getClass());
this.getIssuesFailedMeter =
this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ISSUES_READ_FAILED_METRIC_NAME));
+ setupMetrics(state);
+ }
+ }
+
+  /**
+   * Supplies the OpenTelemetry wrapper this producer reports through. Overridable so that subclasses can swap
+   * in a different implementation.
+   *
+   * @return the wrapper, or {@code null} when {@link OpenTelemetryMetrics#getInstance(State)} yields none
+   */
+  protected OpenTelemetryMetricsBase getOpentelemetryMetrics(State state) {
+    return OpenTelemetryMetrics.getInstance(state);
+  }
+
+  /**
+   * Registers the job status gauge and the batch callback that reports every buffered event: value 0 for
+   * {@link JobStatus#SUCCEEDED}, 1 for any other status. Does nothing when no OpenTelemetry wrapper is available.
+   */
+  private void setupMetrics(State state) {
+    this.opentelemetryMetrics = getOpentelemetryMetrics(state);
+    if (this.opentelemetryMetrics != null) {
+      this.jobStatusMetric = this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
+          .gaugeBuilder(GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME)
+          .ofLongs()
+          .buildObserver();
+      this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
+          .batchCallback(() -> {
+            // The callback is invoked during metric collection (e.g. by a periodic reader), which may happen on
+            // another thread than the one appending in emitObservabilityEvent, so guard the shared list.
+            synchronized (this.eventCollector) {
+              for (GaaSObservabilityEventExperimental event : this.eventCollector) {
+                Attributes tags = getEventAttributes(event);
+                int status = event.getJobStatus() != JobStatus.SUCCEEDED ? 1 : 0;
+                this.jobStatusMetric.record(status, tags);
+              }
+              // Empty the list of events as they are all emitted at this point.
+              this.eventCollector.clear();
+            }
+          }, this.jobStatusMetric);
     }
   }
   public void emitObservabilityEvent(final State jobState) {
     GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
     sendUnderlyingEvent(event);
+    // Buffer the event only when a callback exists to drain the buffer; otherwise it would grow without bound.
+    if (this.opentelemetryMetrics != null) {
+      synchronized (this.eventCollector) {
+        this.eventCollector.add(event);
+      }
+    }
+  }
+
+  /**
+   * Translates an observability event into the attribute set attached to its metric data point.
+   *
+   * @param event the event to describe
+   * @return attributes holding the flow name, flow group, job name, flow execution id, spec executor and flow edge
+   */
+  public Attributes getEventAttributes(GaaSObservabilityEventExperimental event) {
+    return Attributes.builder()
+        .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, event.getFlowName())
+        .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, event.getFlowGroup())
+        .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName())
+        .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, event.getFlowExecutionId())
+        .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, event.getExecutorId())
+        .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, event.getFlowGraphEdgeId())
+        .build();
   }
/**
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
index 96383e6e4..eb29d3f94 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java
@@ -19,16 +19,23 @@ package org.apache.gobblin.service.monitoring;
import java.time.ZonedDateTime;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
-import org.testng.annotations.Test;
import org.testng.Assert;
+import org.testng.annotations.Test;
import com.google.common.collect.Maps;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
import org.apache.gobblin.metrics.JobStatus;
@@ -71,7 +78,7 @@ public class GaaSObservabilityProducerTest {
State state = new State();
state.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME,
"testCluster");
- MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(state, this.issueRepository);
+ MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(state, this.issueRepository, false);
Map<String, String> gteEventMetadata = Maps.newHashMap();
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
@@ -139,7 +146,7 @@ public class GaaSObservabilityProducerTest {
TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
);
- MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(new State(), this.issueRepository);
+ MockGaaSObservabilityEventProducer producer = new
MockGaaSObservabilityEventProducer(new State(), this.issueRepository, false);
Map<String, String> gteEventMetadata = Maps.newHashMap();
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
@@ -178,6 +185,97 @@ public class GaaSObservabilityProducerTest {
serializer.serializeRecord(event);
}
+  /** Emitting an event while OpenTelemetry metrics are enabled must not throw. */
+  @Test
+  public void testEnableMetrics() throws Exception {
+    String flowGroup = "testFlowGroup2";
+    String flowName = "testFlowName2";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    State producerState = new State();
+    producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, "true");
+    producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT, "http://localhost:5000");
+
+    MockGaaSObservabilityEventProducer producer =
+        new MockGaaSObservabilityEventProducer(producerState, this.issueRepository, true);
+
+    // Ensure that this doesn't throw due to NPE
+    producer.emitObservabilityEvent(
+        newOtelTestJobStatusState(flowGroup, flowName, flowExecutionId, jobName, ExecutionStatus.CANCELLED));
+  }
+
+  /**
+   * Two events that differ in flow execution id and final status must surface as two data points of the single
+   * job status gauge, with different values and different {@code flowExecutionId} attributes.
+   */
+  @Test
+  public void testMockProduceMetrics() throws Exception {
+    String flowGroup = "testFlowGroup2";
+    String flowName = "testFlowName2";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    State producerState = new State();
+    producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, "true");
+
+    MockGaaSObservabilityEventProducer producer =
+        new MockGaaSObservabilityEventProducer(producerState, this.issueRepository, true);
+
+    // Ensure that this doesn't throw due to NPE
+    producer.emitObservabilityEvent(
+        newOtelTestJobStatusState(flowGroup, flowName, "1", jobName, ExecutionStatus.CANCELLED));
+    producer.emitObservabilityEvent(
+        newOtelTestJobStatusState(flowGroup, flowName, "2", jobName, ExecutionStatus.COMPLETE));
+
+    Collection<MetricData> metrics = producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+    // Check number of meters
+    Assert.assertEquals(metrics.size(), 1);
+    Map<String, MetricData> metricsByName =
+        metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData -> metricData));
+    MetricData jobStatusMetric =
+        metricsByName.get(GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME);
+    // Fail with an assertion rather than an NPE below if the gauge was registered under another name
+    Assert.assertNotNull(jobStatusMetric);
+
+    // Check the attributes of the metrics
+    List<LongPointData> datapoints = new ArrayList<>(jobStatusMetric.getLongGaugeData().getPoints());
+    Assert.assertEquals(datapoints.size(), 2);
+    // Check that the values are different for the two events (order not guaranteed for the same collection event)
+    Assert.assertNotEquals(datapoints.get(0).getValue(), datapoints.get(1).getValue());
+    Assert.assertNotEquals(
+        datapoints.get(0).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")),
+        datapoints.get(1).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")));
+
+    // Check common string tag
+    Assert.assertEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")), flowGroup);
+    Assert.assertEquals(datapoints.get(1).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")), flowGroup);
+  }
+
+  /**
+   * Builds the job status {@link State} fed to the producer by the OpenTelemetry tests. The job group field is
+   * set to the flow name; flow edge and spec executor are fixed to {@code "flowEdge"} and {@code "specExecutor"}.
+   */
+  private static State newOtelTestJobStatusState(String flowGroup, String flowName, String flowExecutionId,
+      String jobName, ExecutionStatus status) {
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowExecutionId);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "flowEdge");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, "specExecutor");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, status.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    return new State(jobStatusProps);
+  }
+
private Issue createTestIssue(String summary, String code, IssueSeverity
severity) {
return
Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
index 770276677..eb9269535 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java
@@ -23,6 +23,8 @@ import java.util.List;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
@@ -33,10 +35,14 @@ import
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
public class MockGaaSObservabilityEventProducer extends
GaaSObservabilityEventProducer {
private List<GaaSObservabilityEventExperimental> emittedEvents = new
ArrayList<>();
- public MockGaaSObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository) {
- super(state, issueRepository, false);
+ public MockGaaSObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+ super(state, issueRepository, instrumentationEnabled);
}
+ @Override
+ protected OpenTelemetryMetricsBase getOpentelemetryMetrics(State state) {
+ return InMemoryOpenTelemetryMetrics.getInstance(state);
+ }
@Override
protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event)
{
emittedEvents.add(event);
@@ -50,4 +56,8 @@ public class MockGaaSObservabilityEventProducer extends
GaaSObservabilityEventPr
public List<GaaSObservabilityEventExperimental> getTestEmittedEvents() {
return Collections.unmodifiableList(this.emittedEvents);
}
+
+ public InMemoryOpenTelemetryMetrics getOpentelemetryMetrics() {
+ return (InMemoryOpenTelemetryMetrics) this.opentelemetryMetrics;
+ }
}
\ No newline at end of file
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 0aaed3729..dd103ab12 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -83,7 +83,8 @@ public class PropertiesUtils {
/** @throws {@link NullPointerException} when `key` not in `properties` */
public static String getRequiredPropRaw(Properties properties, String key,
Optional<String> desc) {
String value = properties.getProperty(key);
- Preconditions.checkNotNull(value, "'" + key + "' must be set" +
desc.transform(s -> " (to " + desc + ")").or(""));
+    // Interpolate the unwrapped description `s`; concatenating the Optional itself renders as "Optional.of(...)".
+    Preconditions.checkNotNull(value, "'" + key + "' must be set"
+        + desc.transform(s -> " (to " + s + ")").or(""));
return value;
}
diff --git a/gradle/scripts/defaultBuildProperties.gradle
b/gradle/scripts/defaultBuildProperties.gradle
index a2668660c..c3ff18b54 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -41,7 +41,7 @@ def BuildProperties BUILD_PROPERTIES = new
BuildProperties(project)
.register(new BuildProperty("publishToMaven", false, "Enable publishing of
artifacts to a central Maven repository"))
.register(new BuildProperty("publishToNexus", false, "Enable publishing of
artifacts to Nexus"))
.register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce
dependencies version"))
-
+ .register(new BuildProperty("openTelemetryVersion", "1.29.0",
"OpenTelemetry dependencies version"))
task buildProperties(description: 'Lists main properties that can be used to
customize the build') {
doLast {
BUILD_PROPERTIES.printHelp();
@@ -73,5 +73,6 @@ BUILD_PROPERTIES.ensureDefined('kafka09Version')
BUILD_PROPERTIES.ensureDefined('kafka1Version')
BUILD_PROPERTIES.ensureDefined('pegasusVersion')
BUILD_PROPERTIES.ensureDefined('salesforceVersion')
+BUILD_PROPERTIES.ensureDefined('openTelemetryVersion')
ext.buildProperties = BUILD_PROPERTIES
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 4d962818d..cecba15fd 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -116,6 +116,10 @@ ext.externalDependency = [
"metricsJvm": "io.dropwizard.metrics:metrics-jvm:" +
dropwizardMetricsVersion,
"metricsGraphite": "io.dropwizard.metrics:metrics-graphite:" +
dropwizardMetricsVersion,
"metricsJmx": "io.dropwizard.metrics:metrics-jmx:" +
dropwizardMetricsVersion,
+ "opentelemetryApi": "io.opentelemetry:opentelemetry-api:" +
openTelemetryVersion,
+ "opentelemetrySdk": "io.opentelemetry:opentelemetry-sdk:" +
openTelemetryVersion,
+ "opentelemetryExporterOtlp":
"io.opentelemetry:opentelemetry-exporter-otlp:" + openTelemetryVersion,
+ "opentelemetrySdkTesting": "io.opentelemetry:opentelemetry-sdk-testing:" +
openTelemetryVersion,
"jsch": "com.jcraft:jsch:0.1.54",
"jdo2": "javax.jdo:jdo2-api:2.1",
"azkaban": "com.linkedin.azkaban:azkaban:2.5.0",