khandelwal-prateek commented on code in PR #4118: URL: https://github.com/apache/gobblin/pull/4118#discussion_r2218254413
########## gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.opentelemetry; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; + + +/** + * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry {@link DoubleHistogram}. + * + * <p>This class provides a histogram for recording the distribution of double values. + * It supports recording values with optional additional attributes that can be merged with base attributes.</p> + * + */ +@Slf4j +@AllArgsConstructor +public class OpenTelemetryDoubleHistogram implements OpenTelemetryMetric { + private String name; + private Attributes baseAttributes; + private DoubleHistogram doubleHistogram; + + /** + * Records the specified value in the histogram with the base attributes. 
+ * + * @param value the double value to record in the histogram + */ + public void record(double value) { + log.info("Emitting double histogram metric: {}, value: {}, attributes: {}", this.name, value, this.baseAttributes); Review Comment: We can change this to `log.debug` to avoid log noise during high-volume metric emission. The same applies to the other log statements below. ########## gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GaaSOpenTelemetryMetricsConstants.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.opentelemetry; + +public class GaaSOpenTelemetryMetricsConstants { + + public static class DimensionKeys { + public static final String STATE = "state"; + public static final String CURR_STATE = "currState"; Review Comment: What's the difference between `state` and `currState`? ########## gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.opentelemetry; + +import java.util.Map; + +import lombok.experimental.UtilityClass; +import org.apache.commons.lang3.StringUtils; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; + +/** + * Utility class for OpenTelemetry related operations. + * + * <p>Provides methods to handle OpenTelemetry attributes, including merging multiple + * {@link Attributes} instances and converting maps to {@link Attributes}. + */ +@UtilityClass +public class OpenTelemetryHelper { + + private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "-"; Review Comment: should we use something like `UNKNOWN / NA`? ########## gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.opentelemetry; + +import java.lang.reflect.Method; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import com.google.common.base.Splitter; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.service.ServiceConfigKeys; + + +/** + * Provides OpenTelemetry instrumentation for metrics. + * + * <p>Maintains a singleton instance that holds common attributes {@link Attributes} and a Meter {@link Meter}. + * Exposes methods to retrieve or create metric instruments defined in {@link GaaSOpenTelemetryMetrics}. 
+ */ +@Slf4j +@Getter +public class OpenTelemetryInstrumentation { + + // Adding the gobblin-service.main (BaseFlowGraphHelper.FLOW_EDGE_LABEL_JOINER_CHAR) dependency is creating circular dependency issues + private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_"; + private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults(); + private static volatile OpenTelemetryInstrumentation GLOBAL_INSTANCE; + + private final Attributes commonAttributes; + private final Meter meter; + private final ConcurrentHashMap<String, OpenTelemetryMetric> metrics; + + private OpenTelemetryInstrumentation(final State state) { + this.commonAttributes = buildCommonAttributes(state); + this.meter = getOpenTelemetryMetrics(state).getMeter(state.getProp( + ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME)); + this.metrics = new ConcurrentHashMap<>(); + } + + private OpenTelemetryMetricsBase getOpenTelemetryMetrics(State state) { + try { + String openTelemetryClassName = state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CLASSNAME, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_CLASSNAME); + Class<?> metricsClass = Class.forName(openTelemetryClassName); + Method getInstanceMethod = metricsClass.getMethod("getInstance", State.class); + return (OpenTelemetryMetricsBase) getInstanceMethod.invoke(null, state); + } catch (Exception e) { + log.error("Failed to initialize OpenTelemetryMetrics through reflection, defaulting to direct instantiation of InMemoryOpenTelemetryMetrics", e); + } + return InMemoryOpenTelemetryMetrics.getInstance(state); + } + + /** + * Returns the singleton instance for the given configuration state. 
+ * + * @param state the configuration containing metric reporting and dimension configs + * @return the global {@link OpenTelemetryInstrumentation} instance + */ + public static OpenTelemetryInstrumentation getInstance(final State state) { + if (GLOBAL_INSTANCE == null) { + synchronized (OpenTelemetryInstrumentation.class) { + if (GLOBAL_INSTANCE == null) { + log.info("Creating OpenTelemetryInstrumentation instance"); + GLOBAL_INSTANCE = new OpenTelemetryInstrumentation(state); + } + } + } + return GLOBAL_INSTANCE; + } + + public static OpenTelemetryInstrumentation getInstance(final Properties props) { + return getInstance(new State(props)); + } + + /** + * Retrieves an existing metric by its enum definition or creates it if absent. + * + * @param metric the {@link GaaSOpenTelemetryMetrics} enum defining name, description, unit, and type {@link OpenTelemetryMetricType} + * @return an {@link OpenTelemetryMetric} instance corresponding to the provided enum + */ + public OpenTelemetryMetric getOrCreate(GaaSOpenTelemetryMetrics metric) { + return this.metrics.computeIfAbsent(metric.getMetricName(), name -> createMetric(metric)); + } Review Comment: the metrics map uses only metricName as key. If two metrics share the same name but differ in type (eg, LONG_COUNTER vs DOUBLE_HISTOGRAM), this can silently cause incorrect caching behavior. It would be better to use a composite key like metricName + "_" + metricType to avoid collisions ########## gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.opentelemetry; + +import java.util.Map; + +import lombok.experimental.UtilityClass; +import org.apache.commons.lang3.StringUtils; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; + +/** + * Utility class for OpenTelemetry related operations. + * + * <p>Provides methods to handle OpenTelemetry attributes, including merging multiple + * {@link Attributes} instances and converting maps to {@link Attributes}. + */ +@UtilityClass +public class OpenTelemetryHelper { + + private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "-"; + + /** + * Returns the provided attribute value when it is non-null and non-empty; + * otherwise returns the default OpenTelemetry attribute placeholder. + * + * @param value candidate attribute value to check + * @return the original value if not empty, or DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE otherwise + */ + public static String getOrDefaultOpenTelemetryAttrValue(String value) { + return StringUtils.isNotEmpty(value) ? value : DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE; + } + + /** + * Merges multiple {@link Attributes} instances into a single {@link Attributes}. + * + * <p>Any {@code null} or empty ({@link Attributes#isEmpty()}) instances are ignored. + * The resulting {@link Attributes} contains all key-value pairs from the + * provided non-null, non-empty inputs in the order they are given. 
+ * + * @param attributesArray array of {@link Attributes} to merge; may contain {@code null} or empty entries + * @return a new {@link Attributes} instance containing all entries from the non-null, + * non-empty inputs; never {@code null} + */ + public static Attributes mergeAttributes(Attributes... attributesArray) { + AttributesBuilder builder = Attributes.builder(); + for (Attributes attrs : attributesArray) { + if (attrs != null && !attrs.isEmpty()) { + builder.putAll(attrs); Review Comment: How are we planning to handle overwrites here? It seems that we intend for a later attribute array to overwrite the earlier value for the same key; please add this to the javadoc. ########## gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GaaSOpenTelemetryMetrics.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.opentelemetry; + +import lombok.AllArgsConstructor; +import lombok.Getter; + + +@Getter +@AllArgsConstructor +public enum GaaSOpenTelemetryMetrics { + GAAS_JOB_STATUS("gaas_job_status", "Gaas job status counter", "1", OpenTelemetryMetricType.LONG_COUNTER), + GAAS_JOB_STATE_LATENCY("gaas_job_state_latency", "Gaas job state latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM); Review Comment: add javadoc to each enum wrt what the metric tracks.. this would help future contributors understand the usage without needing to trace emission ########## gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GaaSOpenTelemetryMetrics.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.opentelemetry; + +import lombok.AllArgsConstructor; +import lombok.Getter; + + +@Getter +@AllArgsConstructor +public enum GaaSOpenTelemetryMetrics { Review Comment: We can move the metric creation logic into the GaaSOpenTelemetryMetrics enum itself instead of centralizing it in OpenTelemetryInstrumentation.createMetric. 
Currently, the enum acts as a metadata holder, while the actual instantiation logic (via switch-case) is external and hence scattered. This becomes harder to extend if we introduce new metric types (e.g. gauges, timers, etc.). A more extensible approach would be to let each enum constant hold a factory method and expose a createMetric() method. ########## gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.opentelemetry; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; + + +/** + * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry {@link LongCounter}. + * + * <p>This class provides a counter for recording values. 
+ * It supports adding values with optional additional attributes that can be merged with base attributes.</p> + * + */ +@Slf4j +@AllArgsConstructor +public class OpenTelemetryLongCounter implements OpenTelemetryMetric { + private String name; + private Attributes baseAttributes; + private LongCounter longCounter; + + /** + * Adds the specified value to the counter with the base attributes. + * + * @param value the value to add to the counter + */ + public void add(long value) { + log.info("Emitting long counter metric: {}, value: {}, attributes: {}", this.name, value, this.baseAttributes); Review Comment: Please use `log.debug` here instead of `log.info`, to avoid log noise during high-volume metric emission. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org