mjsax commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1374065471


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return

Review Comment:
   For my own education: Why do we need a `ConcurrentMap`?



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a given key, and return
+ * a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+    private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();
+
+    /**
+     * Return the last instant/value for the given MetricKey, or Optional.empty if there isn't one.
+     *
+     * @param metricKey the key for which to calculate a getAndSet.
+     * @param now the timestamp for the new value.
+     * @param value the current value.
+     * @return the timestamp of the previous entry and its value. If there
+     *     isn't a previous entry, then this method returns {@link Optional#empty()}
+     */
+    public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
+        InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+        AtomicReference<InstantAndValue<T>> valueOrNull = counters

Review Comment:
   Not sure why we need to wrap `instantAndValue` in an `AtomicReference`. Can you elaborate?
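
   To make the question concrete, here is a minimal sketch, not the PR's code. `SimpleLastValueTracker` is a hypothetical stand-in for `LastValueTracker`, a `String` key replaces `MetricKey`, and it assumes `InstantAndValue` is an immutable pair. Under those assumptions, `ConcurrentMap.put` already swaps the entry atomically and returns the previous one, so the extra `AtomicReference` may not be needed.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for LastValueTracker; String replaces MetricKey for brevity.
class SimpleLastValueTracker<T> {

    // Immutable timestamp/value pair (assumed shape of InstantAndValue).
    static final class InstantAndValue<T> {
        private final Instant intervalStart;
        private final T value;

        InstantAndValue(Instant intervalStart, T value) {
            this.intervalStart = intervalStart;
            this.value = value;
        }

        Instant getIntervalStart() {
            return intervalStart;
        }

        T getValue() {
            return value;
        }
    }

    // Values are stored directly, without an AtomicReference wrapper.
    private final ConcurrentMap<String, InstantAndValue<T>> counters = new ConcurrentHashMap<>();

    // put() atomically installs the new entry and returns the previous one, or null if none existed.
    Optional<InstantAndValue<T>> getAndSet(String key, Instant now, T value) {
        return Optional.ofNullable(counters.put(key, new InstantAndValue<>(now, value)));
    }
}
```

   If the wrapper exists for a reason this sketch misses (for example, sharing the reference outside the map), a short comment in the code would help.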



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * <ol>
+ *     <li>{@link Gauge}</li>
+ *     <li>{@link Measurable}</li>
+ * </ol>
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * <p>
+ *
+ * The Sum as it is a sample sum which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For eg: org.apache.kafka.common.network.Selector has Meter metric 
for "connection-close" but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with a array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurables is converted to a GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * Percentiles work the same way as Frequencies. The only difference is that 
it is composed of Percentile
+ * types instead. So, we should treat the component measurable as GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * Some metrics are defined as either anonymous inner classes or lambdas 
implementing the Measurable
+ * interface. As we do not have any information on how to treat them, we 
should fallback to treating
+ * them as GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * KafkaExporter -> OpenTelemetry mapping for measurables
+ * Avg / Rate / Min / Max / Total / Sum -> Gauge
+ * Count -> Sum
+ * Meter has 2 elements :
+ *  Total -> Sum
+ *  Rate -> Gauge
+ * Frequencies -> each component is Gauge
+ * Percentiles -> each component is Gauge
+ */
+public class KafkaMetricsCollector implements MetricsCollector {
+
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaMetricsCollector.class);
+
+    private final StateLedger ledger;
+    private final Clock clock;
+    private final MetricNamingStrategy<MetricName> metricNamingStrategy;
+
+    private static final Field METRIC_VALUE_PROVIDER_FIELD;
+
+    static {
+        try {
+            METRIC_VALUE_PROVIDER_FIELD = 
KafkaMetric.class.getDeclaredField("metricValueProvider");
+            METRIC_VALUE_PROVIDER_FIELD.setAccessible(true);
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public KafkaMetricsCollector(MetricNamingStrategy<MetricName> 
metricNamingStrategy) {
+        this(metricNamingStrategy, Clock.systemUTC());
+    }
+
+    // Visible for testing
+    public KafkaMetricsCollector(MetricNamingStrategy<MetricName> 
metricNamingStrategy, Clock clock) {
+        this.metricNamingStrategy = metricNamingStrategy;
+        this.clock = clock;
+        this.ledger = new StateLedger();
+    }
+
+    public void init(List<KafkaMetric> metrics) {
+        ledger.init(metrics);
+    }
+
+    /**
+     * This is called whenever a metric is updated or added
+     */
+    public void metricChange(KafkaMetric metric) {
+        ledger.metricChange(metric);
+    }
+
+    /**
+     * This is called whenever a metric is removed
+     */
+    public void metricRemoval(KafkaMetric metric) {
+        ledger.metricRemoval(metric);
+    }
+
+    // Visible for testing
+    Set<MetricKey> getTrackedMetrics() {
+        return ledger.metricMap.keySet();
+    }
+
+    @Override
+    public void collect(MetricsEmitter metricsEmitter) {
+        for (Map.Entry<MetricKey, KafkaMetric> entry : ledger.getMetrics()) {
+            MetricKey metricKey = entry.getKey();
+            KafkaMetric metric = entry.getValue();
+
+            try {
+                collectMetric(metricsEmitter, metricKey, metric);
+            } catch (Exception e) {
+                // catch and log to continue processing remaining metrics
+                log.error("Unexpected error processing Kafka metric {}", 
metricKey, e);
+            }
+        }
+    }
+
+    protected void collectMetric(MetricsEmitter metricsEmitter, MetricKey 
metricKey, KafkaMetric metric) {
+        Object metricValue;
+
+        try {
+            metricValue = metric.metricValue();
+        } catch (Exception e) {
+            // If an exception occurs when retrieving value, log warning and continue to process the rest of metrics
+            log.warn("Failed to retrieve metric value {}", metricKey.getName(), e);
+            return;
+        }
+
+        if (isMeasurable(metric)) {
+            Measurable measurable = metric.measurable();
+            Double value = (Double) metricValue;
+
+            if (measurable instanceof WindowedCount || measurable instanceof 
CumulativeSum) {
+                collectSum(metricKey, value, metricsEmitter);
+                collectDelta(metricKey, value, metricsEmitter);
+            } else {
+                collectGauge(metricKey, value, metricsEmitter);
+            }
+        } else {
+            // It is non-measurable Gauge metric.
+            // Collect the metric only if its value is a number.
+            if (metricValue instanceof Number) {
+                Number value = (Number) metricValue;
+                collectGauge(metricKey, value, metricsEmitter);
+            } else {
+                // skip non-measurable metrics
+                log.debug("Skipping non-measurable gauge metric {}", 
metricKey.getName());
+            }
+        }
+    }
+
+    private void collectDelta(MetricKey originalKey, Double value, 
MetricsEmitter metricsEmitter) {
+        MetricKey metricKey = 
metricNamingStrategy.derivedMetricKey(originalKey, "delta");
+        if (!metricsEmitter.shouldEmitMetric(metricKey)) {
+            return;
+        }
+
+        // calculate a getAndSet, and add to out if non-empty
+        final Instant timestamp = clock.instant();
+        InstantAndValue<Double> instantAndValue = ledger.delta(originalKey, 
timestamp, value);
+
+        metricsEmitter.emitMetric(
+            SinglePointMetric.deltaSum(metricKey, instantAndValue.getValue(), 
true, timestamp,
+                instantAndValue.getIntervalStart())
+        );
+    }
+
+    private void collectSum(MetricKey metricKey, double value, MetricsEmitter 
metricsEmitter) {
+        if (!metricsEmitter.shouldEmitMetric(metricKey)) {
+            return;
+        }
+
+        metricsEmitter.emitMetric(
+            SinglePointMetric.sum(metricKey, value, true, clock.instant(), 
ledger.instantAdded(metricKey))
+        );
+    }
+
+    private void collectGauge(MetricKey metricKey, Number value, 
MetricsEmitter metricsEmitter) {
+        if (!metricsEmitter.shouldEmitMetric(metricKey)) {
+            return;
+        }
+
+        metricsEmitter.emitMetric(
+            SinglePointMetric.gauge(metricKey, value, clock.instant())
+        );
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getCanonicalName();
+    }
+
+    private static boolean isMeasurable(KafkaMetric metric) {
+        // KafkaMetric does not expose the internal MetricValueProvider and throws an IllegalStateException exception
+        // if .measurable() is called for a Gauge.
+        // There are 2 ways to find the type of internal MetricValueProvider for a KafkaMetric - use reflection or
+        // get the information based on whether or not a IllegalStateException exception is thrown.
+        // We use reflection so that we can avoid the cost of generating the stack trace when it's
+        // not a measurable.
+        try {
+            Object provider = METRIC_VALUE_PROVIDER_FIELD.get(metric);
+            return provider instanceof Measurable;
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Keeps track of the state of metrics, e.g. when they were added, what 
their getAndSet value is,
+     * and clearing them out when they're removed.
+     */
+    private class StateLedger {
+
+        private final Map<MetricKey, KafkaMetric> metricMap = new ConcurrentHashMap<>();
+        private final LastValueTracker<Double> doubleDeltas = new LastValueTracker<>();
+        private final ConcurrentMap<MetricKey, Instant> metricAdded = new ConcurrentHashMap<>();

Review Comment:
   Why do we need a `ConcurrentMap`, and why do we declare the type as `ConcurrentMap` and not just as `Map` (similar to `metricMap`)?
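
   As a rough illustration of the second half of the question (a sketch with hypothetical names, not the PR's code; `DeclaredTypeSketch`, `recordAdded`, and the `String` key are assumptions): a field declared as `Map` but backed by a `ConcurrentHashMap` keeps the same thread-safety, because `putIfAbsent` is part of the `Map` interface since Java 8 and `ConcurrentHashMap` implements it atomically.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; String replaces MetricKey for brevity.
class DeclaredTypeSketch {

    // Declared as Map; the thread-safety comes from the ConcurrentHashMap implementation.
    private final Map<String, Instant> metricAdded = new ConcurrentHashMap<>();

    // Records the first time a key is seen and returns that first instant on every call.
    Instant recordAdded(String key, Instant now) {
        Instant previous = metricAdded.putIfAbsent(key, now);
        return previous != null ? previous : now;
    }
}
```

   Declaring the field as `ConcurrentMap` does document the atomicity guarantee in the type, so either choice seems fine as long as both fields are consistent.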



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
+ * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+    private final MetricKey key;
+    private final Metric.Builder metricBuilder;
+
+    private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+        this.key = key;
+        this.metricBuilder = metricBuilder;
+    }
+
+    @Override
+    public MetricKey key() {
+        return key;
+    }
+
+    public Metric.Builder builder() {
+        return metricBuilder;
+    }
+
+    public static SinglePointMetric create(MetricKey metricKey, Metric.Builder 
metric) {
+        return new SinglePointMetric(metricKey, metric);
+    }
+
+    /*
+        Methods to construct gauge metric type.
+     */
+    public static SinglePointMetric gauge(MetricKey metricKey, Number value, 
Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    public static SinglePointMetric gauge(MetricKey metricKey, double value, 
Instant timestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        return gauge(metricKey, point);
+    }
+
+    /*
+        Methods to construct sum metric type.
+     */
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp) {
+        return sum(metricKey, value, monotonic, timestamp, null);
+    }
+
+    public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp,
+        Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value);
+        if (startTimestamp != null) {
+            point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+        }
+
+        return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+    }
+
+    public static SinglePointMetric deltaSum(MetricKey metricKey, double 
value, boolean monotonic,
+        Instant timestamp, Instant startTimestamp) {
+        NumberDataPoint.Builder point = point(timestamp, value)
+            .setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+
+        return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
+    }
+
+    /*
+        Helper methods to support metric construction.
+     */
+    private static SinglePointMetric sum(MetricKey metricKey, 
AggregationTemporality aggregationTemporality,
+        boolean monotonic, NumberDataPoint.Builder point) {
+        Objects.requireNonNull(point, "metric number data point cannot be 
null");
+        Objects.requireNonNull(metricKey, "metric key cannot be null");
+
+        point.addAllAttributes(asAttributes(metricKey.tags()));
+
+        Metric.Builder metric = 
Metric.newBuilder().setName(metricKey.getName());
+        metric
+            .getSumBuilder()
+            .setAggregationTemporality(aggregationTemporality)
+            .setIsMonotonic(monotonic)
+            .addDataPoints(point);
+        return create(metricKey, metric);
+    }
+
+    private static SinglePointMetric gauge(MetricKey metricKey, NumberDataPoint.Builder point) {
+        Objects.requireNonNull(point, "metric number data point cannot be null");

Review Comment:
   Do we need this check?



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
+ * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+    private final MetricKey key;
+    private final Metric.Builder metricBuilder;
+
+    private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+        this.key = key;
+        this.metricBuilder = metricBuilder;
+    }
+
+    @Override
+    public MetricKey key() {
+        return key;
+    }
+
+    public Metric.Builder builder() {
+        return metricBuilder;
+    }
+
+    public static SinglePointMetric create(MetricKey metricKey, Metric.Builder metric) {

Review Comment:
   Why do we add a `create` method? There seems to be no validation happening (i.e., no additional code), so it seems easier to just make the constructor public.



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * <ol>
+ *     <li>{@link Gauge}</li>
+ *     <li>{@link Measurable}</li>
+ * </ol>
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * <p>
+ *
+ * The Sum as it is a sample sum which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For eg: org.apache.kafka.common.network.Selector has Meter metric 
for "connection-close" but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with a array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurables is converted to a GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * Percentiles work the same way as Frequencies. The only difference is that 
it is composed of Percentile
+ * types instead. So, we should treat the component measurable as GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * Some metrics are defined as either anonymous inner classes or lambdas 
implementing the Measurable
+ * interface. As we do not have any information on how to treat them, we 
should fallback to treating
+ * them as GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * KafkaExporter -> OpenTelemetry mapping for measurables
+ * Avg / Rate / Min / Max / Total / Sum -> Gauge
+ * Count -> Sum
+ * Meter has 2 elements :
+ *  Total -> Sum
+ *  Rate -> Gauge
+ * Frequencies -> each component is Gauge
+ * Percentiles -> each component is Gauge
+ */
+public class KafkaMetricsCollector implements MetricsCollector {
+
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaMetricsCollector.class);
+
+    private final StateLedger ledger;
+    private final Clock clock;
+    private final MetricNamingStrategy<MetricName> metricNamingStrategy;
+
+    private static final Field METRIC_VALUE_PROVIDER_FIELD;
+
+    static {
+        try {
+            METRIC_VALUE_PROVIDER_FIELD = 
KafkaMetric.class.getDeclaredField("metricValueProvider");
+            METRIC_VALUE_PROVIDER_FIELD.setAccessible(true);
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public KafkaMetricsCollector(MetricNamingStrategy<MetricName> 
metricNamingStrategy) {
+        this(metricNamingStrategy, Clock.systemUTC());
+    }
+
+    // Visible for testing
+    public KafkaMetricsCollector(MetricNamingStrategy<MetricName> 
metricNamingStrategy, Clock clock) {
+        this.metricNamingStrategy = metricNamingStrategy;
+        this.clock = clock;
+        this.ledger = new StateLedger();
+    }
+
+    public void init(List<KafkaMetric> metrics) {
+        ledger.init(metrics);
+    }
+
+    /**
+     * This is called whenever a metric is updated or added
+     */
+    public void metricChange(KafkaMetric metric) {
+        ledger.metricChange(metric);
+    }
+
+    /**
+     * This is called whenever a metric is removed
+     */
+    public void metricRemoval(KafkaMetric metric) {
+        ledger.metricRemoval(metric);
+    }
+
+    // Visible for testing
+    Set<MetricKey> getTrackedMetrics() {
+        return ledger.metricMap.keySet();
+    }
+
+    @Override
+    public void collect(MetricsEmitter metricsEmitter) {
+        for (Map.Entry<MetricKey, KafkaMetric> entry : ledger.getMetrics()) {
+            MetricKey metricKey = entry.getKey();
+            KafkaMetric metric = entry.getValue();
+
+            try {
+                collectMetric(metricsEmitter, metricKey, metric);
+            } catch (Exception e) {
+                // catch and log to continue processing remaining metrics
+                log.error("Unexpected error processing Kafka metric {}", 
metricKey, e);
+            }
+        }
+    }
+
+    protected void collectMetric(MetricsEmitter metricsEmitter, MetricKey 
metricKey, KafkaMetric metric) {
+        Object metricValue;
+
+        try {
+            metricValue = metric.metricValue();
+        } catch (Exception e) {
+            // If an exception occurs when retrieving value, log warning and continue to process the rest of metrics
+            log.warn("Failed to retrieve metric value {}", metricKey.getName(), e);
+            return;
+        }
+
+        if (isMeasurable(metric)) {
+            Measurable measurable = metric.measurable();
+            Double value = (Double) metricValue;
+
+            if (measurable instanceof WindowedCount || measurable instanceof CumulativeSum) {
+                collectSum(metricKey, value, metricsEmitter);
+                collectDelta(metricKey, value, metricsEmitter);
+            } else {
+                collectGauge(metricKey, value, metricsEmitter);
+            }
+        } else {
+            // It is non-measurable Gauge metric.
+            // Collect the metric only if its value is a number.
+            if (metricValue instanceof Number) {
+                Number value = (Number) metricValue;
+                collectGauge(metricKey, value, metricsEmitter);
+            } else {
+                // skip non-measurable metrics
+                log.debug("Skipping non-measurable gauge metric {}", metricKey.getName());
+            }
+        }
+    }
+
+    private void collectDelta(MetricKey originalKey, Double value, MetricsEmitter metricsEmitter) {
+        MetricKey metricKey = metricNamingStrategy.derivedMetricKey(originalKey, "delta");
+        if (!metricsEmitter.shouldEmitMetric(metricKey)) {
+            return;
+        }
+
+        // calculate a getAndSet, and add to out if non-empty
+        final Instant timestamp = clock.instant();
+        InstantAndValue<Double> instantAndValue = ledger.delta(originalKey, timestamp, value);
+
+        metricsEmitter.emitMetric(
+            SinglePointMetric.deltaSum(metricKey, instantAndValue.getValue(), true, timestamp,
+                instantAndValue.getIntervalStart())
+        );
+    }
+
+    private void collectSum(MetricKey metricKey, double value, MetricsEmitter metricsEmitter) {
+        if (!metricsEmitter.shouldEmitMetric(metricKey)) {
+            return;
+        }
+
+        metricsEmitter.emitMetric(
+            SinglePointMetric.sum(metricKey, value, true, clock.instant(), ledger.instantAdded(metricKey))
+        );
+    }
+
+    private void collectGauge(MetricKey metricKey, Number value, MetricsEmitter metricsEmitter) {
+        if (!metricsEmitter.shouldEmitMetric(metricKey)) {
+            return;
+        }
+
+        metricsEmitter.emitMetric(
+            SinglePointMetric.gauge(metricKey, value, clock.instant())
+        );
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getCanonicalName();

Review Comment:
   Any particular reason why we override `toString()`? It does not seem to expose
any internal information, so I'm not sure what the purpose is.
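
   To illustrate the question, here is a minimal sketch comparing the inherited `Object.toString()` with an override of the same shape as the one in the diff. The two collector classes are hypothetical stand-ins, not part of the PR.

```java
// Hypothetical stand-ins, used only to compare the two toString() behaviours.
class DefaultToStringCollector {
    // Inherits Object.toString(): getClass().getName() + "@" + hex hash code.
}

class OverridingCollector {
    @Override
    public String toString() {
        // Same shape as the override in the diff: class name only, no instance state.
        return this.getClass().getCanonicalName();
    }
}

class ToStringDemo {
    public static void main(String[] args) {
        // Prints the class name followed by '@' and a hash code.
        System.out.println(new DefaultToStringCollector());
        // Prints only the class name; two instances are indistinguishable in logs.
        System.out.println(new OverridingCollector());
    }
}
```

   The override only drops the `@hash` suffix; it does not add any information about the collector's state.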



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ol>
+ *     <li>{@link Gauge}</li>
+ *     <li>{@link Measurable}</li>
+ * </ol>
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * <p>
+ *
+ * The Sum as it is a sample sum which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter metrics: a rate and a
+ * count. For eg: org.apache.kafka.common.network.Selector has Meter metric for "connection-close" but it
+ * has to be created with a "connection-close-rate" metric of type rate and a "connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with a array of Frequency objects. When a Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the
+ * compound type, each component measurables is converted to a GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * Percentiles work the same way as Frequencies. The only difference is that it is composed of Percentile
+ * types instead. So, we should treat the component measurable as GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * Some metrics are defined as either anonymous inner classes or lambdas implementing the Measurable
+ * interface. As we do not have any information on how to treat them, we should fallback to treating
+ * them as GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * KafkaExporter -> OpenTelemetry mapping for measurables
+ * Avg / Rate / Min / Max / Total / Sum -> Gauge
+ * Count -> Sum
+ * Meter has 2 elements :
+ *  Total -> Sum
+ *  Rate -> Gauge
+ * Frequencies -> each component is Gauge
+ * Percentiles -> each component is Gauge
+ */
+public class KafkaMetricsCollector implements MetricsCollector {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaMetricsCollector.class);
+
+    private final StateLedger ledger;
+    private final Clock clock;
+    private final MetricNamingStrategy<MetricName> metricNamingStrategy;
+
+    private static final Field METRIC_VALUE_PROVIDER_FIELD;
+
+    static {
+        try {
+            METRIC_VALUE_PROVIDER_FIELD = KafkaMetric.class.getDeclaredField("metricValueProvider");
+            METRIC_VALUE_PROVIDER_FIELD.setAccessible(true);

Review Comment:
   Not a fan of reflection (it usually indicates that the structure of the code is
not well set up). Why do we need to do this? Can't we find a better way?
   
   I don't see a strict reason why `KafkaMetric` must be `final`, so why not
relax this, make `metricValueProvider` protected, and add a new class
`KafkaMetricsInternal` with a method `isMeasurable(KafkaMetric metric)`?
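
   A rough sketch of what that alternative could look like, under stated assumptions: every type below is a simplified, hypothetical stand-in (`SketchMetric` for `KafkaMetric`, `SketchMetricsInternal` for the proposed `KafkaMetricsInternal`), and the provider interfaces are reduced to one method each rather than mirroring the real Kafka signatures.

```java
// Hypothetical marker interface standing in for MetricValueProvider.
interface ValueProvider<T> { }

// Hypothetical stand-in for Measurable: always yields a double.
interface MeasurableProvider extends ValueProvider<Double> {
    double measure(long nowMs);
}

// Hypothetical stand-in for Gauge: may yield any value type.
interface GaugeProvider<T> extends ValueProvider<T> {
    T value(long nowMs);
}

// Stand-in for KafkaMetric. Assumption: the class is no longer final and the
// provider field is protected instead of private.
class SketchMetric {
    protected final ValueProvider<?> metricValueProvider;

    SketchMetric(ValueProvider<?> metricValueProvider) {
        this.metricValueProvider = metricValueProvider;
    }
}

// Stand-in for the proposed KafkaMetricsInternal helper. Assumption: it lives in
// the same package as the metric class, so the protected field is reachable
// without reflection. From a different package, Java would only allow protected
// access through a reference of the subclass type.
final class SketchMetricsInternal {
    private SketchMetricsInternal() { }

    static boolean isMeasurable(SketchMetric metric) {
        return metric.metricValueProvider instanceof MeasurableProvider;
    }
}

class IsMeasurableDemo {
    public static void main(String[] args) {
        SketchMetric measurable = new SketchMetric((MeasurableProvider) nowMs -> 1.0);
        SketchMetric gauge = new SketchMetric((GaugeProvider<String>) nowMs -> "running");

        System.out.println(SketchMetricsInternal.isMeasurable(measurable));
        System.out.println(SketchMetricsInternal.isMeasurable(gauge));
    }
}
```

   The point of the sketch is only that an `instanceof` check on an accessible field replaces the `getDeclaredField` / `setAccessible(true)` lookup; package placement would still need to be worked out for the real classes.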




