[ 
https://issues.apache.org/jira/browse/GOBBLIN-2209?focusedWorklogId=975386&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-975386
 ]

ASF GitHub Bot logged work on GOBBLIN-2209:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jul/25 06:16
            Start Date: 21/Jul/25 06:16
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4118:
URL: https://github.com/apache/gobblin/pull/4118#discussion_r2218254413


##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+
+
+/**
+ * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry {@link DoubleHistogram}.
+ *
+ * <p>This class provides a histogram for recording the distribution of double values.
+ * It supports recording values with optional additional attributes that can be merged with base attributes.</p>
+ *
+ */
+@Slf4j
+@AllArgsConstructor
+public class OpenTelemetryDoubleHistogram implements OpenTelemetryMetric {
+  private String name;
+  private Attributes baseAttributes;
+  private DoubleHistogram doubleHistogram;
+
+  /**
+   * Records the specified value in the histogram with the base attributes.
+   *
+   * @param value the double value to record in the histogram
+   */
+  public void record(double value) {
+    log.info("Emitting double histogram metric: {}, value: {}, attributes: {}", this.name, value, this.baseAttributes);

Review Comment:
   We can change this to `log.debug` to avoid log noise when metrics are emitted
at high volume. The same applies to the log statements below.



##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GaaSOpenTelemetryMetricsConstants.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+public class GaaSOpenTelemetryMetricsConstants {
+
+  public static class DimensionKeys {
+    public static final String STATE = "state";
+    public static final String CURR_STATE = "currState";

Review Comment:
   what's the difference between `state` and `currState`?



##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.Map;
+
+import lombok.experimental.UtilityClass;
+import org.apache.commons.lang3.StringUtils;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+
+/**
+ * Utility class for OpenTelemetry related operations.
+ *
+ * <p>Provides methods to handle OpenTelemetry attributes, including merging multiple
+ * {@link Attributes} instances and converting maps to {@link Attributes}.
+ */
+@UtilityClass
+public class OpenTelemetryHelper {
+
+  private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "-";

Review Comment:
   should we use something like `UNKNOWN / NA`?



##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.lang.reflect.Method;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import com.google.common.base.Splitter;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * Provides OpenTelemetry instrumentation for metrics.
+ *
+ * <p>Maintains a singleton instance that holds common attributes {@link Attributes} and a Meter {@link Meter}.
+ * Exposes methods to retrieve or create metric instruments defined in {@link GaaSOpenTelemetryMetrics}.
+ */
+@Slf4j
+@Getter
+public class OpenTelemetryInstrumentation {
+
+  // Adding the gobblin-service.main (BaseFlowGraphHelper.FLOW_EDGE_LABEL_JOINER_CHAR) dependency is creating circular dependency issues
+  private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
+  private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
+  private static volatile OpenTelemetryInstrumentation GLOBAL_INSTANCE;
+
+  private final Attributes commonAttributes;
+  private final Meter meter;
+  private final ConcurrentHashMap<String, OpenTelemetryMetric> metrics;
+
+  private OpenTelemetryInstrumentation(final State state) {
+    this.commonAttributes = buildCommonAttributes(state);
+    this.meter = getOpenTelemetryMetrics(state).getMeter(state.getProp(
+        ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME,
+        ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME));
+    this.metrics = new ConcurrentHashMap<>();
+  }
+
+  private OpenTelemetryMetricsBase getOpenTelemetryMetrics(State state) {
+    try {
+      String openTelemetryClassName = state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CLASSNAME,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_CLASSNAME);
+      Class<?> metricsClass = Class.forName(openTelemetryClassName);
+      Method getInstanceMethod = metricsClass.getMethod("getInstance", State.class);
+      return (OpenTelemetryMetricsBase) getInstanceMethod.invoke(null, state);
+    } catch (Exception e) {
+      log.error("Failed to initialize OpenTelemetryMetrics through reflection, defaulting to direct instantiation of InMemoryOpenTelemetryMetrics", e);
+    }
+    return InMemoryOpenTelemetryMetrics.getInstance(state);
+  }
+
+  /**
+   * Returns the singleton instance for the given configuration state.
+   *
+   * @param state the configuration containing metric reporting and dimension configs
+   * @return the global {@link OpenTelemetryInstrumentation} instance
+   */
+  public static OpenTelemetryInstrumentation getInstance(final State state) {
+    if (GLOBAL_INSTANCE == null) {
+      synchronized (OpenTelemetryInstrumentation.class) {
+        if (GLOBAL_INSTANCE == null) {
+          log.info("Creating OpenTelemetryInstrumentation instance");
+          GLOBAL_INSTANCE = new OpenTelemetryInstrumentation(state);
+        }
+      }
+    }
+    return GLOBAL_INSTANCE;
+  }
+
+  public static OpenTelemetryInstrumentation getInstance(final Properties props) {
+    return getInstance(new State(props));
+  }
+
+  /**
+   * Retrieves an existing metric by its enum definition or creates it if absent.
+   *
+   * @param metric the {@link GaaSOpenTelemetryMetrics} enum defining name, description, unit, and type {@link OpenTelemetryMetricType}
+   * @return an {@link OpenTelemetryMetric} instance corresponding to the provided enum
+   */
+  public OpenTelemetryMetric getOrCreate(GaaSOpenTelemetryMetrics metric) {
+    return this.metrics.computeIfAbsent(metric.getMetricName(), name -> createMetric(metric));
+  }

Review Comment:
   The metrics map uses only metricName as the key. If two metrics share the same
name but differ in type (e.g., LONG_COUNTER vs DOUBLE_HISTOGRAM), the second
lookup silently returns the instrument cached for the first. It would be better
to use a composite key such as metricName + "_" + metricType to avoid collisions.
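
   A minimal sketch of the composite-key idea, using only the JDK. `MetricType`, `MetricKey`, and `MetricCache` are hypothetical stand-ins, not classes from the PR, and the cached value is a plain `Object` in place of `OpenTelemetryMetric`. It uses a small key class rather than string concatenation, which is one possible way to express the same key:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the PR's OpenTelemetryMetricType enum.
enum MetricType { LONG_COUNTER, DOUBLE_HISTOGRAM }

// Hypothetical composite cache key: two entries collide only if both name and type match.
final class MetricKey {
  private final String name;
  private final MetricType type;

  MetricKey(String name, MetricType type) {
    this.name = Objects.requireNonNull(name);
    this.type = Objects.requireNonNull(type);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof MetricKey)) {
      return false;
    }
    MetricKey other = (MetricKey) o;
    return name.equals(other.name) && type == other.type;
  }

  @Override
  public int hashCode() {
    return Objects.hash(name, type);
  }
}

final class MetricCache {
  private final Map<MetricKey, Object> metrics = new ConcurrentHashMap<>();

  // Object stands in for OpenTelemetryMetric; the String value only marks what was created.
  Object getOrCreate(String name, MetricType type) {
    return metrics.computeIfAbsent(new MetricKey(name, type), key -> type + ":" + name);
  }

  int size() {
    return metrics.size();
  }
}
```

   With this key, requesting the same name under two different types creates two cache entries instead of returning the first instrument for both.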



##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.Map;
+
+import lombok.experimental.UtilityClass;
+import org.apache.commons.lang3.StringUtils;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+
+/**
+ * Utility class for OpenTelemetry related operations.
+ *
+ * <p>Provides methods to handle OpenTelemetry attributes, including merging multiple
+ * {@link Attributes} instances and converting maps to {@link Attributes}.
+ */
+@UtilityClass
+public class OpenTelemetryHelper {
+
+  private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "-";
+
+  /**
+   * Returns the provided attribute value when it is non-null and non-empty;
+   * otherwise returns the default OpenTelemetry attribute placeholder.
+   *
+   * @param value candidate attribute value to check
+   * @return the original value if not empty, or DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE otherwise
+   */
+  public static String getOrDefaultOpenTelemetryAttrValue(String value) {
+    return StringUtils.isNotEmpty(value) ? value : DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE;
+  }
+
+  /**
+   * Merges multiple {@link Attributes} instances into a single {@link Attributes}.
+   *
+   * <p>Any {@code null} or empty ({@link Attributes#isEmpty()}) instances are ignored.
+   * The resulting {@link Attributes} contains all key-value pairs from the
+   * provided non-null, non-empty inputs in the order they are given.
+   *
+   * @param attributesArray array of {@link Attributes} to merge; may contain {@code null} or empty entries
+   * @return a new {@link Attributes} instance containing all entries from the non-null,
+   *         non-empty inputs; never {@code null}
+   */
+  public static Attributes mergeAttributes(Attributes... attributesArray) {
+    AttributesBuilder builder = Attributes.builder();
+    for (Attributes attrs : attributesArray) {
+      if (attrs != null && !attrs.isEmpty()) {
+        builder.putAll(attrs);

Review Comment:
   How are we planning to handle overwrites here? It seems we intend for values in
later attribute arguments to overwrite earlier values for the same key; please
add this to the javadoc.
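
   To make the intended semantics concrete, here is a small sketch that uses plain `java.util.Map` instead of OpenTelemetry `Attributes`. `AttributeMergeSketch` is a hypothetical name, and the later-wins behavior shown is that of `Map.putAll`; whether `AttributesBuilder.putAll` behaves the same way is an assumption that should be confirmed against the OpenTelemetry API before it is documented:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for mergeAttributes, with Map<String, String> in place of Attributes.
final class AttributeMergeSketch {

  /**
   * Merges the given maps from left to right. Null or empty inputs are skipped.
   * When the same key appears in more than one input, the value from the
   * later input replaces the value from the earlier one.
   */
  @SafeVarargs
  static Map<String, String> merge(Map<String, String>... maps) {
    Map<String, String> merged = new LinkedHashMap<>();
    for (Map<String, String> map : maps) {
      if (map != null && !map.isEmpty()) {
        merged.putAll(map); // later inputs overwrite earlier values for duplicate keys
      }
    }
    return merged;
  }
}
```

   The javadoc on `merge` is the kind of sentence the comment above asks for: it states explicitly which input wins when keys collide.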



##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GaaSOpenTelemetryMetrics.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+@Getter
+@AllArgsConstructor
+public enum GaaSOpenTelemetryMetrics {
+  GAAS_JOB_STATUS("gaas_job_status", "Gaas job status counter", "1", OpenTelemetryMetricType.LONG_COUNTER),
+  GAAS_JOB_STATE_LATENCY("gaas_job_state_latency", "Gaas job state latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM);

Review Comment:
   Add javadoc to each enum constant describing what the metric tracks. This would
help future contributors understand the usage without needing to trace where the
metric is emitted.



##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GaaSOpenTelemetryMetrics.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+@Getter
+@AllArgsConstructor
+public enum GaaSOpenTelemetryMetrics {

Review Comment:
   We can move the metric creation logic into the GaaSOpenTelemetryMetrics enum
itself instead of centralizing it in OpenTelemetryInstrumentation.createMetric.
   
   Currently, the enum acts as a metadata holder, while the actual
instantiation logic (via switch-case) lives outside it. This becomes harder to
extend if we introduce new metric types (e.g., gauges, timers). A more
extensible approach would be to let each enum constant hold a factory method
and expose a createMetric() method.
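
   A rough sketch of that shape, using only the JDK. `Metric`, `CounterMetric`, `HistogramMetric`, and `MetricDefinition` are hypothetical stand-ins for the PR's classes; in the real code the factory would presumably also need the `Meter` and base `Attributes`, which are omitted here to keep the example self-contained:

```java
import java.util.function.Function;

// Hypothetical stand-in for the PR's OpenTelemetryMetric interface.
interface Metric {
  String describe();
}

// Hypothetical stand-in for OpenTelemetryLongCounter.
final class CounterMetric implements Metric {
  private final String name;

  CounterMetric(String name) {
    this.name = name;
  }

  @Override
  public String describe() {
    return "counter:" + name;
  }
}

// Hypothetical stand-in for OpenTelemetryDoubleHistogram.
final class HistogramMetric implements Metric {
  private final String name;

  HistogramMetric(String name) {
    this.name = name;
  }

  @Override
  public String describe() {
    return "histogram:" + name;
  }
}

// Each constant carries its own factory, so adding a metric type needs no external switch.
enum MetricDefinition {
  GAAS_JOB_STATUS("gaas_job_status", CounterMetric::new),
  GAAS_JOB_STATE_LATENCY("gaas_job_state_latency", HistogramMetric::new);

  private final String metricName;
  private final Function<String, Metric> factory;

  MetricDefinition(String metricName, Function<String, Metric> factory) {
    this.metricName = metricName;
    this.factory = factory;
  }

  Metric createMetric() {
    return factory.apply(metricName);
  }
}
```

   A new metric type would then only require a new wrapper class and a constant that references its constructor.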



##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+
+
+/**
+ * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry {@link LongCounter}.
+ *
+ * <p>This class provides a counter for recording values.
+ * It supports adding values with optional additional attributes that can be merged with base attributes.</p>
+ *
+ */
+@Slf4j
+@AllArgsConstructor
+public class OpenTelemetryLongCounter implements OpenTelemetryMetric {
+  private String name;
+  private Attributes baseAttributes;
+  private LongCounter longCounter;
+
+  /**
+   * Adds the specified value to the counter with the base attributes.
+   *
+   * @param value the value to add to the counter
+   */
+  public void add(long value) {
+    log.info("Emitting long counter metric: {}, value: {}, attributes: {}", this.name, value, this.baseAttributes);

Review Comment:
   `log.debug`





Issue Time Tracking
-------------------

    Worklog Id:     (was: 975386)
    Time Spent: 3h  (was: 2h 50m)

> Emit GaaS Executor OTel Metrics
> -------------------------------
>
>                 Key: GOBBLIN-2209
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2209
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-metrics
>            Reporter: Vivek Rai
>            Assignee: Issac Buenrostro
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Emit GaaS Executor OTel Metrics



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
