This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 663929cfed [GOBBLIN-2209] Emit GaaS Executor Otel Metrics (#4118)
663929cfed is described below
commit 663929cfed42466b491bd4b5b9739d9d8d8cf2ef
Author: Vivek Rai <[email protected]>
AuthorDate: Wed Dec 17 08:21:03 2025 +0530
[GOBBLIN-2209] Emit GaaS Executor Otel Metrics (#4118)
---
.../gobblin/configuration/ConfigurationKeys.java | 12 +-
.../gobblin/metrics/OpenTelemetryMetrics.java | 26 +++-
.../opentelemetry/GobblinOpenTelemetryMetrics.java | 56 +++++++++
.../GobblinOpenTelemetryMetricsConstants.java | 40 ++++++
.../OpenTelemetryDoubleHistogram.java | 87 +++++++++++++
.../metrics/opentelemetry/OpenTelemetryHelper.java | 94 ++++++++++++++
.../OpenTelemetryInstrumentation.java | 138 +++++++++++++++++++++
.../opentelemetry/OpenTelemetryLongCounter.java | 87 +++++++++++++
.../metrics/opentelemetry/OpenTelemetryMetric.java | 40 ++++++
.../opentelemetry/OpenTelemetryMetricFactory.java | 56 +++++++++
.../opentelemetry/OpenTelemetryMetricType.java | 38 ++++++
.../opentelemetry/OpenTelemetryHelperTest.java | 72 +++++++++++
.../OpenTelemetryInstrumentationTest.java | 112 +++++++++++++++++
.../opentelemetry/OpenTelemetryMetricTest.java | 112 +++++++++++++++++
.../temporal/GobblinTemporalConfigurationKeys.java | 2 +
.../temporal/ddm/activity/ActivityType.java | 3 +
.../temporal/ddm/activity/EmitOTelMetrics.java | 55 ++++++++
.../ddm/activity/impl/EmitOTelMetricsImpl.java | 53 ++++++++
.../ddm/launcher/ExecuteGobblinJobLauncher.java | 20 +++
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 3 +-
.../ddm/workflow/impl/CommitStepWorkflowImpl.java | 1 -
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 25 +++-
.../impl/ProcessWorkUnitsWorkflowImpl.java | 33 +++++
.../temporal/ddm/activity/ActivityTypeTest.java | 1 +
gradle/scripts/defaultBuildProperties.gradle | 2 +-
25 files changed, 1159 insertions(+), 9 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3a70fe27cb..07215bf320 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -934,7 +934,10 @@ public class ConfigurationKeys {
// Opentelemetry based metrics reporting
public static final String METRICS_REPORTING_OPENTELEMETRY_PREFIX =
"metrics.reporting.opentelemetry.";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_CLASSNAME =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "className";
+ public static final String DEFAULT_METRICS_REPORTING_OPENTELEMETRY_CLASSNAME
= "org.apache.gobblin.metrics.OpenTelemetryMetrics";
public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "enabled";
+ public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED
= false;
public static final String
METRICS_REPORTING_OPENTELEMETRY_LOGEXPORTER_ENABLED =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "logexporter.enabled";
@@ -943,14 +946,19 @@ public class ConfigurationKeys {
public static final String
METRICS_REPORTING_OPENTELEMETRY_LOGEXPORTER_CLASSNAME =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "logexporter.className";
public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "configs.";
- public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED
= false;
public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
public static final String METRICS_REPORTING_OPENTELEMETRY_FABRIC =
METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX + "fabric";
// Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON
String with string keys and values
public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
- public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS =
METRICS_CONFIGURATIONS_PREFIX + "interval.millis";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "interval.millis";
+ public static final String
METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "histogram.max.buckets";
+ public static final String
METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_SCALE =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "histogram.max.scale";
+ // A comma-separated list of dimensions to add to the OpenTelemetry metrics
+ public static final String METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "dimensions";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "group.name";
+ public static final String
DEFAULT_METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME =
"org.apache.gobblin.metrics";
/**
* Rest server configuration properties.
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
index 72241b5dcc..c0efb59ff7 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
@@ -32,6 +32,11 @@ import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import
io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
+import
io.opentelemetry.sdk.metrics.internal.view.Base2ExponentialHistogramAggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
@@ -52,8 +57,10 @@ import org.apache.gobblin.util.PropertiesUtils;
@Slf4j
public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {
- private static OpenTelemetryMetrics GLOBAL_INSTANCE;
+ private static volatile OpenTelemetryMetrics GLOBAL_INSTANCE;
private static final Long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS =
10000L;
+ private static final int DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS = 256;
+ private static final int DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_SCALE = 3;
private OpenTelemetryMetrics(State state) {
super(state);
@@ -94,7 +101,12 @@ public class OpenTelemetryMetrics extends
OpenTelemetryMetricsBase {
public static OpenTelemetryMetrics getInstance(State state) {
if
(state.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED) &&
GLOBAL_INSTANCE == null) {
- GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
+ synchronized (OpenTelemetryMetrics.class) {
+ if (GLOBAL_INSTANCE == null) {
+ log.info("Creating OpenTelemetryMetrics instance");
+ GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
+ }
+ }
}
return GLOBAL_INSTANCE;
}
@@ -115,6 +127,13 @@ public class OpenTelemetryMetrics extends
OpenTelemetryMetricsBase {
}
metricsResource =
Resource.getDefault().merge(Resource.create(attributesBuilder.build()));
}
+
+ Aggregation histogramAggregation =
Base2ExponentialHistogramAggregation.create(
+
state.getPropAsInt(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS,
+ DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS),
+
state.getPropAsInt(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_SCALE,
+ DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_SCALE));
+
SdkMeterProvider meterProvider = SdkMeterProvider.builder()
.setResource(metricsResource)
.registerMetricReader(
@@ -123,6 +142,9 @@ public class OpenTelemetryMetrics extends
OpenTelemetryMetricsBase {
state.getPropAsLong(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS,
DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS)))
.build())
+ .registerView(
+
InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM).build(),
+ View.builder().setAggregation(histogramAggregation).build())
.build();
this.openTelemetry =
OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
new file mode 100644
index 0000000000..c54f213e24
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+@Getter
+@AllArgsConstructor
+public enum GobblinOpenTelemetryMetrics {
+ /**
+ * Metric to track the count of Gobblin Jobs for each of its state
(GenerateWorkUnit, ProcessWorkUnit, CommitStep).
+ * Metric Unit: 1 represents each increment will add one data point to the
counter.
+ * */
+ GOBBLIN_JOB_STATE("gobblin.job.state", "Gobblin job state counter", "1",
OpenTelemetryMetricType.LONG_COUNTER),
+
+ /**
+ * Metric to track the latency of each Gobblin Job state (GenerateWorkUnit,
ProcessWorkUnit, CommitStep).
+ * Metric Unit: seconds (s) represents the time taken for each state.
+ * */
+ GOBBLIN_JOB_STATE_LATENCY("gobblin.job.state.latency", "Gobblin job state
latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
+
+ private final String metricName;
+ private final String metricDescription;
+ private final String metricUnit;
+ private final OpenTelemetryMetricType metricType;
+
+ @SuppressWarnings("unchecked")
+ public <T extends OpenTelemetryMetric> T createMetric(Attributes attributes,
Meter meter) {
+ return (T) this.metricType.getFactory().newMetric(this.metricName,
this.metricDescription, this.metricUnit, attributes, meter);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Metric{name='%s', description='%s', unit='%s',
type=%s}", metricName, metricDescription, metricUnit, metricType);
+ }
+
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
new file mode 100644
index 0000000000..d998380a54
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+public class GobblinOpenTelemetryMetricsConstants {
+
+ public static class DimensionKeys {
+ public static final String STATE = "state";
+ public static final String CURR_STATE = "currState";
+ }
+
+ public static class DimensionValues {
+ public static final String GENERATE_WU = "generateWU";
+ public static final String PROCESS_WU = "processWU";
+ public static final String COMMIT_STEP = "commitStep";
+ public static final String JOB_START = "jobStart";
+ public static final String JOB_COMPLETE = "jobComplete";
+ public static final String GENERATE_WU_START = "generateWUStart";
+ public static final String GENERATE_WU_COMPLETE = "generateWUComplete";
+ public static final String PROCESS_WU_START = "processWUStart";
+ public static final String PROCESS_WU_COMPLETE = "processWUComplete";
+ public static final String COMMIT_STEP_START = "commitStepStart";
+ public static final String COMMIT_STEP_COMPLETE = "commitStepComplete";
+ }
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java
new file mode 100644
index 0000000000..e5fb3b13ba
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+
+
+/**
+ * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry
{@link DoubleHistogram}.
+ *
+ * <p>This class provides a histogram for recording the distribution of double
values.
+ * It supports recording values with optional additional attributes that can
be merged with base attributes.</p>
+ *
+ */
+@Slf4j
+@AllArgsConstructor
+public class OpenTelemetryDoubleHistogram implements OpenTelemetryMetric {
+ private String name;
+ private Attributes baseAttributes;
+ private DoubleHistogram doubleHistogram;
+
+ /**
+ * Records the specified value in the histogram with the base attributes.
+ *
+ * @param value the double value to record in the histogram
+ */
+ public void record(double value) {
+ log.debug("Emitting double histogram metric: {}, value: {}, attributes:
{}", this.name, value, this.baseAttributes);
+ this.doubleHistogram.record(value, this.baseAttributes);
+ }
+
+ /**
+ * Records the specified value in the histogram with a combination of base
attributes and additional attributes.
+ *
+ * @param value the double value to record in the histogram
+ * @param additionalAttributes the additional attributes to be merged with
base attributes
+ */
+ public void record(double value, Attributes additionalAttributes) {
+ log.debug("Emitting double histogram metric: {}, value: {}, base
attributes: {}, additional attributes: {}",
+ this.name, value, this.baseAttributes, additionalAttributes);
+ this.doubleHistogram.record(value,
OpenTelemetryHelper.mergeAttributes(this.baseAttributes, additionalAttributes));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getMetricName() {
+ return this.name;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public OpenTelemetryMetricType getMetricType() {
+ return OpenTelemetryMetricType.DOUBLE_HISTOGRAM;
+ }
+
+ /**
+ * Returns a string representation of this histogram with its name.
+ */
+ @Override
+ public String toString() {
+ return "OpenTelemetryDoubleHistogram{name='" + name + "'}";
+ }
+
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java
new file mode 100644
index 0000000000..b3e3d30b85
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.Map;
+
+import lombok.experimental.UtilityClass;
+import org.apache.commons.lang3.StringUtils;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+
+/**
+ * Utility class for OpenTelemetry related operations.
+ *
+ * <p>Provides methods to handle OpenTelemetry attributes, including merging
multiple
+ * {@link Attributes} instances and converting maps to {@link Attributes}.
+ */
+@UtilityClass
+public class OpenTelemetryHelper {
+
+ private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE =
"UNKNOWN";
+
+ /**
+ * Returns the provided attribute value when it is non-null and non-empty;
+ * otherwise returns the default OpenTelemetry attribute placeholder.
+ *
+ * @param value candidate attribute value to check
+ * @return the original value if not empty, or
DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE otherwise
+ */
+ public static String getOrDefaultOpenTelemetryAttrValue(String value) {
+ return StringUtils.defaultIfBlank(value,
DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE);
+ }
+
+ /**
+ * Merges multiple {@link Attributes} instances into a single {@link
Attributes}.
+ *
+ * <p>Any {@code null} or empty ({@link Attributes#isEmpty()}) instances are
ignored.
+ * The resulting {@link Attributes} contains all key-value pairs from the
+ * provided non-null, non-empty inputs in the order they are given.
+ * For duplicate keys, the last occurrence in the array will take precedence.
+ *
+ * @param attributesArray array of {@link Attributes} to merge; may contain
{@code null} or empty entries
+ * @return a new {@link Attributes} instance containing all entries from the
non-null,
+ * non-empty inputs; never {@code null}
+ */
+ public static Attributes mergeAttributes(Attributes... attributesArray) {
+ AttributesBuilder builder = Attributes.builder();
+ for (Attributes attrs : attributesArray) {
+ if (attrs != null && !attrs.isEmpty()) {
+ builder.putAll(attrs);
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * Converts a map of string attributes to an OpenTelemetry {@link
Attributes} instance.
+ *
+ * <p>Each entry in the map is converted to an OpenTelemetry attribute, using
+ * {@link #getOrDefaultOpenTelemetryAttrValue(String)} to handle empty
values.
+ *
+ * @param attributes map of string attributes to convert; may be {@code null}
+ * @return a new {@link Attributes} instance containing the converted
attributes;
+ * never {@code null}
+ */
+ public static Attributes toOpenTelemetryAttributes(Map<String, String>
attributes) {
+ AttributesBuilder builder = Attributes.builder();
+ if (attributes != null) {
+ for (Map.Entry<String, String> entry : attributes.entrySet()) {
+ String key = entry.getKey();
+ String value = getOrDefaultOpenTelemetryAttrValue(entry.getValue());
+ builder.put(key, value);
+ }
+ }
+ return builder.build();
+ }
+
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java
new file mode 100644
index 0000000000..26eae56819
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.lang.reflect.Method;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import com.google.common.base.Splitter;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * Provides OpenTelemetry instrumentation for metrics.
+ *
+ * <p>Maintains a singleton instance that holds common attributes {@link
Attributes} and a Meter {@link Meter}.
+ * Exposes methods to retrieve or create metric instruments defined in {@link
GobblinOpenTelemetryMetrics}.
+ */
+@Slf4j
+@Getter
+public class OpenTelemetryInstrumentation {
+
+ // Adding the gobblin-service.main
(BaseFlowGraphHelper.FLOW_EDGE_LABEL_JOINER_CHAR) dependency is creating
circular dependency issues
+ private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
+ private static final Splitter COMMA_SPLITTER =
Splitter.on(',').omitEmptyStrings().trimResults();
+ private static volatile OpenTelemetryInstrumentation GLOBAL_INSTANCE;
+
+ private final Attributes commonAttributes;
+ private final Meter meter;
+ private final ConcurrentHashMap<String, OpenTelemetryMetric> metrics;
+
+ private OpenTelemetryInstrumentation(final State state) {
+ this.commonAttributes = buildCommonAttributes(state);
+ this.meter = getOpenTelemetryMetrics(state).getMeter(state.getProp(
+ ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME,
+ ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME));
+ this.metrics = new ConcurrentHashMap<>();
+ }
+
+ private OpenTelemetryMetricsBase getOpenTelemetryMetrics(State state) {
+ try {
+ String openTelemetryClassName =
state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CLASSNAME,
+ ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_CLASSNAME);
+ Class<?> metricsClass = Class.forName(openTelemetryClassName);
+ Method getInstanceMethod = metricsClass.getMethod("getInstance",
State.class);
+ return (OpenTelemetryMetricsBase) getInstanceMethod.invoke(null, state);
+ } catch (Exception e) {
+ log.error("Failed to initialize OpenTelemetryMetrics through reflection,
defaulting to direct instantiation of InMemoryOpenTelemetryMetrics", e);
+ }
+ return InMemoryOpenTelemetryMetrics.getInstance(state);
+ }
+
+ /**
+ * Returns the singleton instance for the given configuration state.
+ *
+ * @param state the configuration containing metric reporting and dimension
configs
+ * @return the global {@link OpenTelemetryInstrumentation} instance
+ */
+ public static OpenTelemetryInstrumentation getInstance(final State state) {
+ if (GLOBAL_INSTANCE == null) {
+ synchronized (OpenTelemetryInstrumentation.class) {
+ if (GLOBAL_INSTANCE == null) {
+ log.info("Creating OpenTelemetryInstrumentation instance");
+ GLOBAL_INSTANCE = new OpenTelemetryInstrumentation(state);
+ }
+ }
+ }
+ return GLOBAL_INSTANCE;
+ }
+
+ public static OpenTelemetryInstrumentation getInstance(final Properties
props) {
+ return getInstance(new State(props));
+ }
+
+ /**
+ * Retrieves an existing metric by its enum definition or creates it if
absent.
+ *
+ * @param metric the {@link GobblinOpenTelemetryMetrics} enum defining name,
description, unit, and type {@link OpenTelemetryMetricType}
+ * @return an {@link OpenTelemetryMetric} instance corresponding to the
provided enum
+ */
+ @SuppressWarnings("unchecked")
+ public <T extends OpenTelemetryMetric> T
getOrCreate(GobblinOpenTelemetryMetrics metric) {
+ return (T) this.metrics.computeIfAbsent(metric.getMetricName(), name ->
metric.createMetric(this.commonAttributes, this.meter));
+ }
+
+ private Attributes buildCommonAttributes(final State state) {
+ AttributesBuilder attributesBuilder = Attributes.builder();
+ String commonDimensions =
state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS, "");
+ if (StringUtils.isNotEmpty(commonDimensions)) {
+ for (String dimension : COMMA_SPLITTER.split(commonDimensions)) {
+ String dimensionKey = dimension.trim();
+ String dimensionValue = state.getProp(dimensionKey, "");
+ if (ConfigurationKeys.FLOW_EDGE_ID_KEY.equals(dimensionKey)) {
+ dimensionValue = getFlowEdgeId(state, dimensionValue);
+ }
+ if (StringUtils.isNotEmpty(dimensionValue)) {
+ attributesBuilder.put(dimensionKey,
OpenTelemetryHelper.getOrDefaultOpenTelemetryAttrValue(dimensionValue));
+ }
+ }
+ }
+ return attributesBuilder.build();
+ }
+
+ private static String getFlowEdgeId(final State state, String
fullFlowEdgeId) {
+ // Parse the flowEdgeId from fullFlowEdgeId that is stored in format
sourceNode_destinationNode_flowEdgeId
+ return StringUtils.substringAfter(
+ StringUtils.substringAfter(fullFlowEdgeId,
state.getProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "")),
+ FLOW_EDGE_LABEL_JOINER_CHAR);
+ }
+
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java
new file mode 100644
index 0000000000..4d06a6af90
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+
+
+/**
+ * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry
{@link LongCounter}.
+ *
+ * <p>This class provides a counter for recording values.
+ * It supports adding values with optional additional attributes that can be
merged with base attributes.</p>
+ *
+ */
+@Slf4j
+@AllArgsConstructor
+public class OpenTelemetryLongCounter implements OpenTelemetryMetric {
+ private String name;
+ private Attributes baseAttributes;
+ private LongCounter longCounter;
+
+ /**
+ * Adds the specified value to the counter with the base attributes.
+ *
+ * @param value the value to add to the counter
+ */
+ public void add(long value) {
+ log.debug("Emitting long counter metric: {}, value: {}, attributes: {}",
this.name, value, this.baseAttributes);
+ this.longCounter.add(value, this.baseAttributes);
+ }
+
+ /**
+ * Adds the specified value to the counter with a combination of base
attributes and additional attributes.
+ *
+ * @param value the value to add to the counter
+ * @param additionalAttributes the additional attributes to be merged with
base attributes
+ */
+ public void add(long value, Attributes additionalAttributes) {
+ log.debug("Emitting long counter metric: {}, value: {}, base attributes:
{}, additional attributes: {}",
+ this.name, value, this.baseAttributes, additionalAttributes);
+ this.longCounter.add(value,
OpenTelemetryHelper.mergeAttributes(this.baseAttributes, additionalAttributes));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getMetricName() {
+ return this.name;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public OpenTelemetryMetricType getMetricType() {
+ return OpenTelemetryMetricType.LONG_COUNTER;
+ }
+
+ /**
+ * Returns a string representation of this counter with its name.
+ */
+ @Override
+ public String toString() {
+ return "OpenTelemetryLongCounter{name='" + name + "'}";
+ }
+
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetric.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetric.java
new file mode 100644
index 0000000000..48f27f17a8
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetric.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+/**
+ * Interface representing a metric in the OpenTelemetry system.
+ * It provides methods to retrieve the metric name and type.
+ */
+public interface OpenTelemetryMetric {
+
+ /**
+ * Returns the name of the metric.
+ *
+ * @return the metric name as a {@link String}
+ */
+ String getMetricName();
+
+ /**
+ * Returns the {@link OpenTelemetryMetricType} indicating the kind of metric
instrument.
+ *
+ * @return the metric type {@link OpenTelemetryMetricType}
+ */
+ OpenTelemetryMetricType getMetricType();
+
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricFactory.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricFactory.java
new file mode 100644
index 0000000000..30b2da207c
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+
+/**
+ * Factory interface for creating OpenTelemetry metrics of type {@link T}.
+ *
+ * @param <T> the type of OpenTelemetry metric to be created
+ */
+public interface OpenTelemetryMetricFactory<T extends OpenTelemetryMetric> {
+
+ OpenTelemetryMetricFactory<OpenTelemetryLongCounter> LONG_COUNTER_FACTORY =
new OpenTelemetryLongCounterFactory();
+ OpenTelemetryMetricFactory<OpenTelemetryDoubleHistogram>
DOUBLE_HISTOGRAM_FACTORY = new OpenTelemetryDoubleHistogramFactory();
+
+ T newMetric(String name, String description, String unit, Attributes
attributes, Meter meter);
+
+ /** Factory class for creating {@link OpenTelemetryLongCounter} metrics. */
+ class OpenTelemetryLongCounterFactory implements
OpenTelemetryMetricFactory<OpenTelemetryLongCounter> {
+
+ @Override
+ public OpenTelemetryLongCounter newMetric(String name, String description,
String unit, Attributes attributes, Meter meter) {
+ return new OpenTelemetryLongCounter(name, attributes,
+
meter.counterBuilder(name).setDescription(description).setUnit(unit).build());
+ }
+
+ }
+
+ /** Factory class for creating {@link OpenTelemetryDoubleHistogram} metrics.
*/
+ class OpenTelemetryDoubleHistogramFactory implements
OpenTelemetryMetricFactory<OpenTelemetryDoubleHistogram> {
+
+ @Override
+ public OpenTelemetryDoubleHistogram newMetric(String name, String
description, String unit, Attributes attributes, Meter meter) {
+ return new OpenTelemetryDoubleHistogram(name, attributes,
+
meter.histogramBuilder(name).setDescription(description).setUnit(unit).build());
+ }
+
+ }
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java
new file mode 100644
index 0000000000..8890d37d90
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.Getter;
+
+/**
+ * Enum representing the types of OpenTelemetry metrics supported.
+ */
+@Getter
+public enum OpenTelemetryMetricType {
+ /** Represents a metric of type LongCounter. */
+ LONG_COUNTER(OpenTelemetryMetricFactory.LONG_COUNTER_FACTORY),
+ /** Represents a metric of type DoubleHistogram. */
+ DOUBLE_HISTOGRAM(OpenTelemetryMetricFactory.DOUBLE_HISTOGRAM_FACTORY);
+
+ private final OpenTelemetryMetricFactory<? extends OpenTelemetryMetric>
factory;
+
+ OpenTelemetryMetricType(OpenTelemetryMetricFactory<? extends
OpenTelemetryMetric> factory) {
+ this.factory = factory;
+ }
+
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelperTest.java
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelperTest.java
new file mode 100644
index 0000000000..b1fef64019
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelperTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+
+
+/**
+ * Tests for {@link OpenTelemetryHelper} class.
+ */
+public class OpenTelemetryHelperTest {
+
+ @Test
+ public void testMergeAttributes() {
+ Attributes emptyAttributes = Attributes.empty();
+ Attributes attributes1 = Attributes.builder().put("key1",
"value1").build();
+ Attributes attributes2 = Attributes.builder().put("key2",
"value2").build();
+
+ Attributes attributes = OpenTelemetryHelper.mergeAttributes(null,
emptyAttributes);
+ Assert.assertEquals(attributes.size(), 0);
+ Assert.assertEquals(attributes, emptyAttributes);
+
+ attributes = OpenTelemetryHelper.mergeAttributes(attributes1, attributes2);
+ Assert.assertEquals(attributes.size(), 2);
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key1")),
"value1");
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key2")),
"value2");
+
+ attributes = OpenTelemetryHelper.mergeAttributes(attributes1,
emptyAttributes);
+ Assert.assertEquals(attributes.size(), 1);
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key1")),
"value1");
+ Assert.assertNull(attributes.get(AttributeKey.stringKey("key2")));
+
+ }
+
+ @Test
+ public void testToOpenTelemetryAttributes() {
+ Attributes attributes =
OpenTelemetryHelper.toOpenTelemetryAttributes(null);
+ Assert.assertEquals(attributes.size(), 0);
+
+ Map<String, String> map = new HashMap<>();
+ map.put("key1", "value1");
+ map.put("key2", "value2");
+
+ attributes = OpenTelemetryHelper.toOpenTelemetryAttributes(map);
+ Assert.assertEquals(attributes.size(), 2);
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key1")),
"value1");
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key2")),
"value2");
+ }
+
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java
new file mode 100644
index 0000000000..74c7f57fbf
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.lang.reflect.Field;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * Unit tests for {@link OpenTelemetryInstrumentation}.
+ * These tests ensure that the singleton instance is created correctly,
+ * common attributes are built from dimensions, and metrics are created and
cached properly.
+ */
+public class OpenTelemetryInstrumentationTest {
+
+ private OpenTelemetryInstrumentation instrumentation;
+ private State state;
+
+ @BeforeMethod
+ public void setUp() throws NoSuchFieldException, IllegalAccessException {
+ // Reset singleton instance before each test
+ Field instanceField =
OpenTelemetryInstrumentation.class.getDeclaredField("GLOBAL_INSTANCE");
+ instanceField.setAccessible(true);
+ instanceField.set(null, null);
+
+ state = new State();
+ state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CLASSNAME,
"org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics");
+ }
+
+ @Test
+ public void singletonInstanceCreatedOnce() {
+ OpenTelemetryInstrumentation instance1 =
OpenTelemetryInstrumentation.getInstance(state);
+ OpenTelemetryInstrumentation instance2 =
OpenTelemetryInstrumentation.getInstance(state);
+
+ Assert.assertSame(instance1, instance2, "getInstance should return the
same instance");
+ }
+
+ @Test
+ public void emptyDimensionsCreateEmptyAttributes() {
+
state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS, "");
+
+ instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+ Attributes attributes = instrumentation.getCommonAttributes();
+
+ Assert.assertEquals(attributes.size(), 0);
+ }
+
+ @Test
+ public void commonAttributesBuiltFromDimensions() {
+
state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS,
"dim1,dim2");
+ state.setProp("dim1", "value1");
+ state.setProp("dim2", "value2");
+
+ instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+ Attributes attributes = instrumentation.getCommonAttributes();
+
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("dim1")),
"value1");
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("dim2")),
"value2");
+ }
+
+ @Test
+ public void flowEdgeIdParsedCorrectly() {
+ state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS,
+ ConfigurationKeys.FLOW_EDGE_ID_KEY);
+ state.setProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY,
"destNode");
+ state.setProp(ConfigurationKeys.FLOW_EDGE_ID_KEY,
"sourceNode_destNode_edgeId");
+
+ instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+ Attributes attributes = instrumentation.getCommonAttributes();
+
+
Assert.assertEquals(attributes.get(AttributeKey.stringKey(ConfigurationKeys.FLOW_EDGE_ID_KEY)),
+ "edgeId");
+ }
+
+ @Test
+ public void metricsAreCreatedAndCached() {
+ instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+ Assert.assertEquals(instrumentation.getMetrics().size(), 0, "Metrics map
should be empty initially");
+ instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE);
+ Assert.assertEquals(instrumentation.getMetrics().size(), 1, "Metrics map
should contain one metric after creation");
+ instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE);
+ Assert.assertEquals(instrumentation.getMetrics().size(), 1, "Metrics map
should still contain one metric after duplicate creation");
+
instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY);
+ Assert.assertEquals(instrumentation.getMetrics().size(), 2, "Metrics map
should contain two metrics after creating another");
+ }
+
+}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricTest.java
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricTest.java
new file mode 100644
index 0000000000..580878a357
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
+
+/** Tests for OpenTelemetry metrics implementation, & its associated factory
classes {@link OpenTelemetryMetricFactory}
+ * specifically for {@link OpenTelemetryLongCounter} and {@link
OpenTelemetryDoubleHistogram}.
+ * These tests validate the object creation through factory classes {@link
OpenTelemetryMetricFactory#LONG_COUNTER_FACTORY}
+ * & {@link OpenTelemetryMetricFactory#DOUBLE_HISTOGRAM_FACTORY} ,correct
recording of metrics with and without
+ * additional attributes.
+ */
+public class OpenTelemetryMetricTest {
+
+ private InMemoryOpenTelemetryMetrics inMemoryOpenTelemetryMetrics;
+ private final String testMeterGroupName = "testMeterGroup";
+ private final Attributes baseAttributes = Attributes.builder()
+ .put("dim1", "val1")
+ .put("dim2", "val2")
+ .build();
+
+ @BeforeMethod
+ public void setUp() {
+ inMemoryOpenTelemetryMetrics =
InMemoryOpenTelemetryMetrics.getInstance(new State());
+ }
+
+ @Test
+ public void testOpenTelemetryLongCounter() {
+ String metricName = "testLongCounter";
+ OpenTelemetryLongCounter longCounter =
OpenTelemetryMetricFactory.LONG_COUNTER_FACTORY.newMetric(metricName,
+ "testLongCounterDescription", "1", baseAttributes,
inMemoryOpenTelemetryMetrics.getMeter(testMeterGroupName));
+ longCounter.add(20, Attributes.builder().put("dim3", "val3").build());
+ Collection<MetricData> metrics =
inMemoryOpenTelemetryMetrics.metricReader.collectAllMetrics();
+ Assert.assertEquals(metrics.size(), 1);
+ Map<String, MetricData > metricsByName =
metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData ->
metricData));
+ MetricData metricData = metricsByName.get(metricName);
+ List<LongPointData> dataPoints = new
ArrayList<>(metricData.getLongSumData().getPoints());
+ Assert.assertEquals(dataPoints.size(), 1);
+ Assert.assertEquals(dataPoints.get(0).getValue(), 20);
+
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim1")),
"val1");
+
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim2")),
"val2");
+
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim3")),
"val3");
+ }
+
+ @Test
+ public void testOpenTelemetryLongCounterWithoutAdditionalAttributes() {
+ String metricName = "testLongCounterWithoutAdditionalAttributes";
+ OpenTelemetryLongCounter longCounter =
OpenTelemetryMetricFactory.LONG_COUNTER_FACTORY.newMetric(metricName,
+ "testLongCounterDescription", "1", baseAttributes,
inMemoryOpenTelemetryMetrics.getMeter(testMeterGroupName));
+ longCounter.add(10);
+ Collection<MetricData> metrics =
inMemoryOpenTelemetryMetrics.metricReader.collectAllMetrics();
+ Assert.assertEquals(metrics.size(), 1);
+ Map<String, MetricData > metricsByName =
metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData ->
metricData));
+ MetricData metricData = metricsByName.get(metricName);
+ List<LongPointData> dataPoints = new
ArrayList<>(metricData.getLongSumData().getPoints());
+ Assert.assertEquals(dataPoints.size(), 1);
+ Assert.assertEquals(dataPoints.get(0).getValue(), 10);
+
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim1")),
"val1");
+
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim2")),
"val2");
+
Assert.assertNull(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim3")),
+ "Additional attribute dim3 should not be present when not provided");
+ }
+
+ @Test
+ public void testOpenTelemetryDoubleHistogram() {
+ String metricName = "testDoubleHistogram";
+ OpenTelemetryDoubleHistogram doubleHistogram =
OpenTelemetryMetricFactory.DOUBLE_HISTOGRAM_FACTORY.newMetric(metricName,
+ "testDoubleHistogramDescription", "s", baseAttributes,
inMemoryOpenTelemetryMetrics.getMeter(testMeterGroupName));
+ doubleHistogram.record(5.0, Attributes.builder().put("dim3",
"val3").build());
+ doubleHistogram.record(10.0, Attributes.builder().put("dim3",
"val3").build());
+ Collection<MetricData> metrics =
inMemoryOpenTelemetryMetrics.metricReader.collectAllMetrics();
+ Assert.assertEquals(metrics.size(), 1);
+ Map<String, MetricData > metricsByName =
metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData ->
metricData));
+ MetricData metricData = metricsByName.get(metricName);
+ List<HistogramPointData> dataPoints = new
ArrayList<>(metricData.getHistogramData().getPoints());
+ Assert.assertEquals(dataPoints.size(), 1);
+ Assert.assertEquals(dataPoints.get(0).getSum(), 15.0, "Sum should match
recorded value");
+ }
+
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 94819c9c85..0b95b78ef4 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -124,6 +124,8 @@ public interface GobblinTemporalConfigurationKeys {
PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "submit.gte." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_EMIT_OTEL_METRICS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "emit.otel.metrics." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options.";
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
index 22126b2d0f..0860fa82b0 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
@@ -52,6 +52,9 @@ public enum ActivityType {
/** Activity type for submitting GTE. */
SUBMIT_GTE(GobblinTemporalConfigurationKeys.TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+ /** Activity type for emitting open telemetry metrics */
+
EMIT_OTEL_METRICS(GobblinTemporalConfigurationKeys.TEMPORAL_EMIT_OTEL_METRICS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
/** Default placeholder activity type. */
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java
new file mode 100644
index 0000000000..7b32a7b873
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.util.Map;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
+
+/**
+ * Temporal activity interface for emitting OpenTelemetry metrics.
+ */
+@ActivityInterface
+public interface EmitOTelMetrics {
+
+ /**
+ * Emits a long counter metric.
+ *
+ * @param metric The OpenTelemetry metric to emit.
+ * @param value The value to add to the counter.
+ * @param attributes Additional attributes for the metric.
+ * @param jobProps Properties related to the job context.
+ */
+ @ActivityMethod
+ void emitLongCounterMetric(GobblinOpenTelemetryMetrics metric, long value,
Map<String, String> attributes, Properties jobProps);
+
+ /**
+ * Emits a double histogram metric.
+ *
+ * @param metric The OpenTelemetry metric to emit.
+ * @param value The value to record in the histogram.
+ * @param attributes Additional attributes for the metric.
+ * @param jobProps Properties related to the job context.
+ */
+ @ActivityMethod
+ void emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics metric, double
value, Map<String, String> attributes, Properties jobProps);
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java
new file mode 100644
index 0000000000..644aafde6f
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryDoubleHistogram;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryHelper;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryInstrumentation;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryLongCounter;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryMetricType;
+import org.apache.gobblin.temporal.ddm.activity.EmitOTelMetrics;
+
+/**
+ * Implementation of {@link EmitOTelMetrics} that emits OpenTelemetry metrics.
+ */
+public class EmitOTelMetricsImpl implements EmitOTelMetrics {
+
+ @Override
+ public void emitLongCounterMetric(GobblinOpenTelemetryMetrics metric, long
value, Map<String, String> attributes, Properties jobProps) {
+ if (!metric.getMetricType().equals(OpenTelemetryMetricType.LONG_COUNTER)) {
+ throw new IllegalArgumentException("Metric " + metric.getMetricName() +
" is not of type LONG_COUNTER");
+ }
+ OpenTelemetryLongCounter longCounter =
OpenTelemetryInstrumentation.getInstance(jobProps).getOrCreate(metric);
+ longCounter.add(value,
OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
+ }
+
+ @Override
+ public void emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics metric,
double value, Map<String, String> attributes, Properties jobProps) {
+ if
(!metric.getMetricType().equals(OpenTelemetryMetricType.DOUBLE_HISTOGRAM)) {
+ throw new IllegalArgumentException("Metric " + metric.getMetricName() +
" is not of type DOUBLE_HISTOGRAM");
+ }
+ OpenTelemetryDoubleHistogram doubleHistogram =
OpenTelemetryInstrumentation.getInstance(jobProps).getOrCreate(metric);
+ doubleHistogram.record(value,
OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index b2f65ccb41..ad61e7b99d 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -17,7 +17,9 @@
package org.apache.gobblin.temporal.ddm.launcher;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,12 +35,14 @@ import
org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.activity.impl.EmitOTelMetricsImpl;
import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -50,6 +54,9 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.PropertiesUtils;
+import static
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
+import static
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
+
/**
* A {@link JobLauncher} for the initial triggering of a Temporal workflow
that executes a full Gobblin job workflow of:
@@ -96,8 +103,21 @@ public class ExecuteGobblinJobLauncher extends
GobblinTemporalJobLauncher {
EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext.Builder(eventSubmitter)
.withGaaSJobProps(finalProps)
.build();
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CURR_STATE, JOB_START);
+ EmitOTelMetricsImpl emitOTelMetrics = new EmitOTelMetricsImpl();
+
emitOTelMetrics.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
1L, attributes, finalProps);
+
+ long startTimeMillis = System.currentTimeMillis();
ExecGobblinStats execGobblinStats = workflow.execute(finalProps,
eventSubmitterContext);
+ double timeTaken = (System.currentTimeMillis() - startTimeMillis) /
1000.0;
log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}",
execGobblinStats);
+ attributes.put(CURR_STATE, JOB_COMPLETE);
+
emitOTelMetrics.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
1L, attributes, finalProps);
+ attributes.remove(CURR_STATE);
+ attributes.put(STATE, JOB_COMPLETE);
+
emitOTelMetrics.emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY,
timeTaken, attributes, finalProps);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 12fe6c4d84..3ee45d20eb 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -28,6 +28,7 @@ import
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
import
org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
+import org.apache.gobblin.temporal.ddm.activity.impl.EmitOTelMetricsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
import
org.apache.gobblin.temporal.ddm.activity.impl.RecommendScalingForWorkUnitsLinearHeuristicImpl;
@@ -60,7 +61,7 @@ public class WorkFulfillmentWorker extends
AbstractTemporalWorker {
@Override
protected Object[] getActivityImplInstances() {
return new Object[] { new SubmitGTEActivityImpl(), new
GenerateWorkUnitsImpl(), new RecommendScalingForWorkUnitsLinearHeuristicImpl(),
new ProcessWorkUnitImpl(),
- new CommitActivityImpl(), new DeleteWorkDirsActivityImpl() };
+ new CommitActivityImpl(), new DeleteWorkDirsActivityImpl(), new
EmitOTelMetricsImpl()};
}
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 50d587c4ef..f14788acf6 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -23,7 +23,6 @@ import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.temporal.ddm.activity.ActivityType;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 05fa2a8546..3e08c7536b 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -21,8 +21,10 @@ import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -46,9 +48,11 @@ import io.temporal.workflow.Workflow;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.ddm.activity.ActivityType;
import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
+import org.apache.gobblin.temporal.ddm.activity.EmitOTelMetrics;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
@@ -75,6 +79,9 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import static
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
+import static
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
+
@Slf4j
public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
@@ -83,11 +90,16 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
// Filtering only temporal job properties to pass to child workflows to
avoid passing unnecessary properties
- final Properties temporalJobProps =
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
-
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
+ final Properties temporalJobProps = PropertiesUtils.combineProperties(
+ PropertiesUtils.extractPropertiesWithPrefix(jobProps,
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX)),
+ PropertiesUtils.extractPropertiesWithPrefix(jobProps,
com.google.common.base.Optional.of(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_PREFIX))
+ );
// Add File system properties to the temporal job properties
temporalJobProps.putAll(PropertiesUtils.extractPropertiesWithPrefix(jobProps,
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)));
+ Map<String, String> attributes = new HashMap<>();
+ final EmitOTelMetrics emitOTelMetricsActivityStub =
Workflow.newActivityStub(EmitOTelMetrics.class,
+ ActivityType.EMIT_OTEL_METRICS.buildActivityOptions(temporalJobProps,
false));
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext,
temporalJobProps);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); //
update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
@@ -97,7 +109,16 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
try (Closer closer = Closer.create()) {
final GenerateWorkUnits genWUsActivityStub =
Workflow.newActivityStub(GenerateWorkUnits.class,
ActivityType.GENERATE_WORKUNITS.buildActivityOptions(temporalJobProps, true));
+ attributes.put(CURR_STATE, GENERATE_WU_START);
+
emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
1, attributes, temporalJobProps);
+ long genWUStartTime = Workflow.currentTimeMillis();
GenerateWorkUnitsResult generateWorkUnitResult =
genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+ attributes.put(CURR_STATE, GENERATE_WU_COMPLETE);
+
emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
1, attributes, temporalJobProps);
+ double genWUTImeTaken = (Workflow.currentTimeMillis() - genWUStartTime)
/ 1000.0;
+ attributes.remove(CURR_STATE);
+ attributes.put(STATE, GENERATE_WU);
+
emitOTelMetricsActivityStub.emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY,
genWUTImeTaken, attributes, temporalJobProps);
optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult);
WorkUnitsSizeSummary wuSizeSummary =
generateWorkUnitResult.getWorkUnitsSizeSummary();
int numWUsGenerated =
safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 30db5a79b2..94e870c283 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.temporal.ddm.workflow.impl;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -29,9 +30,12 @@ import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+import org.apache.gobblin.temporal.ddm.activity.EmitOTelMetrics;
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
@@ -50,15 +54,22 @@ import
org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
import org.apache.gobblin.util.PropertiesUtils;
+import static
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
+import static
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
+
@Slf4j
public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
public static final String COMMIT_STEP_WORKFLOW_ID_BASE =
"CommitStepWorkflow";
+ private EmitOTelMetrics emitOTelMetricsActivityStub;
+
@Override
public CommitStats process(WUProcessingSpec workSpec, final Properties
props) {
Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec, props);
+ this.emitOTelMetricsActivityStub =
Workflow.newActivityStub(EmitOTelMetrics.class,
+ ActivityType.EMIT_OTEL_METRICS.buildActivityOptions(props, false));
CommitStats result = performWork(workSpec, props);
timer.ifPresent(EventTimer::stop);
return result;
@@ -75,7 +86,18 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
performWorkloadInput = new
NestingExecWorkloadInput<>(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(),
Optional.empty(), props);
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CURR_STATE, PROCESS_WU_START);
+
this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
1L, attributes, props);
+ long processWUStartTime = Workflow.currentTimeMillis();
workunitsProcessed =
Optional.of(processingWorkflow.performWorkload(performWorkloadInput));
+ attributes.put(CURR_STATE, PROCESS_WU_COMPLETE);
+
this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
1L, attributes, props);
+ attributes.remove(CURR_STATE);
+ attributes.put(STATE, PROCESS_WU);
+ double processWUDuration = (Workflow.currentTimeMillis() -
processWUStartTime) / 1000.0;
+ this.emitOTelMetricsActivityStub.emitDoubleHistogramMetric(
+ GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY,
processWUDuration, attributes, props);
} catch (Exception e) {
log.error("ProcessWorkUnits failure - attempting partial commit before
re-throwing exception", e);
@@ -115,7 +137,18 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
return CommitStats.createEmpty();
}
CommitStepWorkflow commitWorkflow =
createCommitStepWorkflow(searchAttributes);
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CURR_STATE, COMMIT_STEP_START);
+
this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
1L, attributes, props);
+ long commitStepStartTime = Workflow.currentTimeMillis();
CommitStats result = commitWorkflow.commit(workSpec, props);
+ attributes.put(CURR_STATE, COMMIT_STEP_COMPLETE);
+
this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
1L, attributes, props);
+ attributes.remove(CURR_STATE);
+ attributes.put(STATE, COMMIT_STEP);
+ double commitStepDuration = (Workflow.currentTimeMillis() -
commitStepStartTime) / 1000.0;
+ this.emitOTelMetricsActivityStub.emitDoubleHistogramMetric(
+ GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY,
commitStepDuration, attributes, props);
if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have been
committed at the task level.");
}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
index 8ce5c2991d..6e2a54fb3d 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
@@ -70,6 +70,7 @@ public class ActivityTypeTest {
{ActivityType.PROCESS_WORKUNIT, 555},
{ActivityType.COMMIT, 444},
{ActivityType.SUBMIT_GTE, 999},
+ {ActivityType.EMIT_OTEL_METRICS, 888},
{ActivityType.DEFAULT_ACTIVITY, 1}
};
}
diff --git a/gradle/scripts/defaultBuildProperties.gradle
b/gradle/scripts/defaultBuildProperties.gradle
index 2263dcdfa0..f0207d9ba4 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -41,7 +41,7 @@ def BuildProperties BUILD_PROPERTIES = new
BuildProperties(project)
.register(new BuildProperty("publishToMaven", false, "Enable publishing of
artifacts to a central Maven repository"))
.register(new BuildProperty("publishToNexus", false, "Enable publishing of
artifacts to Nexus"))
.register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce
dependencies version"))
- .register(new BuildProperty("openTelemetryVersion", "1.30.0",
"OpenTelemetry dependencies version"))
+ .register(new BuildProperty("openTelemetryVersion", "1.47.0",
"OpenTelemetry dependencies version"))
.register(new BuildProperty("micrometerVersion", "1.11.1", "Micrometer
dependencies version"))
task buildProperties(description: 'Lists main properties that can be used to
customize the build') {
doLast {