This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 39a555ba94 KAFKA-13846: Use the new addMetricsIfAbsent API (#12287) 39a555ba94 is described below commit 39a555ba94a6a5d851b31e0a7f07e19c48327835 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Tue Jun 14 16:04:26 2022 -0700 KAFKA-13846: Use the new addMetricsIfAbsent API (#12287) Use the newly added function to replace the old addMetric function that may throw illegal argument exceptions. Although in some cases concurrency should not be possible they do not necessarily remain always true in the future, so it's better to use the new API just to be less error-prone. Reviewers: Bruno Cadonna <cado...@apache.org> --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 11 +++++------ .../java/org/apache/kafka/connect/runtime/WorkerTask.java | 5 ++--- .../processor/internals/metrics/StreamsMetricsImpl.java | 3 +-- .../processor/internals/metrics/StreamsMetricsImplTest.java | 2 +- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index fa7073cf0d..73ffd217ef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1866,12 +1866,11 @@ public class Fetcher<K, V> implements Closeable { for (TopicPartition tp : newAssignedPartitions) { if (!this.assignedPartitions.contains(tp)) { MetricName metricName = partitionPreferredReadReplicaMetricName(tp); - if (metrics.metric(metricName) == null) { - metrics.addMetric( - metricName, - (Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) - ); - } + metrics.addMetricIfAbsent( + metricName, + null, + (Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) + ); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 072e4b34a1..f7c819fb4a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Frequencies; @@ -377,10 +378,8 @@ abstract class WorkerTask implements Runnable { private void addRatioMetric(final State matchingState, MetricNameTemplate template) { MetricName metricName = metricGroup.metricName(template); - if (metricGroup.metrics().metric(metricName) == null) { - metricGroup.metrics().addMetric(metricName, (config, now) -> + metricGroup.metrics().addMetricIfAbsent(metricName, null, (Gauge<Double>) (config, now) -> taskStateTimer.durationRatio(matchingState, now)); - } } void close() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 4bfd96265b..cb77fc9bdd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -502,9 +502,8 @@ public class StreamsMetricsImpl implements StreamsMetrics { storeLevelTagMap(taskId, metricsScope, storeName) ); if (metrics.metric(metricName) == null) { - final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); + metrics.addMetricIfAbsent(metricName, new MetricConfig().recordLevel(recordingLevel), valueProvider); final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName); - metrics.addMetric(metricName, metricConfig, valueProvider); storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 5ec834a11f..176966f827 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -483,7 +483,7 @@ public class StreamsMetricsImplTest { expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) .andReturn(metricName); expect(metrics.metric(metricName)).andReturn(null); - metrics.addMetric(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER)); + expect(metrics.addMetricIfAbsent(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).andReturn(null); replay(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);