This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5cab11cf52 KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)
5cab11cf52 is described below

commit 5cab11cf525f6c06fcf9eb43f7f95ef33fe1cdbb
Author: vamossagar12 <sagarmeansoc...@gmail.com>
AuthorDate: Mon Jun 13 23:06:39 2022 +0530

    KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)
    
    Reviewers: David Jacot <dja...@confluent.io>, Justine Olshan <jols...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../org/apache/kafka/common/metrics/Metrics.java   | 40 +++++++++++++++++--
 .../org/apache/kafka/common/metrics/Sensor.java    | 10 ++++-
 .../kafka/connect/runtime/ConnectMetrics.java      |  8 +---
 .../internals/metrics/StreamsMetricsImplTest.java  | 46 ++++++++++++++++++++++
 4 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 52b7794a4c..398819016c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -509,7 +509,10 @@ public class Metrics implements Closeable {
                                         Objects.requireNonNull(metricValueProvider),
                                         config == null ? this.config : config,
                                         time);
-        registerMetric(m);
+        KafkaMetric existingMetric = registerMetric(m);
+        if (existingMetric != null) {
+            throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
+        }
     }
 
     /**
@@ -524,6 +527,26 @@ public class Metrics implements Closeable {
         addMetric(metricName, null, metricValueProvider);
     }
 
+    /**
+     * Create or get an existing metric to monitor an object that implements MetricValueProvider.
+     * This metric won't be associated with any sensor. This is a way to expose existing values as metrics.
+     * This method takes care of synchronisation while updating/accessing metrics by concurrent threads.
+     *
+     * @param metricName The name of the metric
+     * @param metricValueProvider The metric value provider associated with this metric
+     * @return Existing KafkaMetric if already registered or else a newly created one
+     */
+    public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) {
+        KafkaMetric metric = new KafkaMetric(new Object(),
+                Objects.requireNonNull(metricName),
+                Objects.requireNonNull(metricValueProvider),
+                config == null ? this.config : config,
+                time);
+
+        KafkaMetric existingMetric = registerMetric(metric);
+        return existingMetric == null ? metric : existingMetric;
+    }
+
     /**
      * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
      * will be invoked for each reporter.
@@ -563,10 +586,18 @@ public class Metrics implements Closeable {
         }
     }
 
-    synchronized void registerMetric(KafkaMetric metric) {
+    /**
+     * Register a metric if not present or return an already existing metric otherwise.
+     * When a metric is newly registered, this method returns null
+     *
+     * @param metric The KafkaMetric to register
+     * @return KafkaMetric if the metric already exists, null otherwise
+     */
+    synchronized KafkaMetric registerMetric(KafkaMetric metric) {
         MetricName metricName = metric.metricName();
-        if (this.metrics.containsKey(metricName))
-            throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
+        if (this.metrics.containsKey(metricName)) {
+            return this.metrics.get(metricName);
+        }
         this.metrics.put(metricName, metric);
         for (MetricsReporter reporter : reporters) {
             try {
@@ -576,6 +607,7 @@ public class Metrics implements Closeable {
             }
         }
         log.trace("Registered metric named {}", metricName);
+        return null;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 5ae3b8d997..25f3c21a31 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -297,7 +297,10 @@ public final class Sensor {
         for (NamedMeasurable m : stat.stats()) {
             final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time);
             if (!metrics.containsKey(metric.metricName())) {
-                registry.registerMetric(metric);
+                KafkaMetric existingMetric = registry.registerMetric(metric);
+                if (existingMetric != null) {
+                    throw new IllegalArgumentException("A metric named '" + metric.metricName() + "' already exists, can't register another one.");
+                }
                 metrics.put(metric.metricName(), metric);
             }
         }
@@ -336,7 +339,10 @@ public final class Sensor {
                 statConfig,
                 time
             );
-            registry.registerMetric(metric);
+            KafkaMetric existingMetric = registry.registerMetric(metric);
+            if (existingMetric != null) {
+                throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
+            }
             metrics.put(metric.metricName(), metric);
             stats.add(new StatAndConfig(Objects.requireNonNull(stat), metric::config));
             return true;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 7dad6aec0a..ed81be657a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -319,9 +319,7 @@ public class ConnectMetrics {
          */
         public <T> void addValueMetric(MetricNameTemplate nameTemplate, final LiteralSupplier<T> supplier) {
             MetricName metricName = metricName(nameTemplate);
-            if (metrics().metric(metricName) == null) {
-                metrics().addMetric(metricName, (Gauge<T>) (config, now) -> supplier.metricValue(now));
-            }
+            metrics().addMetricIfAbsent(metricName, null, (Gauge<T>) (config, now) -> supplier.metricValue(now));
         }
 
         /**
@@ -333,9 +331,7 @@ public class ConnectMetrics {
          */
         public <T> void addImmutableValueMetric(MetricNameTemplate nameTemplate, final T value) {
             MetricName metricName = metricName(nameTemplate);
-            if (metrics().metric(metricName) == null) {
-                metrics().addMetric(metricName, (Gauge<T>) (config, now) -> value);
-            }
+            metrics().addMetricIfAbsent(metricName, null, (Gauge<T>) (config, now) -> value);
         }
 
         /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index b8d3d92e62..5ec834a11f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -45,6 +45,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -84,6 +85,8 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.powermock.api.easymock.PowerMock.createMock;
 
 @RunWith(PowerMockRunner.class)
@@ -497,6 +500,17 @@ public class StreamsMetricsImplTest {
         verify(metrics);
     }
 
+    @Test
+    public void shouldCreateNewStoreLevelMutableMetric() {
+        final MetricName metricName =
+                new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
+        final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
+        final Metrics metrics = new Metrics(metricConfig);
+        assertNull(metrics.metric(metricName));
+        metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
+        assertNotNull(metrics.metric(metricName));
+    }
+
     @Test
     public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() {
         final Metrics metrics = mock(Metrics.class);
@@ -521,6 +535,38 @@ public class StreamsMetricsImplTest {
         verify(metrics);
     }
 
+    @Test
+    public void shouldReturnSameMetricIfAlreadyCreated() {
+        final MetricName metricName =
+                new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
+        final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
+        final Metrics metrics = new Metrics(metricConfig);
+        assertNull(metrics.metric(metricName));
+        final KafkaMetric kafkaMetric = metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
+        assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER));
+    }
+
+    @Test
+    public void shouldCreateMetricOnceDuringConcurrentMetricCreationRequest() throws InterruptedException {
+        final MetricName metricName =
+                new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
+        final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
+        final Metrics metrics = new Metrics(metricConfig);
+        assertNull(metrics.metric(metricName));
+        final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new AtomicReference<>();
+        final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new AtomicReference<>();
+
+        final Thread thread1 = new Thread(() -> metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));
+        final Thread thread2 = new Thread(() -> metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));
+
+        thread1.start();
+        thread2.start();
+
+        thread1.join();
+        thread2.join();
+        assertEquals(metricCreatedViaThread1.get(), metricCreatedViaThread2.get());
+    }
+
     @Test
     public void shouldRemoveStateStoreLevelSensors() {
         final Metrics metrics = niceMock(Metrics.class);

Reply via email to