This is an automated email from the ASF dual-hosted git repository.
dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 615f1a0bf99 KAFKA-16181: Use incrementalAlterConfigs when updating
broker configs by kafka-configs.sh (#15304)
615f1a0bf99 is described below
commit 615f1a0bf999cd0b5ccf38c09e6afaba89538e9f
Author: dengziming <[email protected]>
AuthorDate: Sun Dec 1 18:32:02 2024 +0800
KAFKA-16181: Use incrementalAlterConfigs when updating broker configs by
kafka-configs.sh (#15304)
This PR implements KIP-1011: kafka-configs.sh now uses the
incrementalAlterConfigs API to alter broker configurations instead of the
deprecated alterConfigs API, and it will fail directly if the broker doesn't
support incrementalAlterConfigs.
Reviewers: David Jacot <[email protected]>, OmniaGM
<[email protected]>.
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 77 ++++++-----
.../kafka/admin/ConfigCommandIntegrationTest.java | 147 ++++++++++++++++++---
.../test/java/kafka/admin/ConfigCommandTest.java | 21 ++-
docs/upgrade.html | 4 +
.../kafka/common/test/KafkaClusterTestKit.java | 2 +-
5 files changed, 183 insertions(+), 68 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 993a2abe5c4..fe1fe9faed3 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -18,22 +18,24 @@
package kafka.admin
import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.{Collections, Properties}
import joptsimple._
import kafka.server.DynamicConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
-import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions,
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions,
DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo,
UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig,
ScramMechanism => PublicScramMechanism}
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions,
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions,
DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo,
UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism =>
PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.common.errors.{InvalidConfigurationException,
UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
+
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection._
@@ -80,6 +82,11 @@ object ConfigCommand extends Logging {
System.err.println(e.getMessage)
Exit.exit(1)
+ case e: UnsupportedVersionException =>
+ logger.debug(s"Unsupported API encountered in server when executing
config command with args '${args.mkString(" ")}'")
+ System.err.println(e.getMessage)
+ Exit.exit(1)
+
case t: Throwable =>
logger.debug(s"Error while executing config command with args
'${args.mkString(" ")}'", t)
System.err.println(s"Error while executing config command with args
'${args.mkString(" ")}'")
@@ -161,7 +168,6 @@ object ConfigCommand extends Logging {
}
}
- @nowarn("cat=deprecation")
def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
@@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
entityTypeHead match {
- case ConfigType.TOPIC =>
- alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
- case ConfigType.BROKER =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
-
- val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
- val sensitiveEntries = newEntries.filter(_._2.value == null)
- if (sensitiveEntries.nonEmpty)
- throw new InvalidConfigurationException(s"All sensitive broker
config entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
- val newConfig = new JConfig(newEntries.asJava.values)
-
- val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER |
ConfigType.GROUP =>
+ val configResourceType = entityTypeHead match {
+ case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+ case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+ case ConfigType.BROKER => ConfigResource.Type.BROKER
+ case ConfigType.GROUP => ConfigResource.Type.GROUP
+ }
+ try {
+ alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, configResourceType)
+ } catch {
+ case e: ExecutionException =>
+ e.getCause match {
+ case _: UnsupportedVersionException =>
+ throw new UnsupportedVersionException(s"The
${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The
API is supported starting from version 2.3.0."
+ + " You may want to use an older version of this tool to
interact with your cluster, or upgrade your brokers to version 2.3.0 or newer
to avoid this error.")
+ case _ => throw e
+ }
+ case e: Throwable => throw e
+ }
case BrokerLoggerConfigType =>
val validLoggers = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
@@ -203,10 +207,10 @@ object ConfigCommand extends Logging {
val configResource = new
ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val alterLogLevelEntries = (configsToBeAdded.values.map(new
AlterConfigOp(_, AlterConfigOp.OpType.SET))
- ++ configsToBeDeleted.map { k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
- ).asJavaCollection
- adminClient.incrementalAlterConfigs(Map(configResource ->
alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+ val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k,
AlterConfigOp.OpType.SET))
+ val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
+ val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
+ adminClient.incrementalAlterConfigs(Map(configResource ->
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
case ConfigType.USER | ConfigType.CLIENT =>
val hasQuotaConfigsToAdd =
configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
@@ -250,13 +254,8 @@ object ConfigCommand extends Logging {
throw new IllegalArgumentException(s"Only connection quota configs
can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config
names: ${unknownConfigs.mkString(",")}")
alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
- case ConfigType.CLIENT_METRICS =>
- alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.CLIENT_METRICS)
-
- case ConfigType.GROUP =>
- alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.GROUP)
-
- case _ => throw new IllegalArgumentException(s"Unsupported entity type:
$entityTypeHead")
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported entity type:
$entityTypeHead")
}
if (entityNameHead.nonEmpty)
@@ -380,9 +379,9 @@ object ConfigCommand extends Logging {
val configResource = new ConfigResource(resourceType, entityNameHead)
val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_,
AlterConfigOp.OpType.SET))
- ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k,
""), AlterConfigOp.OpType.DELETE) }
- ).asJavaCollection
+ val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k,
AlterConfigOp.OpType.SET))
+ val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
+ val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource ->
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
}
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 5ecd0337a96..cc4d6a01dcc 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -17,16 +17,25 @@
package kafka.admin;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
@@ -57,9 +66,11 @@ import static
org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBA
import static
org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
import static
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
@ExtendWith(value = ClusterTestExtensions.class)
public class ConfigCommandIntegrationTest {
@@ -159,11 +170,11 @@ public class ConfigCommandIntegrationTest {
singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"),
alterOpts);
// Per-broker config configured at default cluster-level should
fail
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client, Optional.empty(),
+ () -> alterConfigWithAdmin(client, Optional.empty(),
singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"),
alterOpts));
deleteAndVerifyConfigValue(client, defaultBrokerId,
singleton("listener.name.internal.ssl.keystore.location"),
false, alterOpts);
- alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+ alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
singletonMap("listener.name.external.ssl.keystore.password", "secret"),
alterOpts);
// Password config update with encoder secret should succeed and
encoded password must be stored in ZK
@@ -175,7 +186,7 @@ public class ConfigCommandIntegrationTest {
// Password config update at default cluster-level should fail
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId), configs, alterOpts));
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId), configs, alterOpts));
}
}
@@ -204,7 +215,7 @@ public class ConfigCommandIntegrationTest {
deleteAndVerifyGroupConfigValue(client, defaultGroupName, configs,
alterOpts);
// Unknown config configured should fail
- assertThrows(ExecutionException.class, () ->
alterConfigWithKraft(client, singletonMap("unknown.config", "20000"),
alterOpts));
+ assertThrows(ExecutionException.class, () ->
alterConfigWithAdmin(client, singletonMap("unknown.config", "20000"),
alterOpts));
}
}
@@ -232,7 +243,7 @@ public class ConfigCommandIntegrationTest {
deleteAndVerifyClientMetricsConfigValue(client,
defaultClientMetricsName, configs.keySet(), alterOpts);
// Unknown config configured should fail
- assertThrows(ExecutionException.class, () ->
alterConfigWithKraft(client, singletonMap("unknown.config", "20000"),
alterOpts));
+ assertThrows(ExecutionException.class, () ->
alterConfigWithAdmin(client, singletonMap("unknown.config", "20000"),
alterOpts));
}
}
@@ -242,13 +253,13 @@ public class ConfigCommandIntegrationTest {
try (Admin client = cluster.admin()) {
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(AUTO_CREATE_TOPICS_ENABLE_CONFIG,
"false"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(AUTO_LEADER_REBALANCE_ENABLE_CONFIG,
"false"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap("broker.id", "1"), alterOpts));
}
}
@@ -277,7 +288,7 @@ public class ConfigCommandIntegrationTest {
singletonMap(listenerName + "ssl.truststore.type",
"PKCS12"), alterOpts);
alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
singletonMap(listenerName + "ssl.truststore.location",
"/temp/test.jks"), alterOpts);
- alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+ alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
singletonMap(listenerName + "ssl.truststore.password",
"password"), alterOpts);
verifyConfigSecretValue(client, Optional.of(defaultBrokerId),
singleton(listenerName + "ssl.truststore.password"));
@@ -290,17 +301,119 @@ public class ConfigCommandIntegrationTest {
try (Admin client = cluster.admin()) {
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG,
"PKCS12"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG,
"/temp/test.jks"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG,
"password"), alterOpts));
}
}
+ @ClusterTest
+ public void testUpdateInvalidBrokerConfigs() {
+ updateAndCheckInvalidBrokerConfig(Optional.empty());
+
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId()
+ ""));
+ }
+
+ private void updateAndCheckInvalidBrokerConfig(Optional<String>
brokerIdOrDefault) {
+ List<String> alterOpts =
generateDefaultAlterOpts(cluster.bootstrapServers());
+ try (Admin client = cluster.admin()) {
+ alterConfigWithAdmin(client, brokerIdOrDefault,
Collections.singletonMap("invalid", "2"), alterOpts);
+
+ Stream<String> describeCommand = Stream.concat(
+ Stream.concat(
+ Stream.of("--bootstrap-server",
cluster.bootstrapServers()),
+ Stream.of(entityOp(brokerIdOrDefault).toArray(new
String[0]))),
+ Stream.of("--entity-type", "brokers", "--describe"));
+ String describeResult = captureStandardStream(false,
run(describeCommand));
+
+ // We will treat unknown config as sensitive
+ assertTrue(describeResult.contains("sensitive=true"),
describeResult);
+ // Sensitive config will not return
+ assertTrue(describeResult.contains("invalid=null"),
describeResult);
+ }
+ }
+
+ @ClusterTest
+ public void testUpdateInvalidTopicConfigs() throws ExecutionException,
InterruptedException {
+ List<String> alterOpts = asList("--bootstrap-server",
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+ try (Admin client = cluster.admin()) {
+ client.createTopics(Collections.singletonList(new
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+ assertInstanceOf(
+ InvalidConfigurationException.class,
+ assertThrows(
+ ExecutionException.class,
+ () -> ConfigCommand.alterConfig(
+ client,
+ new ConfigCommand.ConfigCommandOptions(
+ toArray(alterOpts,
+ asList("--add-config",
"invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+ ).getCause()
+ );
+ }
+ }
+
+ // Test case from KAFKA-13788
+ @ClusterTest(serverProperties = {
+ // Must be at greater than 1MB per cleaner thread, set to 2M+2 so that
we can set 2 cleaner threads.
+ @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value =
"2097154"),
+ })
+ public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
+ try (Admin client = cluster.admin()) {
+ ConfigCommand.alterConfig(client, new
ConfigCommand.ConfigCommandOptions(
+ toArray(asList("--bootstrap-server",
cluster.bootstrapServers(),
+ "--alter",
+ "--add-config", "log.cleaner.threadzz=2",
+ "--entity-type", "brokers",
+ "--entity-default"))));
+
+ ConfigCommand.alterConfig(client, new
ConfigCommand.ConfigCommandOptions(
+ toArray(asList("--bootstrap-server",
cluster.bootstrapServers(),
+ "--alter",
+ "--add-config", "log.cleaner.threads=2",
+ "--entity-type", "brokers",
+ "--entity-default"))));
+ kafka.utils.TestUtils.waitUntilTrue(
+ () ->
cluster.brokerSocketServers().stream().allMatch(broker ->
broker.config().getInt("log.cleaner.threads") == 2),
+ () -> "Timeout waiting for topic config propagating to
broker",
+ org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+ 100L);
+ }
+ }
+
+ @ClusterTest(
+ // Must be at greater than 1MB per cleaner thread, set to 2M+2 so
that we can set 2 cleaner threads.
+ serverProperties = {@ClusterConfigProperty(key =
"log.cleaner.dedupe.buffer.size", value = "2097154")},
+ // Zk code has been removed, use kraft and mockito to mock this
situation
+ metadataVersion = MetadataVersion.IBP_3_3_IV0
+ )
+ public void testUnsupportedVersionException() {
+ try (Admin client = cluster.admin()) {
+ Admin spyAdmin = Mockito.spy(client);
+
+ AlterConfigsResult mockResult =
AdminClientTestUtils.alterConfigsResult(
+ new ConfigResource(ConfigResource.Type.BROKER, ""), new
UnsupportedVersionException("simulated error"));
+ Mockito.doReturn(mockResult).when(spyAdmin)
+ .incrementalAlterConfigs(any(java.util.Map.class),
any(AlterConfigsOptions.class));
+ assertEquals(
+ "The INCREMENTAL_ALTER_CONFIGS API is not supported by the
cluster. The API is supported starting from version 2.3.0. You may want to use
an older version of this tool to interact with your cluster, or upgrade your
brokers to version 2.3.0 or newer to avoid this error.",
+ assertThrows(UnsupportedVersionException.class, () -> {
+ ConfigCommand.alterConfig(spyAdmin, new
ConfigCommand.ConfigCommandOptions(
+ toArray(asList(
+ "--bootstrap-server",
cluster.bootstrapServers(),
+ "--alter",
+ "--add-config",
"log.cleaner.threads=2",
+ "--entity-type", "brokers",
+ "--entity-default"))));
+ }).getMessage()
+ );
+
Mockito.verify(spyAdmin).incrementalAlterConfigs(any(java.util.Map.class),
any(AlterConfigsOptions.class));
+ }
+ }
+
private void assertNonZeroStatusExit(Stream<String> args, Consumer<String>
checkErrOut) {
AtomicReference<Integer> exitStatus = new AtomicReference<>();
Exit.setExitProcedure((status, __) -> {
@@ -333,7 +446,7 @@ public class ConfigCommandIntegrationTest {
Optional<String> brokerId,
Map<String, String> config,
List<String> alterOpts) throws Exception
{
- alterConfigWithKraft(client, brokerId, config, alterOpts);
+ alterConfigWithAdmin(client, brokerId, config, alterOpts);
verifyConfig(client, brokerId, config);
}
@@ -341,7 +454,7 @@ public class ConfigCommandIntegrationTest {
String groupName,
Map<String, String> config,
List<String> alterOpts) throws
Exception {
- alterConfigWithKraft(client, config, alterOpts);
+ alterConfigWithAdmin(client, config, alterOpts);
verifyGroupConfig(client, groupName, config);
}
@@ -349,11 +462,11 @@ public class ConfigCommandIntegrationTest {
String clientMetricsName,
Map<String, String> config,
List<String> alterOpts)
throws Exception {
- alterConfigWithKraft(client, config, alterOpts);
+ alterConfigWithAdmin(client, config, alterOpts);
verifyClientMetricsConfig(client, clientMetricsName, config);
}
- private void alterConfigWithKraft(Admin client, Optional<String>
resourceName, Map<String, String> config, List<String> alterOpts) {
+ private void alterConfigWithAdmin(Admin client, Optional<String>
resourceName, Map<String, String> config, List<String> alterOpts) {
String configStr = transferConfigMapToString(config);
List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
ConfigCommand.ConfigCommandOptions addOpts =
@@ -365,7 +478,7 @@ public class ConfigCommandIntegrationTest {
ConfigCommand.alterConfig(client, addOpts);
}
- private void alterConfigWithKraft(Admin client, Map<String, String>
config, List<String> alterOpts) {
+ private void alterConfigWithAdmin(Admin client, Map<String, String>
config, List<String> alterOpts) {
String configStr = transferConfigMapToString(config);
List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
ConfigCommand.ConfigCommandOptions addOpts =
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java
b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 74fe3271f23..308777417be 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -1017,15 +1017,14 @@ public class ConfigCommandTest {
return describeResult;
}
- @SuppressWarnings("deprecation")
@Override
- public synchronized AlterConfigsResult
alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
+ public synchronized AlterConfigsResult
incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
AlterConfigsOptions options) {
assertEquals(1, configs.size());
- Map.Entry<ConfigResource, Config> entry =
configs.entrySet().iterator().next();
+ Map.Entry<ConfigResource, Collection<AlterConfigOp>> entry =
configs.entrySet().iterator().next();
ConfigResource res = entry.getKey();
- Config config = entry.getValue();
+ Collection<AlterConfigOp> config = entry.getValue();
assertEquals(ConfigResource.Type.BROKER, res.type());
- config.entries().forEach(e -> brokerConfigs.put(e.name(),
e.value()));
+ config.forEach(e -> brokerConfigs.put(e.configEntry().name(),
e.configEntry().value()));
return alterResult;
}
};
@@ -1115,9 +1114,9 @@ public class ConfigCommandTest {
assertEquals(3, alterConfigOps.size());
List<AlterConfigOp> expectedConfigOps = Arrays.asList(
- new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner",
"DEBUG"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new
ConfigEntry("kafka.server.ReplicaManager", ""), AlterConfigOp.OpType.DELETE),
- new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApi",
""), AlterConfigOp.OpType.DELETE)
+ new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApi",
""), AlterConfigOp.OpType.DELETE),
+ new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner",
"DEBUG"), AlterConfigOp.OpType.SET)
);
assertEquals(expectedConfigOps.size(), alterConfigOps.size());
Iterator<AlterConfigOp> alterConfigOpsIter =
alterConfigOps.iterator();
@@ -1248,9 +1247,9 @@ public class ConfigCommandTest {
assertEquals(3, alterConfigOps.size());
List<AlterConfigOp> expectedConfigOps = Arrays.asList(
+ new AlterConfigOp(new ConfigEntry("interval.ms", ""),
AlterConfigOp.OpType.DELETE),
new AlterConfigOp(new ConfigEntry("match",
"client_software_name=kafka.python,client_software_version=1\\.2\\..*"),
AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("metrics",
"org.apache.kafka.consumer."), AlterConfigOp.OpType.SET),
- new AlterConfigOp(new ConfigEntry("interval.ms", ""),
AlterConfigOp.OpType.DELETE)
+ new AlterConfigOp(new ConfigEntry("metrics",
"org.apache.kafka.consumer."), AlterConfigOp.OpType.SET)
);
assertEquals(expectedConfigOps.size(), alterConfigOps.size());
Iterator<AlterConfigOp> alterConfigOpsIter =
alterConfigOps.iterator();
@@ -1358,8 +1357,8 @@ public class ConfigCommandTest {
assertEquals(2, alterConfigOps.size());
List<AlterConfigOp> expectedConfigOps = Arrays.asList(
- new AlterConfigOp(new
ConfigEntry("consumer.heartbeat.interval.ms", "6000"),
AlterConfigOp.OpType.SET),
- new AlterConfigOp(new
ConfigEntry("consumer.session.timeout.ms", ""), AlterConfigOp.OpType.DELETE)
+ new AlterConfigOp(new
ConfigEntry("consumer.session.timeout.ms", ""), AlterConfigOp.OpType.DELETE),
+ new AlterConfigOp(new
ConfigEntry("consumer.heartbeat.interval.ms", "6000"), AlterConfigOp.OpType.SET)
);
assertEquals(expectedConfigOps.size(), alterConfigOps.size());
Iterator<AlterConfigOp> alterConfigOpsIter =
alterConfigOps.iterator();
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 5a7a13f71fd..cc7fcf9b2dd 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -126,6 +126,10 @@
<li>The <code>--broker-list</code> option was removed
from the <code>kafka-verifiable-consumer</code> command line tool.
Please use <code>--bootstrap-server</code> instead.
</li>
+ <li>kafka-configs.sh now uses incrementalAlterConfigs
API to alter broker configurations instead of the deprecated alterConfigs API,
+ and it will fail directly if the broker doesn't
support incrementalAlterConfigs API, which means the broker version is prior to
2.3.x.
+ See <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1011%3A+Use+incrementalAlterConfigs+when+updating+broker+configs+by+kafka-configs.sh">KIP-1011</a>
for more details.
+ </li>
</ul>
</li>
<li><b>Connect</b>
diff --git
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index ee1bf34a5d1..24df147b21e 100644
---
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -174,7 +174,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
quorumVoterStringBuilder.toString());
// reduce log cleaner offset map memory usage
- props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
"2097152");
+
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
// Add associated broker node property overrides
if (brokerNode != null) {