This is an automated email from the ASF dual-hosted git repository.
gwenshap pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new e6b543a KAFKA-10570; Rename JMXReporter configs for KIP-629
e6b543a is described below
commit e6b543a44a3989a190011868b4f8c6ec56e78aa9
Author: Xavier Léauté <[email protected]>
AuthorDate: Tue Oct 13 12:33:05 2020 -0700
KAFKA-10570; Rename JMXReporter configs for KIP-629
* rename whitelist/blacklist to include/exclude
* add utility methods to translate deprecated configs
Author: Xavier Léauté <[email protected]>
Reviewers: Gwen Shapira
Closes #9367 from xvrl/kafka-10570
(cherry picked from commit f46d4f4fce341326c06c0aa8b2d0d64982573658)
Signed-off-by: Gwen Shapira <[email protected]>
---
.../apache/kafka/common/metrics/JmxReporter.java | 46 ++++---
.../org/apache/kafka/common/utils/ConfigUtils.java | 116 +++++++++++++++++
.../kafka/common/metrics/JmxReporterTest.java | 4 +-
.../apache/kafka/common/utils/ConfigUtilsTest.java | 143 +++++++++++++++++++++
.../scala/unit/kafka/metrics/MetricsTest.scala | 4 +-
5 files changed, 291 insertions(+), 22 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 73522a9..3867091 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.metrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.common.utils.Sanitizer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -51,14 +52,20 @@ public class JmxReporter implements MetricsReporter {
public static final String METRICS_CONFIG_PREFIX = "metrics.jmx.";
- public static final String BLACKLIST_CONFIG = METRICS_CONFIG_PREFIX +
"blacklist";
- public static final String WHITELIST_CONFIG = METRICS_CONFIG_PREFIX +
"whitelist";
+ public static final String EXCLUDE_CONFIG = METRICS_CONFIG_PREFIX +
"exclude";
+ public static final String EXCLUDE_CONFIG_ALIAS = METRICS_CONFIG_PREFIX +
"blacklist";
- public static final Set<String> RECONFIGURABLE_CONFIGS =
Utils.mkSet(WHITELIST_CONFIG,
-
BLACKLIST_CONFIG);
+ public static final String INCLUDE_CONFIG = METRICS_CONFIG_PREFIX +
"include";
+ public static final String INCLUDE_CONFIG_ALIAS = METRICS_CONFIG_PREFIX +
"whitelist";
- public static final String DEFAULT_WHITELIST = ".*";
- public static final String DEFAULT_BLACKLIST = "";
+
+ public static final Set<String> RECONFIGURABLE_CONFIGS =
Utils.mkSet(INCLUDE_CONFIG,
+
INCLUDE_CONFIG_ALIAS,
+
EXCLUDE_CONFIG,
+
EXCLUDE_CONFIG_ALIAS);
+
+ public static final String DEFAULT_INCLUDE = ".*";
+ public static final String DEFAULT_EXCLUDE = "";
private static final Logger log =
LoggerFactory.getLogger(JmxReporter.class);
private static final Object LOCK = new Object();
@@ -300,27 +307,30 @@ public class JmxReporter implements MetricsReporter {
}
- public static Predicate<String> compilePredicate(Map<String, ?> configs) {
- String whitelist = (String) configs.get(WHITELIST_CONFIG);
- String blacklist = (String) configs.get(BLACKLIST_CONFIG);
+ public static Predicate<String> compilePredicate(Map<String, ?>
originalConfig) {
+ Map<String, ?> configs = ConfigUtils.translateDeprecatedConfigs(
+ originalConfig, new String[][]{{INCLUDE_CONFIG,
INCLUDE_CONFIG_ALIAS},
+ {EXCLUDE_CONFIG,
EXCLUDE_CONFIG_ALIAS}});
+ String include = (String) configs.get(INCLUDE_CONFIG);
+ String exclude = (String) configs.get(EXCLUDE_CONFIG);
- if (whitelist == null) {
- whitelist = DEFAULT_WHITELIST;
+ if (include == null) {
+ include = DEFAULT_INCLUDE;
}
- if (blacklist == null) {
- blacklist = DEFAULT_BLACKLIST;
+ if (exclude == null) {
+ exclude = DEFAULT_EXCLUDE;
}
try {
- Pattern whitelistPattern = Pattern.compile(whitelist);
- Pattern blacklistPattern = Pattern.compile(blacklist);
+ Pattern includePattern = Pattern.compile(include);
+ Pattern excludePattern = Pattern.compile(exclude);
- return s -> whitelistPattern.matcher(s).matches()
- && !blacklistPattern.matcher(s).matches();
+ return s -> includePattern.matcher(s).matches()
+ && !excludePattern.matcher(s).matches();
} catch (PatternSyntaxException e) {
throw new ConfigException("JMX filter for configuration" +
METRICS_CONFIG_PREFIX
- + ".(whitelist/blacklist) is not a valid
regular expression");
+ + ".(include/exclude) is not a valid
regular expression");
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
b/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
new file mode 100644
index 0000000..504a1f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConfigUtils {
+
+ private static final Logger log =
LoggerFactory.getLogger(ConfigUtils.class);
+
+ /**
+ * Translates deprecated configurations into their non-deprecated
equivalents
+ *
+ * This is a convenience method for {@link
ConfigUtils#translateDeprecatedConfigs(Map, Map)}
+ * until we can use Java 9+ {@code Map.of(..)} and {@code Set.of(...)}
+ *
+ * @param configs the input configuration
+ * @param aliasGroups An array of arrays of synonyms. Each synonym array
begins with the non-deprecated synonym
+ * For example, new String[][] { { a, b }, { c, d, e} }
+ * would declare b as a deprecated synonym for a,
+ * and d and e as deprecated synonyms for c.
+ * The ordering of synonyms determines the order of
precedence
+ * (e.g. the first synonym takes precedence over the
second one)
+ * @return a new configuration map with deprecated keys translated to
their non-deprecated equivalents
+ */
+ public static <T> Map<String, T> translateDeprecatedConfigs(Map<String, T>
configs, String[][] aliasGroups) {
+ return translateDeprecatedConfigs(configs, Stream.of(aliasGroups)
+ .collect(Collectors.toMap(x -> x[0], x ->
Stream.of(x).skip(1).collect(Collectors.toList()))));
+ }
+
+    /**
+     * Translates deprecated configurations into their non-deprecated equivalents
+     *
+     * @param configs the input configuration
+     * @param aliasGroups A map of config to synonyms. Each key is the non-deprecated synonym
+     *                    For example, Map.of(a, List.of(b), c, List.of(d, e))
+     *                    would declare b as a deprecated synonym for a,
+     *                    and d and e as deprecated synonyms for c.
+     *                    The ordering of synonyms determines the order of precedence
+     *                    (e.g. the first synonym takes precedence over the second one)
+     * @return a new configuration map with deprecated keys translated to their non-deprecated equivalents; entries outside every alias group are copied only when their value is non-null
+     */
+    public static <T> Map<String, T> translateDeprecatedConfigs(Map<String, T> configs,
+                                                                Map<String, List<String>> aliasGroups) {
+        Set<String> aliasSet = Stream.concat(
+            aliasGroups.keySet().stream(),
+            aliasGroups.values().stream().flatMap(Collection::stream))
+            .collect(Collectors.toSet());
+
+        // pass through all configurations without aliases
+        Map<String, T> newConfigs = configs.entrySet().stream()
+            .filter(e -> !aliasSet.contains(e.getKey()))
+            // filter out null values (Collectors.toMap rejects null values with an NPE)
+            .filter(e -> Objects.nonNull(e.getValue()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        aliasGroups.forEach((target, aliases) -> {
+            List<String> deprecated = aliases.stream()
+                .filter(configs::containsKey)
+                .collect(Collectors.toList());
+
+            if (deprecated.isEmpty()) {
+                // No deprecated key(s) found; carry the target over as-is (its value may be null).
+                if (configs.containsKey(target)) {
+                    newConfigs.put(target, configs.get(target));
+                }
+                return;
+            }
+
+            String aliasString = String.join(", ", deprecated);
+
+            if (configs.containsKey(target)) {
+                // Ignore the deprecated key(s) because the actual key was set.
+                log.error(target + " was configured, as well as the deprecated alias(es) " +
+                          aliasString + ". Using the value of " + target);
+                newConfigs.put(target, configs.get(target));
+            } else if (deprecated.size() > 1) {
+                log.error("The configuration keys " + aliasString + " are deprecated and may be " +
+                          "removed in the future. Additionally, this configuration is ambiguous because " +
+                          "these configuration keys are all aliases for " + target + ". Please update " +
+                          "your configuration to have only " + target + " set.");
+                newConfigs.put(target, configs.get(deprecated.get(0)));
+            } else {
+                log.warn("Configuration key " + deprecated.get(0) + " is deprecated and may be removed " +
+                         "in the future. Please update your configuration to use " + target + " instead.");
+                newConfigs.put(target, configs.get(deprecated.get(0)));
+            }
+        });
+
+        return newConfigs;
+    }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index 3285889..15eebdc 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -124,7 +124,7 @@ public class JmxReporterTest {
Map<String, String> configs = new HashMap<>();
- configs.put(JmxReporter.BLACKLIST_CONFIG,
+ configs.put(JmxReporter.EXCLUDE_CONFIG,
JmxReporter.getMBeanName("",
metrics.metricName("pack.bean2.total", "grp2")));
try {
@@ -143,7 +143,7 @@ public class JmxReporterTest {
sensor.record();
- configs.put(JmxReporter.BLACKLIST_CONFIG,
+ configs.put(JmxReporter.EXCLUDE_CONFIG,
JmxReporter.getMBeanName("",
metrics.metricName("pack.bean2.avg", "grp1")));
reporter.reconfigure(configs);
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java
new file mode 100644
index 0000000..c77cb0e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class ConfigUtilsTest {
+
+    @Test
+    public void testTranslateDeprecated() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("foo.bar", "baz");
+        config.put("foo.bar.deprecated", "quux");
+        config.put("chicken", "1");
+        config.put("rooster", "2");
+        config.put("hen", "3");
+        config.put("heifer", "moo");
+        config.put("blah", "blah");
+        config.put("unexpected.non.string.object", 42);
+        Map<String, Object> newConfig = ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+            {"foo.bar", "foo.bar.deprecated"},
+            {"chicken", "rooster", "hen"},
+            {"cow", "beef", "heifer", "steer"}
+        });
+        assertEquals("baz", newConfig.get("foo.bar"));
+        assertNull(newConfig.get("foo.bar.deprecated"));
+        assertEquals("1", newConfig.get("chicken"));
+        assertNull(newConfig.get("rooster"));
+        assertNull(newConfig.get("hen"));
+        assertEquals("moo", newConfig.get("cow"));
+        assertNull(newConfig.get("beef"));
+        assertNull(newConfig.get("heifer"));
+        assertNull(newConfig.get("steer"));
+        assertNull(config.get("cow"));
+        assertEquals("blah", config.get("blah"));
+        assertEquals("blah", newConfig.get("blah"));
+        assertEquals(42, newConfig.get("unexpected.non.string.object"));
+        assertEquals(42, config.get("unexpected.non.string.object"));
+
+    }
+
+ @Test
+ public void testAllowsNewKey() {
+ Map<String, String> config = new HashMap<>();
+ config.put("foo.bar", "baz");
+ Map<String, String> newConfig =
ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+ {"foo.bar", "foo.bar.deprecated"},
+ {"chicken", "rooster", "hen"},
+ {"cow", "beef", "heifer", "steer"}
+ });
+ assertNotNull(newConfig);
+ assertEquals("baz", newConfig.get("foo.bar"));
+ assertNull(newConfig.get("foo.bar.deprecated"));
+ }
+
+ @Test
+ public void testAllowDeprecatedNulls() {
+ Map<String, String> config = new HashMap<>();
+ config.put("foo.bar.deprecated", null);
+ config.put("foo.bar", "baz");
+ Map<String, String> newConfig =
ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+ {"foo.bar", "foo.bar.deprecated"}
+ });
+ assertNotNull(newConfig);
+ assertEquals("baz", newConfig.get("foo.bar"));
+ assertNull(newConfig.get("foo.bar.deprecated"));
+ }
+
+ @Test
+ public void testAllowNullOverride() {
+ Map<String, String> config = new HashMap<>();
+ config.put("foo.bar.deprecated", "baz");
+ config.put("foo.bar", null);
+ Map<String, String> newConfig =
ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+ {"foo.bar", "foo.bar.deprecated"}
+ });
+ assertNotNull(newConfig);
+ assertNull(newConfig.get("foo.bar"));
+ assertNull(newConfig.get("foo.bar.deprecated"));
+ }
+
+ @Test
+ public void testNullMapEntriesWithoutAliasesDoNotThrowNPE() {
+ Map<String, String> config = new HashMap<>();
+ config.put("other", null);
+ Map<String, String> newConfig =
ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+ {"foo.bar", "foo.bar.deprecated"}
+ });
+ assertNotNull(newConfig);
+ assertNull(newConfig.get("other"));
+ }
+
+ @Test
+ public void testDuplicateSynonyms() {
+ Map<String, String> config = new HashMap<>();
+ config.put("foo.bar", "baz");
+ config.put("foo.bar.deprecated", "derp");
+ Map<String, String> newConfig =
ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+ {"foo.bar", "foo.bar.deprecated"},
+ {"chicken", "foo.bar.deprecated"}
+ });
+ assertNotNull(newConfig);
+ assertEquals("baz", newConfig.get("foo.bar"));
+ assertEquals("derp", newConfig.get("chicken"));
+ assertNull(newConfig.get("foo.bar.deprecated"));
+ }
+
+ @Test
+ public void testMultipleDeprecations() {
+ Map<String, String> config = new HashMap<>();
+ config.put("foo.bar.deprecated", "derp");
+ config.put("foo.bar.even.more.deprecated", "very old configuration");
+ Map<String, String> newConfig =
ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+ {"foo.bar", "foo.bar.deprecated", "foo.bar.even.more.deprecated"}
+ });
+ assertNotNull(newConfig);
+ assertEquals("derp", newConfig.get("foo.bar"));
+ assertNull(newConfig.get("foo.bar.deprecated"));
+ assertNull(newConfig.get("foo.bar.even.more.deprecated"));
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 7d8643f..bac15de 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -40,7 +40,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging
{
val overridingProps = new Properties
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
- overridingProps.put(JmxReporter.BLACKLIST_CONFIG,
"kafka.server:type=KafkaServer,name=ClusterId")
+ overridingProps.put(JmxReporter.EXCLUDE_CONFIG,
"kafka.server:type=KafkaServer,name=ClusterId")
def generateConfigs =
TestUtils.createBrokerConfigs(numNodes,
zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
@@ -90,7 +90,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging
{
def testUpdateJMXFilter(): Unit = {
// verify previously exposed metrics are removed and existing matching
metrics are added
servers.foreach(server => server.kafkaYammerMetrics.reconfigure(
- Map(JmxReporter.BLACKLIST_CONFIG ->
"kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava
+ Map(JmxReporter.EXCLUDE_CONFIG ->
"kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava
))
assertFalse(ManagementFactory.getPlatformMBeanServer
.isRegistered(new
ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")))