This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 663929cfed [GOBBLIN-2209] Emit GaaS Executor Otel Metrics (#4118)
663929cfed is described below

commit 663929cfed42466b491bd4b5b9739d9d8d8cf2ef
Author: Vivek Rai <[email protected]>
AuthorDate: Wed Dec 17 08:21:03 2025 +0530

    [GOBBLIN-2209] Emit GaaS Executor Otel Metrics (#4118)
---
 .../gobblin/configuration/ConfigurationKeys.java   |  12 +-
 .../gobblin/metrics/OpenTelemetryMetrics.java      |  26 +++-
 .../opentelemetry/GobblinOpenTelemetryMetrics.java |  56 +++++++++
 .../GobblinOpenTelemetryMetricsConstants.java      |  40 ++++++
 .../OpenTelemetryDoubleHistogram.java              |  87 +++++++++++++
 .../metrics/opentelemetry/OpenTelemetryHelper.java |  94 ++++++++++++++
 .../OpenTelemetryInstrumentation.java              | 138 +++++++++++++++++++++
 .../opentelemetry/OpenTelemetryLongCounter.java    |  87 +++++++++++++
 .../metrics/opentelemetry/OpenTelemetryMetric.java |  40 ++++++
 .../opentelemetry/OpenTelemetryMetricFactory.java  |  56 +++++++++
 .../opentelemetry/OpenTelemetryMetricType.java     |  38 ++++++
 .../opentelemetry/OpenTelemetryHelperTest.java     |  72 +++++++++++
 .../OpenTelemetryInstrumentationTest.java          | 112 +++++++++++++++++
 .../opentelemetry/OpenTelemetryMetricTest.java     | 112 +++++++++++++++++
 .../temporal/GobblinTemporalConfigurationKeys.java |   2 +
 .../temporal/ddm/activity/ActivityType.java        |   3 +
 .../temporal/ddm/activity/EmitOTelMetrics.java     |  55 ++++++++
 .../ddm/activity/impl/EmitOTelMetricsImpl.java     |  53 ++++++++
 .../ddm/launcher/ExecuteGobblinJobLauncher.java    |  20 +++
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |   3 +-
 .../ddm/workflow/impl/CommitStepWorkflowImpl.java  |   1 -
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |  25 +++-
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  33 +++++
 .../temporal/ddm/activity/ActivityTypeTest.java    |   1 +
 gradle/scripts/defaultBuildProperties.gradle       |   2 +-
 25 files changed, 1159 insertions(+), 9 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3a70fe27cb..07215bf320 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -934,7 +934,10 @@ public class ConfigurationKeys {
 
   // Opentelemetry based metrics reporting
   public static final String METRICS_REPORTING_OPENTELEMETRY_PREFIX = 
"metrics.reporting.opentelemetry.";
+  public static final String METRICS_REPORTING_OPENTELEMETRY_CLASSNAME = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "className";
+  public static final String DEFAULT_METRICS_REPORTING_OPENTELEMETRY_CLASSNAME 
= "org.apache.gobblin.metrics.OpenTelemetryMetrics";
   public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "enabled";
+  public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED 
= false;
 
   public static final String 
METRICS_REPORTING_OPENTELEMETRY_LOGEXPORTER_ENABLED = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "logexporter.enabled";
 
@@ -943,14 +946,19 @@ public class ConfigurationKeys {
   public static final String 
METRICS_REPORTING_OPENTELEMETRY_LOGEXPORTER_CLASSNAME = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "logexporter.className";
 
   public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "configs.";
-  public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED 
= false;
 
   public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
   public static final String METRICS_REPORTING_OPENTELEMETRY_FABRIC = 
METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX + "fabric";
   // Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON 
String with string keys and values
   public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
 
-  public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS = 
METRICS_CONFIGURATIONS_PREFIX + "interval.millis";
+  public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "interval.millis";
+  public static final String 
METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "histogram.max.buckets";
+  public static final String 
METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_SCALE = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "histogram.max.scale";
+  // A comma-separated list of dimensions to add to the OpenTelemetry metrics
+  public static final String METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "dimensions";
+  public static final String METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "group.name";
+  public static final String 
DEFAULT_METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME = 
"org.apache.gobblin.metrics";
 
   /**
    * Rest server configuration properties.
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
index 72241b5dcc..c0efb59ff7 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
@@ -32,6 +32,11 @@ import io.opentelemetry.api.common.AttributesBuilder;
 import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
 import 
io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
+import 
io.opentelemetry.sdk.metrics.internal.view.Base2ExponentialHistogramAggregation;
 import io.opentelemetry.sdk.metrics.SdkMeterProvider;
 import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
 import io.opentelemetry.sdk.metrics.export.MetricExporter;
@@ -52,8 +57,10 @@ import org.apache.gobblin.util.PropertiesUtils;
 @Slf4j
 public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {
 
-  private static OpenTelemetryMetrics GLOBAL_INSTANCE;
+  private static volatile OpenTelemetryMetrics GLOBAL_INSTANCE;
   private static final Long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS = 
10000L;
+  private static final int DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS = 256;
+  private static final int DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_SCALE = 3;
 
   private OpenTelemetryMetrics(State state) {
     super(state);
@@ -94,7 +101,12 @@ public class OpenTelemetryMetrics extends 
OpenTelemetryMetricsBase {
   public static OpenTelemetryMetrics getInstance(State state) {
     if 
(state.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED) && 
GLOBAL_INSTANCE == null) {
-      GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
+      synchronized (OpenTelemetryMetrics.class) {
+        if (GLOBAL_INSTANCE == null) {
+          log.info("Creating OpenTelemetryMetrics instance");
+          GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
+        }
+      }
     }
     return GLOBAL_INSTANCE;
   }
@@ -115,6 +127,13 @@ public class OpenTelemetryMetrics extends 
OpenTelemetryMetricsBase {
       }
       metricsResource = 
Resource.getDefault().merge(Resource.create(attributesBuilder.build()));
     }
+
+    Aggregation histogramAggregation = 
Base2ExponentialHistogramAggregation.create(
+        
state.getPropAsInt(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS,
+            DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS),
+        
state.getPropAsInt(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_SCALE,
+            DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_SCALE));
+
     SdkMeterProvider meterProvider = SdkMeterProvider.builder()
         .setResource(metricsResource)
         .registerMetricReader(
@@ -123,6 +142,9 @@ public class OpenTelemetryMetrics extends 
OpenTelemetryMetricsBase {
                     
state.getPropAsLong(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS,
                         DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS)))
                 .build())
+        .registerView(
+            
InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM).build(),
+            View.builder().setAggregation(histogramAggregation).build())
         .build();
 
     this.openTelemetry = 
OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
new file mode 100644
index 0000000000..c54f213e24
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+@Getter
+@AllArgsConstructor
+public enum GobblinOpenTelemetryMetrics {
+  /**
+   * Metric to track the count of Gobblin jobs in each of their states 
(GenerateWorkUnit, ProcessWorkUnit, CommitStep).
+   * Metric Unit: "1", meaning each increment adds one occurrence to the 
counter.
+   * */
+  GOBBLIN_JOB_STATE("gobblin.job.state", "Gobblin job state counter", "1", 
OpenTelemetryMetricType.LONG_COUNTER),
+
+  /**
+   * Metric to track the latency of each Gobblin job state (GenerateWorkUnit, 
ProcessWorkUnit, CommitStep).
+   * Metric Unit: seconds (s), measuring the time taken by each state.
+   * */
+  GOBBLIN_JOB_STATE_LATENCY("gobblin.job.state.latency", "Gobblin job state 
latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
+
+  private final String metricName;
+  private final String metricDescription;
+  private final String metricUnit;
+  private final OpenTelemetryMetricType metricType;
+
+  @SuppressWarnings("unchecked")
+  public <T extends OpenTelemetryMetric> T createMetric(Attributes attributes, 
Meter meter) {
+    return (T) this.metricType.getFactory().newMetric(this.metricName, 
this.metricDescription, this.metricUnit, attributes, meter);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Metric{name='%s', description='%s', unit='%s', 
type=%s}", metricName, metricDescription, metricUnit, metricType);
+  }
+
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
new file mode 100644
index 0000000000..d998380a54
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+public class GobblinOpenTelemetryMetricsConstants {
+
+  public static class DimensionKeys {
+    public static final String STATE = "state";
+    public static final String CURR_STATE = "currState";
+  }
+
+  public static class DimensionValues {
+    public static final String GENERATE_WU = "generateWU";
+    public static final String PROCESS_WU = "processWU";
+    public static final String COMMIT_STEP = "commitStep";
+    public static final String JOB_START = "jobStart";
+    public static final String JOB_COMPLETE = "jobComplete";
+    public static final String GENERATE_WU_START = "generateWUStart";
+    public static final String GENERATE_WU_COMPLETE = "generateWUComplete";
+    public static final String PROCESS_WU_START = "processWUStart";
+    public static final String PROCESS_WU_COMPLETE = "processWUComplete";
+    public static final String COMMIT_STEP_START = "commitStepStart";
+    public static final String COMMIT_STEP_COMPLETE = "commitStepComplete";
+  }
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java
new file mode 100644
index 0000000000..e5fb3b13ba
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+
+
+/**
+ * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry 
{@link DoubleHistogram}.
+ *
+ * <p>This class provides a histogram for recording the distribution of double 
values.
+ * It supports recording values with optional additional attributes that can 
be merged with base attributes.</p>
+ *
+ */
+@Slf4j
+@AllArgsConstructor
+public class OpenTelemetryDoubleHistogram implements OpenTelemetryMetric {
+  private String name;
+  private Attributes baseAttributes;
+  private DoubleHistogram doubleHistogram;
+
+  /**
+   * Records the specified value in the histogram with the base attributes.
+   *
+   * @param value the double value to record in the histogram
+   */
+  public void record(double value) {
+    log.debug("Emitting double histogram metric: {}, value: {}, attributes: 
{}", this.name, value, this.baseAttributes);
+    this.doubleHistogram.record(value, this.baseAttributes);
+  }
+
+  /**
+   * Records the specified value in the histogram with a combination of base 
attributes and additional attributes.
+   *
+   * @param value the double value to record in the histogram
+   * @param additionalAttributes the additional attributes to be merged with 
base attributes
+   */
+  public void record(double value, Attributes additionalAttributes) {
+    log.debug("Emitting double histogram metric: {}, value: {}, base 
attributes: {}, additional attributes: {}",
+        this.name, value, this.baseAttributes, additionalAttributes);
+    this.doubleHistogram.record(value, 
OpenTelemetryHelper.mergeAttributes(this.baseAttributes, additionalAttributes));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getMetricName() {
+    return this.name;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public OpenTelemetryMetricType getMetricType() {
+    return OpenTelemetryMetricType.DOUBLE_HISTOGRAM;
+  }
+
+  /**
+   * Returns a string representation of this histogram with its name.
+   */
+  @Override
+  public String toString() {
+    return "OpenTelemetryDoubleHistogram{name='" + name + "'}";
+  }
+
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java
new file mode 100644
index 0000000000..b3e3d30b85
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.Map;
+
+import lombok.experimental.UtilityClass;
+import org.apache.commons.lang3.StringUtils;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+
+/**
+ * Utility class for OpenTelemetry related operations.
+ *
+ * <p>Provides methods to handle OpenTelemetry attributes, including merging 
multiple
+ * {@link Attributes} instances and converting maps to {@link Attributes}.
+ */
+@UtilityClass
+public class OpenTelemetryHelper {
+
+  private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = 
"UNKNOWN";
+
+  /**
+   * Returns the provided attribute value unless it is null, empty or
+   * whitespace-only, in which case the default placeholder is returned.
+   *
+   * @param value candidate attribute value to check
+   * @return the original value if not blank, or 
DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE otherwise
+   */
+  public static String getOrDefaultOpenTelemetryAttrValue(String value) {
+    return StringUtils.defaultIfBlank(value, 
DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE);
+  }
+
+  /**
+   * Merges multiple {@link Attributes} instances into a single {@link 
Attributes}.
+   *
+   * <p>Any {@code null} or empty ({@link Attributes#isEmpty()}) instances are 
ignored.
+   * The resulting {@link Attributes} contains all key-value pairs from the
+   * provided non-null, non-empty inputs in the order they are given.
+   * For duplicate keys, the last occurrence in the array will take precedence.
+   *
+   * @param attributesArray array of {@link Attributes} to merge; may contain 
{@code null} or empty entries
+   * @return a new {@link Attributes} instance containing all entries from the 
non-null,
+   *         non-empty inputs; never {@code null}
+   */
+  public static Attributes mergeAttributes(Attributes... attributesArray) {
+    AttributesBuilder builder = Attributes.builder();
+    for (Attributes attrs : attributesArray) {
+      if (attrs != null && !attrs.isEmpty()) {
+        builder.putAll(attrs);
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Converts a map of string attributes to an OpenTelemetry {@link 
Attributes} instance.
+   *
+   * <p>Each entry in the map is converted to an OpenTelemetry attribute, using
+   * {@link #getOrDefaultOpenTelemetryAttrValue(String)} to handle empty 
values.
+   *
+   * @param attributes map of string attributes to convert; may be {@code null}
+   * @return a new {@link Attributes} instance containing the converted 
attributes;
+   *         never {@code null}
+   */
+  public static Attributes toOpenTelemetryAttributes(Map<String, String> 
attributes) {
+    AttributesBuilder builder = Attributes.builder();
+    if (attributes != null) {
+      for (Map.Entry<String, String> entry : attributes.entrySet()) {
+        String key = entry.getKey();
+        String value = getOrDefaultOpenTelemetryAttrValue(entry.getValue());
+        builder.put(key, value);
+      }
+    }
+    return builder.build();
+  }
+
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java
new file mode 100644
index 0000000000..26eae56819
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.lang.reflect.Method;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import com.google.common.base.Splitter;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * Provides OpenTelemetry instrumentation for metrics.
+ *
+ * <p>Maintains a singleton instance that holds common attributes {@link 
Attributes} and a Meter {@link Meter}.
+ * Exposes methods to retrieve or create metric instruments defined in {@link 
GobblinOpenTelemetryMetrics}.
+ */
+@Slf4j
+@Getter
+public class OpenTelemetryInstrumentation {
+
+  // Defined locally because adding the gobblin-service.main dependency 
(BaseFlowGraphHelper.FLOW_EDGE_LABEL_JOINER_CHAR) creates a circular 
dependency
+  private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
+  private static final Splitter COMMA_SPLITTER = 
Splitter.on(',').omitEmptyStrings().trimResults();
+  private static volatile OpenTelemetryInstrumentation GLOBAL_INSTANCE;
+
+  private final Attributes commonAttributes;
+  private final Meter meter;
+  private final ConcurrentHashMap<String, OpenTelemetryMetric> metrics;
+
+  private OpenTelemetryInstrumentation(final State state) {
+    this.commonAttributes = buildCommonAttributes(state);
+    this.meter = getOpenTelemetryMetrics(state).getMeter(state.getProp(
+        ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME,
+        ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME));
+    this.metrics = new ConcurrentHashMap<>();
+  }
+
+  private OpenTelemetryMetricsBase getOpenTelemetryMetrics(State state) {
+    try {
+      String openTelemetryClassName = 
state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CLASSNAME,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_CLASSNAME);
+      Class<?> metricsClass = Class.forName(openTelemetryClassName);
+      Method getInstanceMethod = metricsClass.getMethod("getInstance", 
State.class);
+      return (OpenTelemetryMetricsBase) getInstanceMethod.invoke(null, state);
+    } catch (Exception e) {
+      log.error("Failed to initialize OpenTelemetryMetrics through reflection, 
defaulting to direct instantiation of InMemoryOpenTelemetryMetrics", e);
+    }
+    return InMemoryOpenTelemetryMetrics.getInstance(state);
+  }
+
+  /**
+   * Returns the singleton instance for the given configuration state.
+   *
+   * @param state the configuration containing metric reporting and dimension 
configs
+   * @return the global {@link OpenTelemetryInstrumentation} instance
+   */
+  public static OpenTelemetryInstrumentation getInstance(final State state) {
+    if (GLOBAL_INSTANCE == null) {
+      synchronized (OpenTelemetryInstrumentation.class) {
+        if (GLOBAL_INSTANCE == null) {
+          log.info("Creating OpenTelemetryInstrumentation instance");
+          GLOBAL_INSTANCE = new OpenTelemetryInstrumentation(state);
+        }
+      }
+    }
+    return GLOBAL_INSTANCE;
+  }
+
+  public static OpenTelemetryInstrumentation getInstance(final Properties 
props) {
+    return getInstance(new State(props));
+  }
+
+  /**
+   * Retrieves an existing metric by its enum definition or creates it if 
absent.
+   *
+   * @param metric the {@link GobblinOpenTelemetryMetrics} enum defining name, 
description, unit, and type {@link OpenTelemetryMetricType}
+   * @return an {@link OpenTelemetryMetric} instance corresponding to the 
provided enum
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends OpenTelemetryMetric> T 
getOrCreate(GobblinOpenTelemetryMetrics metric) {
+    return (T) this.metrics.computeIfAbsent(metric.getMetricName(), name -> 
metric.createMetric(this.commonAttributes, this.meter));
+  }
+
+  private Attributes buildCommonAttributes(final State state) {
+    AttributesBuilder attributesBuilder = Attributes.builder();
+    String commonDimensions = 
state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS, "");
+    if (StringUtils.isNotEmpty(commonDimensions)) {
+      for (String dimension : COMMA_SPLITTER.split(commonDimensions)) {
+        String dimensionKey = dimension.trim();
+        String dimensionValue = state.getProp(dimensionKey, "");
+        if (ConfigurationKeys.FLOW_EDGE_ID_KEY.equals(dimensionKey)) {
+          dimensionValue = getFlowEdgeId(state, dimensionValue);
+        }
+        if (StringUtils.isNotEmpty(dimensionValue)) {
+          attributesBuilder.put(dimensionKey, 
OpenTelemetryHelper.getOrDefaultOpenTelemetryAttrValue(dimensionValue));
+        }
+      }
+    }
+    return attributesBuilder.build();
+  }
+
+  private static String getFlowEdgeId(final State state, String 
fullFlowEdgeId) {
+    // Parse the flowEdgeId from fullFlowEdgeId that is stored in format 
sourceNode_destinationNode_flowEdgeId
+    return StringUtils.substringAfter(
+        StringUtils.substringAfter(fullFlowEdgeId, 
state.getProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "")),
+        FLOW_EDGE_LABEL_JOINER_CHAR);
+  }
+
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java
new file mode 100644
index 0000000000..4d06a6af90
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+
+
+/**
+ * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry 
{@link LongCounter}.
+ *
+ * <p>This class provides a counter for recording values.
+ * It supports adding values with optional additional attributes that can be 
merged with base attributes.</p>
+ *
+ */
+@Slf4j
+@AllArgsConstructor
+public class OpenTelemetryLongCounter implements OpenTelemetryMetric {
+  private String name;
+  private Attributes baseAttributes;
+  private LongCounter longCounter;
+
+  /**
+   * Adds the specified value to the counter with the base attributes.
+   *
+   * @param value the value to add to the counter
+   */
+  public void add(long value) {
+    log.debug("Emitting long counter metric: {}, value: {}, attributes: {}", 
this.name, value, this.baseAttributes);
+    this.longCounter.add(value, this.baseAttributes);
+  }
+
+  /**
+   * Adds the specified value to the counter with a combination of base 
attributes and additional attributes.
+   *
+   * @param value the value to add to the counter
+   * @param additionalAttributes the additional attributes to be merged with 
base attributes
+   */
+  public void add(long value, Attributes additionalAttributes) {
+    log.debug("Emitting long counter metric: {}, value: {}, base attributes: 
{}, additional attributes: {}",
+        this.name, value, this.baseAttributes, additionalAttributes);
+    this.longCounter.add(value, 
OpenTelemetryHelper.mergeAttributes(this.baseAttributes, additionalAttributes));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getMetricName() {
+    return this.name;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public OpenTelemetryMetricType getMetricType() {
+    return OpenTelemetryMetricType.LONG_COUNTER;
+  }
+
+  /**
+   * Returns a string representation of this counter with its name.
+   */
+  @Override
+  public String toString() {
+    return "OpenTelemetryLongCounter{name='" + name + "'}";
+  }
+
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetric.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetric.java
new file mode 100644
index 0000000000..48f27f17a8
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetric.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+/**
+ * Common contract for the metric wrappers in this package.
+ * It exposes the metric's name and the kind of instrument that backs it.
+ */
+public interface OpenTelemetryMetric {
+
+  /**
+   * Returns the name of the metric.
+   *
+   * @return the metric name as a {@link String}
+   */
+  String getMetricName();
+
+  /**
+   * Returns the {@link OpenTelemetryMetricType} indicating the kind of metric
+   * instrument.
+   *
+   * @return the {@link OpenTelemetryMetricType} of this metric
+   */
+  OpenTelemetryMetricType getMetricType();
+
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricFactory.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricFactory.java
new file mode 100644
index 0000000000..30b2da207c
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+
+/**
+ * Creates {@link OpenTelemetryMetric} wrappers of type {@code T} around instruments built from a
+ * {@link Meter}.
+ *
+ * @param <T> the concrete {@link OpenTelemetryMetric} produced by the factory
+ */
+public interface OpenTelemetryMetricFactory<T extends OpenTelemetryMetric> {
+
+  /** Shared factory instance producing {@link OpenTelemetryLongCounter} metrics. */
+  OpenTelemetryMetricFactory<OpenTelemetryLongCounter> LONG_COUNTER_FACTORY =
+      new OpenTelemetryLongCounterFactory();
+
+  /** Shared factory instance producing {@link OpenTelemetryDoubleHistogram} metrics. */
+  OpenTelemetryMetricFactory<OpenTelemetryDoubleHistogram> DOUBLE_HISTOGRAM_FACTORY =
+      new OpenTelemetryDoubleHistogramFactory();
+
+  /**
+   * Builds an instrument on {@code meter} and wraps it in a metric of type {@code T}.
+   *
+   * @param name the instrument name, also used as the wrapper's metric name
+   * @param description the description set on the instrument builder
+   * @param unit the unit set on the instrument builder
+   * @param attributes the attributes handed to the metric wrapper's constructor
+   * @param meter the {@link Meter} used to build the underlying instrument
+   * @return the newly created metric wrapper
+   */
+  T newMetric(String name, String description, String unit, Attributes attributes, Meter meter);
+
+  /** Factory class for creating {@link OpenTelemetryLongCounter} metrics. */
+  class OpenTelemetryLongCounterFactory implements OpenTelemetryMetricFactory<OpenTelemetryLongCounter> {
+
+    @Override
+    public OpenTelemetryLongCounter newMetric(String name, String description, String unit,
+        Attributes attributes, Meter meter) {
+      return new OpenTelemetryLongCounter(
+          name,
+          attributes,
+          meter.counterBuilder(name)
+              .setDescription(description)
+              .setUnit(unit)
+              .build());
+    }
+
+  }
+
+  /** Factory class for creating {@link OpenTelemetryDoubleHistogram} metrics. */
+  class OpenTelemetryDoubleHistogramFactory implements OpenTelemetryMetricFactory<OpenTelemetryDoubleHistogram> {
+
+    @Override
+    public OpenTelemetryDoubleHistogram newMetric(String name, String description, String unit,
+        Attributes attributes, Meter meter) {
+      return new OpenTelemetryDoubleHistogram(
+          name,
+          attributes,
+          meter.histogramBuilder(name)
+              .setDescription(description)
+              .setUnit(unit)
+              .build());
+    }
+
+  }
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java
new file mode 100644
index 0000000000..8890d37d90
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.Getter;
+
+/**
+ * Enum representing the types of OpenTelemetry metrics supported.
+ * Each constant carries the {@link OpenTelemetryMetricFactory} that creates metrics of that type;
+ * the Lombok {@code @Getter} on the enum exposes it through a generated accessor.
+ */
+@Getter
+public enum OpenTelemetryMetricType {
+  /** Represents a metric of type LongCounter. */
+  LONG_COUNTER(OpenTelemetryMetricFactory.LONG_COUNTER_FACTORY),
+  /** Represents a metric of type DoubleHistogram. */
+  DOUBLE_HISTOGRAM(OpenTelemetryMetricFactory.DOUBLE_HISTOGRAM_FACTORY);
+
+  /** Factory associated with this metric type. */
+  private final OpenTelemetryMetricFactory<? extends OpenTelemetryMetric> 
factory;
+
+  /**
+   * Binds a metric type to its factory.
+   *
+   * @param factory the factory stored for this metric type
+   */
+  OpenTelemetryMetricType(OpenTelemetryMetricFactory<? extends 
OpenTelemetryMetric> factory) {
+    this.factory = factory;
+  }
+
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelperTest.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelperTest.java
new file mode 100644
index 0000000000..b1fef64019
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelperTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+
+
+/**
+ * Tests for {@link OpenTelemetryHelper} class.
+ * Covers merging of two attribute sets and conversion of a string map into attributes,
+ * including the {@code null} input case for both helpers.
+ */
+public class OpenTelemetryHelperTest {
+
+  /** Verifies merging with a null side, with two disjoint sets, and with an empty set. */
+  @Test
+  public void testMergeAttributes() {
+    Attributes emptyAttributes = Attributes.empty();
+    Attributes attributes1 = Attributes.builder().put("key1", 
"value1").build();
+    Attributes attributes2 = Attributes.builder().put("key2", 
"value2").build();
+
+    // A null first argument merged with an empty set is expected to yield an empty set.
+    Attributes attributes = OpenTelemetryHelper.mergeAttributes(null, 
emptyAttributes);
+    Assert.assertEquals(attributes.size(), 0);
+    Assert.assertEquals(attributes, emptyAttributes);
+
+    // Two disjoint sets: the result is expected to carry the keys of both.
+    attributes = OpenTelemetryHelper.mergeAttributes(attributes1, attributes2);
+    Assert.assertEquals(attributes.size(), 2);
+    Assert.assertEquals(attributes.get(AttributeKey.stringKey("key1")), 
"value1");
+    Assert.assertEquals(attributes.get(AttributeKey.stringKey("key2")), 
"value2");
+
+    // Merging with an empty set is expected to leave the first set unchanged.
+    attributes = OpenTelemetryHelper.mergeAttributes(attributes1, 
emptyAttributes);
+    Assert.assertEquals(attributes.size(), 1);
+    Assert.assertEquals(attributes.get(AttributeKey.stringKey("key1")), 
"value1");
+    Assert.assertNull(attributes.get(AttributeKey.stringKey("key2")));
+
+  }
+
+  /** Verifies that a null map converts to empty attributes and a populated map keeps every entry. */
+  @Test
+  public void testToOpenTelemetryAttributes() {
+    // A null map is expected to produce an empty attribute set rather than fail.
+    Attributes attributes = 
OpenTelemetryHelper.toOpenTelemetryAttributes(null);
+    Assert.assertEquals(attributes.size(), 0);
+
+    Map<String, String> map = new HashMap<>();
+    map.put("key1", "value1");
+    map.put("key2", "value2");
+
+    // Every map entry is expected to appear as a string-keyed attribute.
+    attributes = OpenTelemetryHelper.toOpenTelemetryAttributes(map);
+    Assert.assertEquals(attributes.size(), 2);
+    Assert.assertEquals(attributes.get(AttributeKey.stringKey("key1")), 
"value1");
+    Assert.assertEquals(attributes.get(AttributeKey.stringKey("key2")), 
"value2");
+  }
+
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java
new file mode 100644
index 0000000000..74c7f57fbf
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.lang.reflect.Field;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * Unit tests for {@link OpenTelemetryInstrumentation}.
+ * These tests ensure that the singleton instance is created correctly,
+ * common attributes are built from dimensions, and metrics are created and
+ * cached properly.
+ */
+public class OpenTelemetryInstrumentationTest {
+
+  private OpenTelemetryInstrumentation instrumentation;
+  private State state;
+
+  /**
+   * Clears the static {@code GLOBAL_INSTANCE} field through reflection and prepares a fresh
+   * {@link State} that names the in-memory OpenTelemetry metrics class.
+   */
+  @BeforeMethod
+  public void setUp() throws NoSuchFieldException, IllegalAccessException {
+    // Reset singleton instance before each test
+    Field instanceField = 
OpenTelemetryInstrumentation.class.getDeclaredField("GLOBAL_INSTANCE");
+    instanceField.setAccessible(true);
+    instanceField.set(null, null);
+
+    state = new State();
+    // NOTE(review): presumably selects the in-memory implementation so no external exporter is needed - confirm.
+    state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CLASSNAME, 
"org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics");
+  }
+
+  /** Two consecutive {@code getInstance} calls are expected to return the same object. */
+  @Test
+  public void singletonInstanceCreatedOnce() {
+    OpenTelemetryInstrumentation instance1 = 
OpenTelemetryInstrumentation.getInstance(state);
+    OpenTelemetryInstrumentation instance2 = 
OpenTelemetryInstrumentation.getInstance(state);
+
+    Assert.assertSame(instance1, instance2, "getInstance should return the 
same instance");
+  }
+
+  /** An empty dimensions property is expected to produce no common attributes. */
+  @Test
+  public void emptyDimensionsCreateEmptyAttributes() {
+    
state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS, "");
+
+    instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+    Attributes attributes = instrumentation.getCommonAttributes();
+
+    Assert.assertEquals(attributes.size(), 0);
+  }
+
+  /** Each name in the comma-separated dimensions list is expected to be resolved from the state. */
+  @Test
+  public void commonAttributesBuiltFromDimensions() {
+    
state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS, 
"dim1,dim2");
+    state.setProp("dim1", "value1");
+    state.setProp("dim2", "value2");
+
+    instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+    Attributes attributes = instrumentation.getCommonAttributes();
+
+    Assert.assertEquals(attributes.get(AttributeKey.stringKey("dim1")), 
"value1");
+    Assert.assertEquals(attributes.get(AttributeKey.stringKey("dim2")), 
"value2");
+  }
+
+  /**
+   * The flow edge id dimension is expected to be reported as {@code edgeId} when the raw value is
+   * {@code sourceNode_destNode_edgeId} and the destination identifier is {@code destNode}.
+   */
+  @Test
+  public void flowEdgeIdParsedCorrectly() {
+    state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS,
+        ConfigurationKeys.FLOW_EDGE_ID_KEY);
+    state.setProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
"destNode");
+    state.setProp(ConfigurationKeys.FLOW_EDGE_ID_KEY, 
"sourceNode_destNode_edgeId");
+
+    instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+    Attributes attributes = instrumentation.getCommonAttributes();
+
+    
Assert.assertEquals(attributes.get(AttributeKey.stringKey(ConfigurationKeys.FLOW_EDGE_ID_KEY)),
+        "edgeId");
+  }
+
+  /** The metrics map is expected to grow only when a metric is requested for the first time. */
+  @Test
+  public void metricsAreCreatedAndCached() {
+    instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+    Assert.assertEquals(instrumentation.getMetrics().size(), 0, "Metrics map 
should be empty initially");
+    instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE);
+    Assert.assertEquals(instrumentation.getMetrics().size(), 1, "Metrics map 
should contain one metric after creation");
+    // Requesting the same metric again must reuse the existing entry.
+    instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE);
+    Assert.assertEquals(instrumentation.getMetrics().size(), 1, "Metrics map 
should still contain one metric after duplicate creation");
+    
instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY);
+    Assert.assertEquals(instrumentation.getMetrics().size(), 2, "Metrics map 
should contain two metrics after creating another");
+  }
+
+}
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricTest.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricTest.java
new file mode 100644
index 0000000000..580878a357
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
+
+/**
+ * Tests for the OpenTelemetry metric wrappers {@link OpenTelemetryLongCounter} and
+ * {@link OpenTelemetryDoubleHistogram} and their factories in {@link OpenTelemetryMetricFactory}.
+ * The tests create metrics through {@link OpenTelemetryMetricFactory#LONG_COUNTER_FACTORY} and
+ * {@link OpenTelemetryMetricFactory#DOUBLE_HISTOGRAM_FACTORY} and check that values are recorded
+ * correctly with and without additional attributes.
+ */
+public class OpenTelemetryMetricTest {
+
+  private InMemoryOpenTelemetryMetrics inMemoryOpenTelemetryMetrics;
+  private final String testMeterGroupName = "testMeterGroup";
+  // Base attributes handed to every metric created in these tests.
+  private final Attributes baseAttributes = Attributes.builder()
+      .put("dim1", "val1")
+      .put("dim2", "val2")
+      .build();
+
+  /** Obtains the in-memory metrics provider whose {@code metricReader} the tests read back from. */
+  @BeforeMethod
+  public void setUp() {
+    // NOTE(review): if getInstance returns a shared instance, metrics recorded by earlier tests may
+    // still be visible to the size assertions below - confirm the reader's collection semantics.
+    inMemoryOpenTelemetryMetrics = 
InMemoryOpenTelemetryMetrics.getInstance(new State());
+  }
+
+  /** A counter increment with an extra attribute is expected to carry base and extra attributes. */
+  @Test
+  public void testOpenTelemetryLongCounter() {
+    String metricName = "testLongCounter";
+    OpenTelemetryLongCounter longCounter = 
OpenTelemetryMetricFactory.LONG_COUNTER_FACTORY.newMetric(metricName,
+        "testLongCounterDescription", "1", baseAttributes, 
inMemoryOpenTelemetryMetrics.getMeter(testMeterGroupName));
+    longCounter.add(20, Attributes.builder().put("dim3", "val3").build());
+    Collection<MetricData> metrics = 
inMemoryOpenTelemetryMetrics.metricReader.collectAllMetrics();
+    Assert.assertEquals(metrics.size(), 1);
+    Map<String, MetricData > metricsByName = 
metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData -> 
metricData));
+    MetricData metricData = metricsByName.get(metricName);
+    List<LongPointData> dataPoints = new 
ArrayList<>(metricData.getLongSumData().getPoints());
+    Assert.assertEquals(dataPoints.size(), 1);
+    Assert.assertEquals(dataPoints.get(0).getValue(), 20);
+    
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim1")),
 "val1");
+    
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim2")),
 "val2");
+    
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim3")),
 "val3");
+  }
+
+  /** A counter increment without extra attributes is expected to carry the base attributes only. */
+  @Test
+  public void testOpenTelemetryLongCounterWithoutAdditionalAttributes() {
+    String metricName = "testLongCounterWithoutAdditionalAttributes";
+    OpenTelemetryLongCounter longCounter = 
OpenTelemetryMetricFactory.LONG_COUNTER_FACTORY.newMetric(metricName,
+        "testLongCounterDescription", "1", baseAttributes, 
inMemoryOpenTelemetryMetrics.getMeter(testMeterGroupName));
+    longCounter.add(10);
+    Collection<MetricData> metrics = 
inMemoryOpenTelemetryMetrics.metricReader.collectAllMetrics();
+    Assert.assertEquals(metrics.size(), 1);
+    Map<String, MetricData > metricsByName = 
metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData -> 
metricData));
+    MetricData metricData = metricsByName.get(metricName);
+    List<LongPointData> dataPoints = new 
ArrayList<>(metricData.getLongSumData().getPoints());
+    Assert.assertEquals(dataPoints.size(), 1);
+    Assert.assertEquals(dataPoints.get(0).getValue(), 10);
+    
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim1")),
 "val1");
+    
Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim2")),
 "val2");
+    
Assert.assertNull(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim3")),
+        "Additional attribute dim3 should not be present when not provided");
+  }
+
+  /** Two recordings with identical attributes are expected to land in one point whose sum is 15.0. */
+  @Test
+  public void testOpenTelemetryDoubleHistogram() {
+    String metricName = "testDoubleHistogram";
+    OpenTelemetryDoubleHistogram doubleHistogram = 
OpenTelemetryMetricFactory.DOUBLE_HISTOGRAM_FACTORY.newMetric(metricName,
+        "testDoubleHistogramDescription", "s", baseAttributes, 
inMemoryOpenTelemetryMetrics.getMeter(testMeterGroupName));
+    doubleHistogram.record(5.0, Attributes.builder().put("dim3", 
"val3").build());
+    doubleHistogram.record(10.0, Attributes.builder().put("dim3", 
"val3").build());
+    Collection<MetricData> metrics = 
inMemoryOpenTelemetryMetrics.metricReader.collectAllMetrics();
+    Assert.assertEquals(metrics.size(), 1);
+    Map<String, MetricData > metricsByName = 
metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData -> 
metricData));
+    MetricData metricData = metricsByName.get(metricName);
+    List<HistogramPointData> dataPoints = new 
ArrayList<>(metricData.getHistogramData().getPoints());
+    Assert.assertEquals(dataPoints.size(), 1);
+    Assert.assertEquals(dataPoints.get(0).getSum(), 15.0, "Sum should match 
recorded value");
+  }
+
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 94819c9c85..0b95b78ef4 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -124,6 +124,8 @@ public interface GobblinTemporalConfigurationKeys {
       PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
   String TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
       PREFIX + "submit.gte." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_EMIT_OTEL_METRICS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "emit.otel.metrics." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
   String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options.";
   String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
   int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
index 22126b2d0f..0860fa82b0 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
@@ -52,6 +52,9 @@ public enum ActivityType {
   /** Activity type for submitting GTE. */
   
SUBMIT_GTE(GobblinTemporalConfigurationKeys.TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
 
+  /** Activity type for emitting open telemetry metrics */
+  
EMIT_OTEL_METRICS(GobblinTemporalConfigurationKeys.TEMPORAL_EMIT_OTEL_METRICS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
   /** Default placeholder activity type. */
   
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java
new file mode 100644
index 0000000000..7b32a7b873
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.util.Map;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
+
+/**
+ * Temporal activity interface for emitting OpenTelemetry metrics.
+ * One activity method is declared per supported instrument kind (long counter, double histogram).
+ */
+@ActivityInterface
+public interface EmitOTelMetrics {
+
+  /**
+   * Emits a long counter metric.
+   * The implementation in {@code EmitOTelMetricsImpl} throws {@link IllegalArgumentException} when
+   * {@code metric} is not of type {@code LONG_COUNTER}.
+   *
+   * @param metric The OpenTelemetry metric to emit.
+   * @param value The value to add to the counter.
+   * @param attributes Additional attributes for the metric.
+   * @param jobProps Properties related to the job context.
+   */
+  @ActivityMethod
+  void emitLongCounterMetric(GobblinOpenTelemetryMetrics metric, long value, 
Map<String, String> attributes, Properties jobProps);
+
+  /**
+   * Emits a double histogram metric.
+   * The implementation in {@code EmitOTelMetricsImpl} throws {@link IllegalArgumentException} when
+   * {@code metric} is not of type {@code DOUBLE_HISTOGRAM}.
+   *
+   * @param metric The OpenTelemetry metric to emit.
+   * @param value The value to record in the histogram.
+   * @param attributes Additional attributes for the metric.
+   * @param jobProps Properties related to the job context.
+   */
+  @ActivityMethod
+  void emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics metric, double 
value, Map<String, String> attributes, Properties jobProps);
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java
new file mode 100644
index 0000000000..644aafde6f
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryDoubleHistogram;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryHelper;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryInstrumentation;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryLongCounter;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryMetricType;
+import org.apache.gobblin.temporal.ddm.activity.EmitOTelMetrics;
+
+/**
+ * {@link EmitOTelMetrics} implementation that obtains the requested metric from
+ * {@link OpenTelemetryInstrumentation} and records a single measurement on it.
+ */
+public class EmitOTelMetricsImpl implements EmitOTelMetrics {
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IllegalArgumentException if {@code metric} is not of type {@code LONG_COUNTER}
+   */
+  @Override
+  public void emitLongCounterMetric(GobblinOpenTelemetryMetrics metric, long value, Map<String, String> attributes,
+      Properties jobProps) {
+    ensureMetricType(metric, OpenTelemetryMetricType.LONG_COUNTER);
+    OpenTelemetryLongCounter longCounter =
+        OpenTelemetryInstrumentation.getInstance(jobProps).getOrCreate(metric);
+    longCounter.add(value, OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IllegalArgumentException if {@code metric} is not of type {@code DOUBLE_HISTOGRAM}
+   */
+  @Override
+  public void emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics metric, double value,
+      Map<String, String> attributes, Properties jobProps) {
+    ensureMetricType(metric, OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
+    OpenTelemetryDoubleHistogram doubleHistogram =
+        OpenTelemetryInstrumentation.getInstance(jobProps).getOrCreate(metric);
+    doubleHistogram.record(value, OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
+  }
+
+  /** Rejects a metric whose declared type differs from the type the calling method emits. */
+  private static void ensureMetricType(GobblinOpenTelemetryMetrics metric, OpenTelemetryMetricType expectedType) {
+    if (metric.getMetricType().equals(expectedType)) {
+      return;
+    }
+    throw new IllegalArgumentException(
+        "Metric " + metric.getMetricName() + " is not of type " + expectedType.name());
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index b2f65ccb41..ad61e7b99d 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -17,7 +17,9 @@
 
 package org.apache.gobblin.temporal.ddm.launcher;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -33,12 +35,14 @@ import 
org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.activity.impl.EmitOTelMetricsImpl;
 import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -50,6 +54,9 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
+import static 
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
+import static 
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
+
 
 /**
  * A {@link JobLauncher} for the initial triggering of a Temporal workflow 
that executes a full Gobblin job workflow of:
@@ -96,8 +103,21 @@ public class ExecuteGobblinJobLauncher extends GobblinTemporalJobLauncher {
       EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext.Builder(eventSubmitter)
           .withGaaSJobProps(finalProps)
           .build();
+
+      Map<String, String> attributes = new HashMap<>();
+      attributes.put(CURR_STATE, JOB_START);
+      EmitOTelMetricsImpl emitOTelMetrics = new EmitOTelMetricsImpl();
+      
emitOTelMetrics.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
 1L, attributes, finalProps);
+
+      long startTimeMillis = System.currentTimeMillis();
       ExecGobblinStats execGobblinStats = workflow.execute(finalProps, 
eventSubmitterContext);
+      double timeTaken = (System.currentTimeMillis() - startTimeMillis) / 
1000.0;
       log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", 
execGobblinStats);
+      attributes.put(CURR_STATE, JOB_COMPLETE);
+      
emitOTelMetrics.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
 1L, attributes, finalProps);
+      attributes.remove(CURR_STATE);
+      attributes.put(STATE, JOB_COMPLETE);
+      
emitOTelMetrics.emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY,
 timeTaken, attributes, finalProps);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 12fe6c4d84..3ee45d20eb 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -28,6 +28,7 @@ import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
 import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
 import 
org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
+import org.apache.gobblin.temporal.ddm.activity.impl.EmitOTelMetricsImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
 import 
org.apache.gobblin.temporal.ddm.activity.impl.RecommendScalingForWorkUnitsLinearHeuristicImpl;
@@ -60,7 +61,7 @@ public class WorkFulfillmentWorker extends AbstractTemporalWorker {
     @Override
     protected Object[] getActivityImplInstances() {
         return new Object[] { new SubmitGTEActivityImpl(), new 
GenerateWorkUnitsImpl(), new RecommendScalingForWorkUnitsLinearHeuristicImpl(), 
new ProcessWorkUnitImpl(),
-            new CommitActivityImpl(), new DeleteWorkDirsActivityImpl() };
+            new CommitActivityImpl(), new DeleteWorkDirsActivityImpl(), new 
EmitOTelMetricsImpl()};
     }
 
     @Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 50d587c4ef..f14788acf6 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -23,7 +23,6 @@ import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.Workflow;
 
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.temporal.ddm.activity.ActivityType;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 05fa2a8546..3e08c7536b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.net.URI;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -46,9 +48,11 @@ import io.temporal.workflow.Workflow;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.temporal.ddm.activity.ActivityType;
 import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
+import org.apache.gobblin.temporal.ddm.activity.EmitOTelMetrics;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
 import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
@@ -75,6 +79,9 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 
+import static 
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
+import static 
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
+
 
 @Slf4j
 public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
@@ -83,11 +90,16 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
   @Override
   public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     // Filtering only temporal job properties to pass to child workflows to 
avoid passing unnecessary properties
-    final Properties temporalJobProps = 
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
-        
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
+    final Properties temporalJobProps = PropertiesUtils.combineProperties(
+        PropertiesUtils.extractPropertiesWithPrefix(jobProps, 
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX)),
+        PropertiesUtils.extractPropertiesWithPrefix(jobProps, 
com.google.common.base.Optional.of(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_PREFIX))
+    );
     // Add File system properties to the temporal job properties
     
temporalJobProps.putAll(PropertiesUtils.extractPropertiesWithPrefix(jobProps,
         
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)));
+    Map<String, String> attributes = new HashMap<>();
+    final EmitOTelMetrics emitOTelMetricsActivityStub = 
Workflow.newActivityStub(EmitOTelMetrics.class,
+        ActivityType.EMIT_OTEL_METRICS.buildActivityOptions(temporalJobProps, 
false));
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, 
temporalJobProps);
     timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // 
update GaaS: `TimingEvent.JOB_START_TIME`
     EventTimer jobSuccessTimer = timerFactory.createJobTimer();
@@ -97,7 +109,16 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
     try (Closer closer = Closer.create()) {
       final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class,
           
ActivityType.GENERATE_WORKUNITS.buildActivityOptions(temporalJobProps, true));
+      attributes.put(CURR_STATE, GENERATE_WU_START);
+      
emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
 1, attributes, temporalJobProps);
+      long genWUStartTime = Workflow.currentTimeMillis();
       GenerateWorkUnitsResult generateWorkUnitResult = 
genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+      attributes.put(CURR_STATE, GENERATE_WU_COMPLETE);
+      
emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
 1, attributes, temporalJobProps);
+      double genWUTImeTaken = (Workflow.currentTimeMillis() - genWUStartTime) 
/ 1000.0;
+      attributes.remove(CURR_STATE);
+      attributes.put(STATE, GENERATE_WU);
+      
emitOTelMetricsActivityStub.emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY,
 genWUTImeTaken, attributes, temporalJobProps);
       optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult);
       WorkUnitsSizeSummary wuSizeSummary = 
generateWorkUnitResult.getWorkUnitsSizeSummary();
       int numWUsGenerated = 
safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 30db5a79b2..94e870c283 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -29,9 +30,12 @@ import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
 
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+import org.apache.gobblin.temporal.ddm.activity.EmitOTelMetrics;
 import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
@@ -50,15 +54,22 @@ import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 import org.apache.gobblin.util.PropertiesUtils;
 
+import static 
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
+import static 
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
+
 
 @Slf4j
 public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
   public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
   public static final String COMMIT_STEP_WORKFLOW_ID_BASE = 
"CommitStepWorkflow";
 
+  private EmitOTelMetrics emitOTelMetricsActivityStub;
+
   @Override
   public CommitStats process(WUProcessingSpec workSpec, final Properties 
props) {
     Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec, props);
+    this.emitOTelMetricsActivityStub = 
Workflow.newActivityStub(EmitOTelMetrics.class,
+        ActivityType.EMIT_OTEL_METRICS.buildActivityOptions(props, false));
     CommitStats result = performWork(workSpec, props);
     timer.ifPresent(EventTimer::stop);
     return result;
@@ -75,7 +86,18 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
           performWorkloadInput = new 
NestingExecWorkloadInput<>(WorkflowAddr.ROOT, workload, 0,
           workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(),
           Optional.empty(), props);
+      Map<String, String> attributes = new HashMap<>();
+      attributes.put(CURR_STATE, PROCESS_WU_START);
+      
this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
 1L, attributes, props);
+      long processWUStartTime = Workflow.currentTimeMillis();
       workunitsProcessed = 
Optional.of(processingWorkflow.performWorkload(performWorkloadInput));
+      attributes.put(CURR_STATE, PROCESS_WU_COMPLETE);
+      
this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
 1L, attributes, props);
+      attributes.remove(CURR_STATE);
+      attributes.put(STATE, PROCESS_WU);
+      double processWUDuration = (Workflow.currentTimeMillis() - 
processWUStartTime) / 1000.0;
+      this.emitOTelMetricsActivityStub.emitDoubleHistogramMetric(
+          GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY, 
processWUDuration, attributes, props);
     } catch (Exception e) {
       log.error("ProcessWorkUnits failure - attempting partial commit before 
re-throwing exception", e);
 
@@ -115,7 +137,18 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
       return CommitStats.createEmpty();
     }
     CommitStepWorkflow commitWorkflow = 
createCommitStepWorkflow(searchAttributes);
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put(CURR_STATE, COMMIT_STEP_START);
+    
this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
 1L, attributes, props);
+    long commitStepStartTime = Workflow.currentTimeMillis();
     CommitStats result = commitWorkflow.commit(workSpec, props);
+    attributes.put(CURR_STATE, COMMIT_STEP_COMPLETE);
+    
this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE,
 1L, attributes, props);
+    attributes.remove(CURR_STATE);
+    attributes.put(STATE, COMMIT_STEP);
+    double commitStepDuration = (Workflow.currentTimeMillis() - 
commitStepStartTime) / 1000.0;
+    this.emitOTelMetricsActivityStub.emitDoubleHistogramMetric(
+        GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY, 
commitStepDuration, attributes, props);
     if (result.getNumCommittedWorkUnits() == 0) {
       log.warn("No work units committed at the job level. They could have been 
committed at the task level.");
     }
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
index 8ce5c2991d..6e2a54fb3d 100644
--- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
@@ -70,6 +70,7 @@ public class ActivityTypeTest {
         {ActivityType.PROCESS_WORKUNIT, 555},
         {ActivityType.COMMIT, 444},
         {ActivityType.SUBMIT_GTE, 999},
+        {ActivityType.EMIT_OTEL_METRICS, 888},
         {ActivityType.DEFAULT_ACTIVITY, 1}
     };
   }
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index 2263dcdfa0..f0207d9ba4 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -41,7 +41,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
     .register(new BuildProperty("publishToMaven", false, "Enable publishing of 
artifacts to a central Maven repository"))
     .register(new BuildProperty("publishToNexus", false, "Enable publishing of 
artifacts to Nexus"))
     .register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce 
dependencies version"))
-    .register(new BuildProperty("openTelemetryVersion", "1.30.0", 
"OpenTelemetry dependencies version"))
+    .register(new BuildProperty("openTelemetryVersion", "1.47.0", 
"OpenTelemetry dependencies version"))
     .register(new BuildProperty("micrometerVersion", "1.11.1", "Micrometer 
dependencies version"))
 task buildProperties(description: 'Lists main properties that can be used to 
customize the build') {
   doLast {

Reply via email to