This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 4549ca8  KAFKA-13137; KRaft Controller Metric MBean names incorrectly 
quoted (#11131)
4549ca8 is described below

commit 4549ca8fc5450549d0021eef2b9227c3cb8dcc24
Author: Ron Dagostino <[email protected]>
AuthorDate: Thu Jul 29 16:01:19 2021 -0400

    KAFKA-13137; KRaft Controller Metric MBean names incorrectly quoted (#11131)
    
    Controller metric names that are in common between the ZooKeeper-based and 
KRaft-based controller must remain the same, but they were not in the AK 2.8 
early access release of KRaft. For example, the non-KRaft MBean name 
`kafka.controller:type=KafkaController,name=OfflinePartitionsCount` incorrectly 
became `"kafka.controller":type="KafkaController",name="OfflinePartitionCount"` 
(note the added quotes and the lack of plural).  This patch fixes the issues, 
closes the test gap that allow [...]
    
    Reviewers: Luke Chen <[email protected]>, Jason Gustafson 
<[email protected]>
---
 .../apache/kafka/controller/ControllerMetrics.java |  4 +-
 .../apache/kafka/controller/QuorumController.java  |  1 +
 .../kafka/controller/QuorumControllerMetrics.java  | 53 ++++++++++----
 .../kafka/controller/MockControllerMetrics.java    | 11 ++-
 .../controller/QuorumControllerMetricsTest.java    | 84 ++++++++++++++++++++++
 .../kafka/controller/QuorumControllerTest.java     |  4 +-
 6 files changed, 139 insertions(+), 18 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java 
b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index 7c862bc..3fd0e66 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.controller;
 
 
-public interface ControllerMetrics {
+public interface ControllerMetrics extends AutoCloseable {
     void setActive(boolean active);
 
     boolean active();
@@ -42,4 +42,6 @@ public interface ControllerMetrics {
     void setPreferredReplicaImbalanceCount(int replicaImbalances);
 
     int preferredReplicaImbalanceCount();
+
+    void close();
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index c8b4721..d62f2f8 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1354,6 +1354,7 @@ public final class QuorumController implements Controller 
{
     @Override
     public void close() throws InterruptedException {
         queue.close();
+        controllerMetrics.close();
     }
 
     // VisibleForTesting
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index 52abc8c..3dd3336 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -22,23 +22,26 @@ import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricsRegistry;
 
+import java.util.Arrays;
+import java.util.Objects;
 
 public final class QuorumControllerMetrics implements ControllerMetrics {
-    private final static MetricName ACTIVE_CONTROLLER_COUNT = new MetricName(
-        "kafka.controller", "KafkaController", "ActiveControllerCount", null);
-    private final static MetricName EVENT_QUEUE_TIME_MS = new MetricName(
-        "kafka.controller", "ControllerEventManager", "EventQueueTimeMs", 
null);
-    private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = new 
MetricName(
-        "kafka.controller", "ControllerEventManager", 
"EventQueueProcessingTimeMs", null);
-    private final static MetricName GLOBAL_TOPIC_COUNT = new MetricName(
-        "kafka.controller", "KafkaController", "GlobalTopicCount", null);
-    private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName(
-        "kafka.controller", "KafkaController", "GlobalPartitionCount", null);
-    private final static MetricName OFFLINE_PARTITION_COUNT = new MetricName(
-        "kafka.controller", "KafkaController", "OfflinePartitionCount", null);
-    private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = new 
MetricName(
-        "kafka.controller", "KafkaController", 
"PreferredReplicaImbalanceCount", null);
-    
+    private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName(
+        "KafkaController", "ActiveControllerCount");
+    private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName(
+        "ControllerEventManager", "EventQueueTimeMs");
+    private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = 
getMetricName(
+        "ControllerEventManager", "EventQueueProcessingTimeMs");
+    private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
+        "KafkaController", "GlobalTopicCount");
+    private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
+        "KafkaController", "GlobalPartitionCount");
+    private final static MetricName OFFLINE_PARTITION_COUNT = getMetricName(
+        "KafkaController", "OfflinePartitionsCount");
+    private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = 
getMetricName(
+        "KafkaController", "PreferredReplicaImbalanceCount");
+
+    private final MetricsRegistry registry;
     private volatile boolean active;
     private volatile int globalTopicCount;
     private volatile int globalPartitionCount;
@@ -53,6 +56,7 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
     private final Histogram eventQueueProcessingTime;
 
     public QuorumControllerMetrics(MetricsRegistry registry) {
+        this.registry = Objects.requireNonNull(registry);
         this.active = false;
         this.globalTopicCount = 0;
         this.globalPartitionCount = 0;
@@ -151,4 +155,23 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
     public int preferredReplicaImbalanceCount() {
         return this.preferredReplicaImbalanceCount;
     }
+
+    @Override
+    public void close() {
+        Arrays.asList(
+            ACTIVE_CONTROLLER_COUNT,
+            EVENT_QUEUE_TIME_MS,
+            EVENT_QUEUE_PROCESSING_TIME_MS,
+            GLOBAL_TOPIC_COUNT,
+            GLOBAL_PARTITION_COUNT,
+            OFFLINE_PARTITION_COUNT,
+            
PREFERRED_REPLICA_IMBALANCE_COUNT).forEach(this.registry::removeMetric);
+    }
+
+    private static MetricName getMetricName(String type, String name) {
+        final String group = "kafka.controller";
+        final StringBuilder mbeanNameBuilder = new StringBuilder();
+        
mbeanNameBuilder.append(group).append(":type=").append(type).append(",name=").append(name);
+        return new MetricName(group, type, name, null, 
mbeanNameBuilder.toString());
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java 
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 844475a..3d3075e 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -23,7 +23,7 @@ public final class MockControllerMetrics implements 
ControllerMetrics {
     private volatile int partitions;
     private volatile int offlinePartitions;
     private volatile int preferredReplicaImbalances;
-
+    private volatile boolean closed = false;
 
     public MockControllerMetrics() {
         this.active = false;
@@ -92,4 +92,13 @@ public final class MockControllerMetrics implements 
ControllerMetrics {
     public int preferredReplicaImbalanceCount() {
         return this.preferredReplicaImbalances;
     }
+
+    @Override
+    public void close() {
+        closed = true;
+    }
+
+    public boolean isClosed() {
+        return this.closed;
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
new file mode 100644
index 0000000..74b24c7
--- /dev/null
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class QuorumControllerMetricsTest {
+    private static final String EXPECTED_GROUP = "kafka.controller";
+    @Test
+    public void testKafkaControllerMetricNames() {
+        String expectedType = "KafkaController";
+        Set<String> expectedMetricNames = Utils.mkSet(
+            "ActiveControllerCount",
+            "GlobalTopicCount",
+            "GlobalPartitionCount",
+            "OfflinePartitionsCount",
+            "PreferredReplicaImbalanceCount");
+        assertMetricsCreatedAndRemovedUponClose(expectedType, 
expectedMetricNames);
+    }
+
+    @Test
+    public void testControllerEventManagerMetricNames() {
+        String expectedType = "ControllerEventManager";
+        Set<String> expectedMetricNames = Utils.mkSet(
+            "EventQueueTimeMs",
+            "EventQueueProcessingTimeMs");
+        assertMetricsCreatedAndRemovedUponClose(expectedType, 
expectedMetricNames);
+    }
+
+    private static void assertMetricsCreatedAndRemovedUponClose(String 
expectedType, Set<String> expectedMetricNames) {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (QuorumControllerMetrics quorumControllerMetrics = new 
QuorumControllerMetrics(registry)) {
+            assertMetricsCreated(registry, expectedMetricNames, expectedType);
+        }
+        assertMetricsRemoved(registry, expectedMetricNames, expectedType);
+    }
+
+    private static void assertMetricsCreated(MetricsRegistry registry, 
Set<String> expectedMetricNames, String expectedType) {
+        expectedMetricNames.forEach(expectedMetricName -> assertTrue(
+            registry.allMetrics().keySet().stream().anyMatch(metricName -> {
+                if (metricName.getGroup().equals(EXPECTED_GROUP) && 
metricName.getType().equals(expectedType)
+                    && metricName.getScope() == null && 
metricName.getName().equals(expectedMetricName)) {
+                    // It has to exist AND the MBean name has to be correct;
+                    // fail right here if the MBean name doesn't match
+                    String expectedMBeanPrefix = EXPECTED_GROUP + ":type=" + 
expectedType + ",name=";
+                    assertEquals(expectedMBeanPrefix + expectedMetricName, 
metricName.getMBeanName(),
+                        "Incorrect MBean name");
+                    return true; // the metric name exists and the associated 
MBean name matches
+                } else {
+                    return false; // this one didn't match
+                }
+            }), "Missing metric: " + expectedMetricName));
+    }
+
+    private static void assertMetricsRemoved(MetricsRegistry registry, 
Set<String> expectedMetricNames, String expectedType) {
+        expectedMetricNames.forEach(expectedMetricName -> assertTrue(
+            registry.allMetrics().keySet().stream().noneMatch(metricName ->
+                metricName.getGroup().equals(EXPECTED_GROUP) && 
metricName.getType().equals(expectedType)
+                    && metricName.getScope() == null && 
metricName.getName().equals(expectedMetricName)),
+            "Metric not removed when closed: " + expectedMetricName));
+    }
+}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index ba2d52a..000c380 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -97,11 +97,13 @@ public class QuorumControllerTest {
      */
     @Test
     public void testCreateAndClose() throws Throwable {
+        MockControllerMetrics metrics = new MockControllerMetrics();
         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
             try (QuorumControllerTestEnv controlEnv =
-                     new QuorumControllerTestEnv(logEnv, __ -> { })) {
+                     new QuorumControllerTestEnv(logEnv, builder -> 
builder.setMetrics(metrics))) {
             }
         }
+        assertTrue(metrics.isClosed(), "metrics were not closed");
     }
 
     /**

Reply via email to