This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 39a555ba94 KAFKA-13846: Use the new addMetricsIfAbsent API (#12287)
39a555ba94 is described below

commit 39a555ba94a6a5d851b31e0a7f07e19c48327835
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Tue Jun 14 16:04:26 2022 -0700

    KAFKA-13846: Use the new addMetricsIfAbsent API (#12287)
    
    Use the newly added function to replace the old addMetric function that may 
throw illegal argument exceptions.
    
    Although in some cases concurrency should not be possible they do not 
necessarily remain always true in the future, so it's better to use the new API 
just to be less error-prone.
    
    Reviewers: Bruno Cadonna <cado...@apache.org>
---
 .../org/apache/kafka/clients/consumer/internals/Fetcher.java  | 11 +++++------
 .../java/org/apache/kafka/connect/runtime/WorkerTask.java     |  5 ++---
 .../processor/internals/metrics/StreamsMetricsImpl.java       |  3 +--
 .../processor/internals/metrics/StreamsMetricsImplTest.java   |  2 +-
 4 files changed, 9 insertions(+), 12 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index fa7073cf0d..73ffd217ef 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1866,12 +1866,11 @@ public class Fetcher<K, V> implements Closeable {
                 for (TopicPartition tp : newAssignedPartitions) {
                     if (!this.assignedPartitions.contains(tp)) {
                         MetricName metricName = 
partitionPreferredReadReplicaMetricName(tp);
-                        if (metrics.metric(metricName) == null) {
-                            metrics.addMetric(
-                                metricName,
-                                (Gauge<Integer>) (config, now) -> 
subscription.preferredReadReplica(tp, 0L).orElse(-1)
-                            );
-                        }
+                        metrics.addMetricIfAbsent(
+                            metricName,
+                            null,
+                            (Gauge<Integer>) (config, now) -> 
subscription.preferredReadReplica(tp, 0L).orElse(-1)
+                        );
                     }
                 }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 072e4b34a1..f7c819fb4a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Frequencies;
@@ -377,10 +378,8 @@ abstract class WorkerTask implements Runnable {
 
         private void addRatioMetric(final State matchingState, 
MetricNameTemplate template) {
             MetricName metricName = metricGroup.metricName(template);
-            if (metricGroup.metrics().metric(metricName) == null) {
-                metricGroup.metrics().addMetric(metricName, (config, now) ->
+            metricGroup.metrics().addMetricIfAbsent(metricName, null, 
(Gauge<Double>) (config, now) ->
                     taskStateTimer.durationRatio(matchingState, now));
-            }
         }
 
         void close() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 4bfd96265b..cb77fc9bdd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -502,9 +502,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
             storeLevelTagMap(taskId, metricsScope, storeName)
         );
         if (metrics.metric(metricName) == null) {
-            final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
+            metrics.addMetricIfAbsent(metricName, new 
MetricConfig().recordLevel(recordingLevel), valueProvider);
             final String key = 
storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
-            metrics.addMetric(metricName, metricConfig, valueProvider);
             storeLevelMetrics.computeIfAbsent(key, ignored -> new 
LinkedList<>()).push(metricName);
         }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 5ec834a11f..176966f827 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -483,7 +483,7 @@ public class StreamsMetricsImplTest {
         expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP))
             .andReturn(metricName);
         expect(metrics.metric(metricName)).andReturn(null);
-        metrics.addMetric(eq(metricName), eqMetricConfig(metricConfig), 
eq(VALUE_PROVIDER));
+        expect(metrics.addMetricIfAbsent(eq(metricName), 
eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).andReturn(null);
         replay(metrics);
         final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
 

Reply via email to