This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 976409e KAFKA-13137; KRaft Controller Metric MBean names incorrectly
quoted (#11131)
976409e is described below
commit 976409ee081b5a4e5edfe73b5d597998feb2f402
Author: Ron Dagostino <[email protected]>
AuthorDate: Thu Jul 29 16:01:19 2021 -0400
KAFKA-13137; KRaft Controller Metric MBean names incorrectly quoted (#11131)
Controller metric names that are in common between the ZooKeeper-based and
KRaft-based controller must remain the same, but they were not in the AK 2.8
early access release of KRaft. For example, the non-KRaft MBean name
`kafka.controller:type=KafkaController,name=OfflinePartitionsCount` incorrectly
became `"kafka.controller":type="KafkaController",name="OfflinePartitionCount"`
(note the added quotes and the lack of plural). This patch fixes the issues,
closes the test gap that allow [...]
Reviewers: Luke Chen <[email protected]>, Jason Gustafson
<[email protected]>
---
.../apache/kafka/controller/ControllerMetrics.java | 4 +-
.../apache/kafka/controller/QuorumController.java | 1 +
.../kafka/controller/QuorumControllerMetrics.java | 53 ++++++++++----
.../kafka/controller/MockControllerMetrics.java | 11 ++-
.../controller/QuorumControllerMetricsTest.java | 84 ++++++++++++++++++++++
.../kafka/controller/QuorumControllerTest.java | 4 +-
6 files changed, 139 insertions(+), 18 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index 7c862bc..3fd0e66 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -18,7 +18,7 @@
package org.apache.kafka.controller;
-public interface ControllerMetrics {
+public interface ControllerMetrics extends AutoCloseable {
void setActive(boolean active);
boolean active();
@@ -42,4 +42,6 @@ public interface ControllerMetrics {
void setPreferredReplicaImbalanceCount(int replicaImbalances);
int preferredReplicaImbalanceCount();
+
+ void close();
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index c8b4721..d62f2f8 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1354,6 +1354,7 @@ public final class QuorumController implements Controller
{
@Override
public void close() throws InterruptedException {
queue.close();
+ controllerMetrics.close();
}
// VisibleForTesting
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index 52abc8c..3dd3336 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -22,23 +22,26 @@ import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
+import java.util.Arrays;
+import java.util.Objects;
public final class QuorumControllerMetrics implements ControllerMetrics {
- private final static MetricName ACTIVE_CONTROLLER_COUNT = new MetricName(
- "kafka.controller", "KafkaController", "ActiveControllerCount", null);
- private final static MetricName EVENT_QUEUE_TIME_MS = new MetricName(
- "kafka.controller", "ControllerEventManager", "EventQueueTimeMs",
null);
- private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = new
MetricName(
- "kafka.controller", "ControllerEventManager",
"EventQueueProcessingTimeMs", null);
- private final static MetricName GLOBAL_TOPIC_COUNT = new MetricName(
- "kafka.controller", "KafkaController", "GlobalTopicCount", null);
- private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName(
- "kafka.controller", "KafkaController", "GlobalPartitionCount", null);
- private final static MetricName OFFLINE_PARTITION_COUNT = new MetricName(
- "kafka.controller", "KafkaController", "OfflinePartitionCount", null);
- private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = new
MetricName(
- "kafka.controller", "KafkaController",
"PreferredReplicaImbalanceCount", null);
-
+ private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName(
+ "KafkaController", "ActiveControllerCount");
+ private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName(
+ "ControllerEventManager", "EventQueueTimeMs");
+ private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS =
getMetricName(
+ "ControllerEventManager", "EventQueueProcessingTimeMs");
+ private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
+ "KafkaController", "GlobalTopicCount");
+ private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
+ "KafkaController", "GlobalPartitionCount");
+ private final static MetricName OFFLINE_PARTITION_COUNT = getMetricName(
+ "KafkaController", "OfflinePartitionsCount");
+ private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT =
getMetricName(
+ "KafkaController", "PreferredReplicaImbalanceCount");
+
+ private final MetricsRegistry registry;
private volatile boolean active;
private volatile int globalTopicCount;
private volatile int globalPartitionCount;
@@ -53,6 +56,7 @@ public final class QuorumControllerMetrics implements
ControllerMetrics {
private final Histogram eventQueueProcessingTime;
public QuorumControllerMetrics(MetricsRegistry registry) {
+ this.registry = Objects.requireNonNull(registry);
this.active = false;
this.globalTopicCount = 0;
this.globalPartitionCount = 0;
@@ -151,4 +155,23 @@ public final class QuorumControllerMetrics implements
ControllerMetrics {
public int preferredReplicaImbalanceCount() {
return this.preferredReplicaImbalanceCount;
}
+
+ @Override
+ public void close() {
+ Arrays.asList(
+ ACTIVE_CONTROLLER_COUNT,
+ EVENT_QUEUE_TIME_MS,
+ EVENT_QUEUE_PROCESSING_TIME_MS,
+ GLOBAL_TOPIC_COUNT,
+ GLOBAL_PARTITION_COUNT,
+ OFFLINE_PARTITION_COUNT,
+
PREFERRED_REPLICA_IMBALANCE_COUNT).forEach(this.registry::removeMetric);
+ }
+
+ private static MetricName getMetricName(String type, String name) {
+ final String group = "kafka.controller";
+ final StringBuilder mbeanNameBuilder = new StringBuilder();
+
mbeanNameBuilder.append(group).append(":type=").append(type).append(",name=").append(name);
+ return new MetricName(group, type, name, null,
mbeanNameBuilder.toString());
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 844475a..3d3075e 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -23,7 +23,7 @@ public final class MockControllerMetrics implements
ControllerMetrics {
private volatile int partitions;
private volatile int offlinePartitions;
private volatile int preferredReplicaImbalances;
-
+ private volatile boolean closed = false;
public MockControllerMetrics() {
this.active = false;
@@ -92,4 +92,13 @@ public final class MockControllerMetrics implements
ControllerMetrics {
public int preferredReplicaImbalanceCount() {
return this.preferredReplicaImbalances;
}
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+
+ public boolean isClosed() {
+ return this.closed;
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
new file mode 100644
index 0000000..74b24c7
--- /dev/null
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class QuorumControllerMetricsTest {
+ private static final String EXPECTED_GROUP = "kafka.controller";
+ @Test
+ public void testKafkaControllerMetricNames() {
+ String expectedType = "KafkaController";
+ Set<String> expectedMetricNames = Utils.mkSet(
+ "ActiveControllerCount",
+ "GlobalTopicCount",
+ "GlobalPartitionCount",
+ "OfflinePartitionsCount",
+ "PreferredReplicaImbalanceCount");
+ assertMetricsCreatedAndRemovedUponClose(expectedType,
expectedMetricNames);
+ }
+
+ @Test
+ public void testControllerEventManagerMetricNames() {
+ String expectedType = "ControllerEventManager";
+ Set<String> expectedMetricNames = Utils.mkSet(
+ "EventQueueTimeMs",
+ "EventQueueProcessingTimeMs");
+ assertMetricsCreatedAndRemovedUponClose(expectedType,
expectedMetricNames);
+ }
+
+ private static void assertMetricsCreatedAndRemovedUponClose(String
expectedType, Set<String> expectedMetricNames) {
+ MetricsRegistry registry = new MetricsRegistry();
+ try (QuorumControllerMetrics quorumControllerMetrics = new
QuorumControllerMetrics(registry)) {
+ assertMetricsCreated(registry, expectedMetricNames, expectedType);
+ }
+ assertMetricsRemoved(registry, expectedMetricNames, expectedType);
+ }
+
+ private static void assertMetricsCreated(MetricsRegistry registry,
Set<String> expectedMetricNames, String expectedType) {
+ expectedMetricNames.forEach(expectedMetricName -> assertTrue(
+ registry.allMetrics().keySet().stream().anyMatch(metricName -> {
+ if (metricName.getGroup().equals(EXPECTED_GROUP) &&
metricName.getType().equals(expectedType)
+ && metricName.getScope() == null &&
metricName.getName().equals(expectedMetricName)) {
+ // It has to exist AND the MBean name has to be correct;
+ // fail right here if the MBean name doesn't match
+ String expectedMBeanPrefix = EXPECTED_GROUP + ":type=" +
expectedType + ",name=";
+ assertEquals(expectedMBeanPrefix + expectedMetricName,
metricName.getMBeanName(),
+ "Incorrect MBean name");
+ return true; // the metric name exists and the associated
MBean name matches
+ } else {
+ return false; // this one didn't match
+ }
+ }), "Missing metric: " + expectedMetricName));
+ }
+
+ private static void assertMetricsRemoved(MetricsRegistry registry,
Set<String> expectedMetricNames, String expectedType) {
+ expectedMetricNames.forEach(expectedMetricName -> assertTrue(
+ registry.allMetrics().keySet().stream().noneMatch(metricName ->
+ metricName.getGroup().equals(EXPECTED_GROUP) &&
metricName.getType().equals(expectedType)
+ && metricName.getScope() == null &&
metricName.getName().equals(expectedMetricName)),
+ "Metric not removed when closed: " + expectedMetricName));
+ }
+}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index ba2d52a..000c380 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -97,11 +97,13 @@ public class QuorumControllerTest {
*/
@Test
public void testCreateAndClose() throws Throwable {
+ MockControllerMetrics metrics = new MockControllerMetrics();
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1,
Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
- new QuorumControllerTestEnv(logEnv, __ -> { })) {
+ new QuorumControllerTestEnv(logEnv, builder ->
builder.setMetrics(metrics))) {
}
}
+ assertTrue(metrics.isClosed(), "metrics were not closed");
}
/**