This is an automated email from the ASF dual-hosted git repository.

dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 615f1a0bf99 KAFKA-16181: Use incrementalAlterConfigs when updating broker configs by kafka-configs.sh (#15304)
615f1a0bf99 is described below

commit 615f1a0bf999cd0b5ccf38c09e6afaba89538e9f
Author: dengziming <[email protected]>
AuthorDate: Sun Dec 1 18:32:02 2024 +0800

    KAFKA-16181: Use incrementalAlterConfigs when updating broker configs by kafka-configs.sh (#15304)

    This PR implements KIP-1011: kafka-configs.sh now uses the incrementalAlterConfigs API to alter broker configurations instead of the deprecated alterConfigs API, and it will fail directly if the broker doesn't support incrementalAlterConfigs.

    Reviewers: David Jacot <[email protected]>, OmniaGM <[email protected]>.
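
    For reference, the call pattern the tool now issues against the public Admin API is roughly the following Java sketch (broker id "0" and the config values are illustrative, not taken from the patch):

        import java.util.Arrays;
        import java.util.Collections;
        import java.util.Properties;

        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AlterConfigOp;
        import org.apache.kafka.clients.admin.ConfigEntry;
        import org.apache.kafka.common.config.ConfigResource;

        public class IncrementalAlterSketch {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    // Address broker 0; an empty entity name ("") targets the cluster-wide default.
                    ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
                    // Incremental semantics: only the listed entries change, all others are untouched.
                    AlterConfigOp delete = new AlterConfigOp(
                            new ConfigEntry("log.retention.ms", ""), AlterConfigOp.OpType.DELETE);
                    AlterConfigOp set = new AlterConfigOp(
                            new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
                    admin.incrementalAlterConfigs(
                            Collections.singletonMap(broker0, Arrays.asList(delete, set))).all().get();
                }
            }
        }

    Unlike the legacy alterConfigs path, there is no need to read, merge, and rewrite the full broker config, which is what the removed code in the diff below used to do.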
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  77 ++++++-----
 .../kafka/admin/ConfigCommandIntegrationTest.java  | 147 ++++++++++++++++++---
 .../test/java/kafka/admin/ConfigCommandTest.java   |  21 ++-
 docs/upgrade.html                                  |   4 +
 .../kafka/common/test/KafkaClusterTestKit.java     |   2 +-
 5 files changed, 183 insertions(+), 68 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 993a2abe5c4..fe1fe9faed3 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -18,22 +18,24 @@
 package kafka.admin
 
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutionException, TimeUnit}
 import java.util.{Collections, Properties}
 import joptsimple._
 import kafka.server.DynamicConfig
 import kafka.utils.Implicits._
 import kafka.utils.Logging
-import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.utils.{Exit, Utils}
 import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.storage.internals.log.LogConfig
+
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection._
@@ -80,6 +82,11 @@ object ConfigCommand extends Logging {
         System.err.println(e.getMessage)
         Exit.exit(1)
 
+      case e: UnsupportedVersionException =>
+        logger.debug(s"Unsupported API encountered in server when executing 
config command with args '${args.mkString(" ")}'")
+        System.err.println(e.getMessage)
+        Exit.exit(1)
+
       case t: Throwable =>
         logger.debug(s"Error while executing config command with args 
'${args.mkString(" ")}'", t)
         System.err.println(s"Error while executing config command with args 
'${args.mkString(" ")}'")
@@ -161,7 +168,6 @@ object ConfigCommand extends Logging {
     }
   }
 
-  @nowarn("cat=deprecation")
   def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
     val entityTypes = opts.entityTypes
     val entityNames = opts.entityNames
@@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
-      case ConfigType.TOPIC =>
-        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
-      case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
-
-        val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-        val sensitiveEntries = newEntries.filter(_._2.value == null)
-        if (sensitiveEntries.nonEmpty)
-          throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
-        val newConfig = new JConfig(newEntries.asJava.values)
-
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead)
-        val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+      case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | ConfigType.GROUP =>
+        val configResourceType = entityTypeHead match {
+          case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+          case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+          case ConfigType.BROKER => ConfigResource.Type.BROKER
+          case ConfigType.GROUP => ConfigResource.Type.GROUP
+        }
+        try {
+          alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
+        } catch {
+          case e: ExecutionException =>
+            e.getCause match {
+              case _: UnsupportedVersionException =>
+                throw new UnsupportedVersionException(s"The ${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The API is supported starting from version 2.3.0."
+                + " You may want to use an older version of this tool to interact with your cluster, or upgrade your brokers to version 2.3.0 or newer to avoid this error.")
+              case _ => throw e
+            }
+          case e: Throwable => throw e
+        }
 
       case BrokerLoggerConfigType =>
         val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
@@ -203,10 +207,10 @@ object ConfigCommand extends Logging {
 
         val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
         val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+        val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
+        val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
+        val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
+        adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
 
       case ConfigType.USER | ConfigType.CLIENT =>
         val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
@@ -250,13 +254,8 @@ object ConfigCommand extends Logging {
           throw new IllegalArgumentException(s"Only connection quota configs 
can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigs.mkString(",")}")
         alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
 
-      case ConfigType.CLIENT_METRICS =>
-        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.CLIENT_METRICS)
-
-      case ConfigType.GROUP =>
-        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.GROUP)
-
-      case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
+      case _ =>
+        throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
     }
 
     if (entityNameHead.nonEmpty)
@@ -380,9 +379,9 @@ object ConfigCommand extends Logging {
 
     val configResource = new ConfigResource(resourceType, entityNameHead)
     val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-    val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
-      ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-      ).asJavaCollection
+    val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
+    val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
+    val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
     adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
   }
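
A note on the catch block above: the admin client surfaces UnsupportedVersionException wrapped in an ExecutionException, which is why the new code matches on e.getCause. A minimal caller-side Java sketch of the same detection (the config key and replacement message are illustrative):

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.errors.UnsupportedVersionException;

    public class UnsupportedVersionSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
                AlterConfigOp set = new AlterConfigOp(
                        new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
                try {
                    admin.incrementalAlterConfigs(
                            Collections.singletonMap(broker0, Collections.singletonList(set))).all().get();
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof UnsupportedVersionException) {
                        // Brokers older than 2.3.0 lack IncrementalAlterConfigs: fail fast with guidance.
                        throw new UnsupportedVersionException(
                                "Cluster does not support IncrementalAlterConfigs; upgrade brokers to 2.3.0 or newer.", e);
                    }
                    throw e;
                }
            }
        }
    }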
 
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 5ecd0337a96..cc4d6a01dcc 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -17,16 +17,25 @@
 package kafka.admin;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.AlterConfigsResult;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestExtensions;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
@@ -57,9 +66,11 @@ import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBA
 import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
 import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 
 @ExtendWith(value = ClusterTestExtensions.class)
 public class ConfigCommandIntegrationTest {
@@ -159,11 +170,11 @@ public class ConfigCommandIntegrationTest {
                     singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"), alterOpts);
             // Per-broker config configured at default cluster-level should fail
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.empty(),
+                    () -> alterConfigWithAdmin(client, Optional.empty(),
                             singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"), alterOpts));
             deleteAndVerifyConfigValue(client, defaultBrokerId,
                     singleton("listener.name.internal.ssl.keystore.location"), false, alterOpts);
-            alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+            alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                     singletonMap("listener.name.external.ssl.keystore.password", "secret"), alterOpts);
 
             // Password config update with encoder secret should succeed and encoded password must be stored in ZK
@@ -175,7 +186,7 @@
 
             // Password config update at default cluster-level should fail
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), configs, alterOpts));
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId), configs, alterOpts));
         }
     }
 
@@ -204,7 +215,7 @@ public class ConfigCommandIntegrationTest {
             deleteAndVerifyGroupConfigValue(client, defaultGroupName, configs, alterOpts);
 
             // Unknown config configured should fail
-            assertThrows(ExecutionException.class, () -> alterConfigWithKraft(client, singletonMap("unknown.config", "20000"), alterOpts));
+            assertThrows(ExecutionException.class, () -> alterConfigWithAdmin(client, singletonMap("unknown.config", "20000"), alterOpts));
         }
     }
 
@@ -232,7 +243,7 @@ public class ConfigCommandIntegrationTest {
             deleteAndVerifyClientMetricsConfigValue(client, defaultClientMetricsName, configs.keySet(), alterOpts);
 
             // Unknown config configured should fail
-            assertThrows(ExecutionException.class, () -> alterConfigWithKraft(client, singletonMap("unknown.config", "20000"), alterOpts));
+            assertThrows(ExecutionException.class, () -> alterConfigWithAdmin(client, singletonMap("unknown.config", "20000"), alterOpts));
         }
     }
 
@@ -242,13 +253,13 @@ public class ConfigCommandIntegrationTest {
 
         try (Admin client = cluster.admin()) {
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(AUTO_LEADER_REBALANCE_ENABLE_CONFIG, "false"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap("broker.id", "1"), alterOpts));
         }
     }
@@ -277,7 +288,7 @@ public class ConfigCommandIntegrationTest {
                     singletonMap(listenerName + "ssl.truststore.type", 
"PKCS12"), alterOpts);
             alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
                     singletonMap(listenerName + "ssl.truststore.location", 
"/temp/test.jks"), alterOpts);
-            alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+            alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                     singletonMap(listenerName + "ssl.truststore.password", 
"password"), alterOpts);
             verifyConfigSecretValue(client, Optional.of(defaultBrokerId),
                     singleton(listenerName + "ssl.truststore.password"));
@@ -290,17 +301,119 @@ public class ConfigCommandIntegrationTest {
 
         try (Admin client = cluster.admin()) {
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG, "/temp/test.jks"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"), alterOpts));
         }
     }
 
+    @ClusterTest
+    public void testUpdateInvalidBrokerConfigs() {
+        updateAndCheckInvalidBrokerConfig(Optional.empty());
+        updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId() + ""));
+    }
+
+    private void updateAndCheckInvalidBrokerConfig(Optional<String> brokerIdOrDefault) {
+        List<String> alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+        try (Admin client = cluster.admin()) {
+            alterConfigWithAdmin(client, brokerIdOrDefault, Collections.singletonMap("invalid", "2"), alterOpts);
+
+            Stream<String> describeCommand = Stream.concat(
+                    Stream.concat(
+                            Stream.of("--bootstrap-server", cluster.bootstrapServers()),
+                            Stream.of(entityOp(brokerIdOrDefault).toArray(new String[0]))),
+                    Stream.of("--entity-type", "brokers", "--describe"));
+            String describeResult = captureStandardStream(false, run(describeCommand));
+
+            // Unknown configs are treated as sensitive
+            assertTrue(describeResult.contains("sensitive=true"), describeResult);
+            // Sensitive configs are not returned
+            assertTrue(describeResult.contains("invalid=null"), describeResult);
+        }
+    }
+
+    @ClusterTest
+    public void testUpdateInvalidTopicConfigs() throws ExecutionException, InterruptedException {
+        List<String> alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+        try (Admin client = cluster.admin()) {
+            client.createTopics(Collections.singletonList(new NewTopic("test-config-topic", 1, (short) 1))).all().get();
+            assertInstanceOf(
+                    InvalidConfigurationException.class,
+                    assertThrows(
+                            ExecutionException.class,
+                            () -> ConfigCommand.alterConfig(
+                                    client,
+                                    new ConfigCommand.ConfigCommandOptions(
+                                            toArray(alterOpts,
+                                                    asList("--add-config", "invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+                    ).getCause()
+            );
+        }
+    }
+
+    // Test case from KAFKA-13788
+    @ClusterTest(serverProperties = {
+        // Must be greater than 1MB per cleaner thread; set to 2M+2 so that we can set 2 cleaner threads.
+        @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "2097154"),
+    })
+    public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
+        try (Admin client = cluster.admin()) {
+            ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(
+                    toArray(asList("--bootstrap-server", cluster.bootstrapServers(),
+                            "--alter",
+                            "--add-config", "log.cleaner.threadzz=2",
+                            "--entity-type", "brokers",
+                            "--entity-default"))));
+
+            ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(
+                    toArray(asList("--bootstrap-server", cluster.bootstrapServers(),
+                            "--alter",
+                            "--add-config", "log.cleaner.threads=2",
+                            "--entity-type", "brokers",
+                            "--entity-default"))));
+            kafka.utils.TestUtils.waitUntilTrue(
+                    () -> cluster.brokerSocketServers().stream().allMatch(broker -> broker.config().getInt("log.cleaner.threads") == 2),
+                    () -> "Timeout waiting for broker config to propagate to all brokers",
+                    org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+                    100L);
+        }
+    }
+
+    @ClusterTest(
+         // Must be greater than 1MB per cleaner thread; set to 2M+2 so that we can set 2 cleaner threads.
+         serverProperties = {@ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "2097154")},
+         // ZooKeeper code has been removed, so use KRaft plus Mockito to simulate an old cluster
+         metadataVersion = MetadataVersion.IBP_3_3_IV0
+    )
+    public void testUnsupportedVersionException() {
+        try (Admin client = cluster.admin()) {
+            Admin spyAdmin = Mockito.spy(client);
+
+            AlterConfigsResult mockResult = AdminClientTestUtils.alterConfigsResult(
+                    new ConfigResource(ConfigResource.Type.BROKER, ""), new UnsupportedVersionException("simulated error"));
+            Mockito.doReturn(mockResult).when(spyAdmin)
+                    .incrementalAlterConfigs(any(java.util.Map.class), any(AlterConfigsOptions.class));
+            assertEquals(
+                    "The INCREMENTAL_ALTER_CONFIGS API is not supported by the cluster. The API is supported starting from version 2.3.0. You may want to use an older version of this tool to interact with your cluster, or upgrade your brokers to version 2.3.0 or newer to avoid this error.",
+                    assertThrows(UnsupportedVersionException.class, () -> {
+                        ConfigCommand.alterConfig(spyAdmin, new ConfigCommand.ConfigCommandOptions(
+                                toArray(asList(
+                                        "--bootstrap-server", cluster.bootstrapServers(),
+                                        "--alter",
+                                        "--add-config", "log.cleaner.threads=2",
+                                        "--entity-type", "brokers",
+                                        "--entity-default"))));
+                    }).getMessage()
+            );
+            Mockito.verify(spyAdmin).incrementalAlterConfigs(any(java.util.Map.class), any(AlterConfigsOptions.class));
+        }
+    }
+
     private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) {
         AtomicReference<Integer> exitStatus = new AtomicReference<>();
         Exit.setExitProcedure((status, __) -> {
@@ -333,7 +446,7 @@ public class ConfigCommandIntegrationTest {
                                       Optional<String> brokerId,
                                       Map<String, String> config,
                                       List<String> alterOpts) throws Exception {
-        alterConfigWithKraft(client, brokerId, config, alterOpts);
+        alterConfigWithAdmin(client, brokerId, config, alterOpts);
         verifyConfig(client, brokerId, config);
     }
 
@@ -341,7 +454,7 @@ public class ConfigCommandIntegrationTest {
                                            String groupName,
                                            Map<String, String> config,
                                            List<String> alterOpts) throws Exception {
-        alterConfigWithKraft(client, config, alterOpts);
+        alterConfigWithAdmin(client, config, alterOpts);
         verifyGroupConfig(client, groupName, config);
     }
 
@@ -349,11 +462,11 @@ public class ConfigCommandIntegrationTest {
                                                    String clientMetricsName,
                                                    Map<String, String> config,
                                                    List<String> alterOpts) throws Exception {
-        alterConfigWithKraft(client, config, alterOpts);
+        alterConfigWithAdmin(client, config, alterOpts);
         verifyClientMetricsConfig(client, clientMetricsName, config);
     }
 
-    private void alterConfigWithKraft(Admin client, Optional<String> resourceName, Map<String, String> config, List<String> alterOpts) {
+    private void alterConfigWithAdmin(Admin client, Optional<String> resourceName, Map<String, String> config, List<String> alterOpts) {
         String configStr = transferConfigMapToString(config);
         List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
         ConfigCommand.ConfigCommandOptions addOpts =
@@ -365,7 +478,7 @@ public class ConfigCommandIntegrationTest {
         ConfigCommand.alterConfig(client, addOpts);
     }
 
-    private void alterConfigWithKraft(Admin client, Map<String, String> config, List<String> alterOpts) {
+    private void alterConfigWithAdmin(Admin client, Map<String, String> config, List<String> alterOpts) {
         String configStr = transferConfigMapToString(config);
         List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
         ConfigCommand.ConfigCommandOptions addOpts =
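
The BROKER_LOGGER branch reworked earlier in this patch uses the same incremental API. For orientation, changing one broker logger level directly through the Admin client looks roughly like this sketch (the logger name, level, and broker id are illustrative):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class BrokerLoggerLevelSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // BROKER_LOGGER resources are keyed by broker id, here broker "0".
                ConfigResource logger0 = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "0");
                // SET overrides one logger's level; DELETE would restore its default.
                AlterConfigOp op = new AlterConfigOp(
                        new ConfigEntry("kafka.server.ReplicaManager", "DEBUG"), AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(
                        Collections.singletonMap(logger0, Collections.singletonList(op))).all().get();
            }
        }
    }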
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 74fe3271f23..308777417be 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -1017,15 +1017,14 @@ public class ConfigCommandTest {
                 return describeResult;
             }
 
-            @SuppressWarnings("deprecation")
             @Override
-            public synchronized AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
+            public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
                 assertEquals(1, configs.size());
-                Map.Entry<ConfigResource, Config> entry = configs.entrySet().iterator().next();
+                Map.Entry<ConfigResource, Collection<AlterConfigOp>> entry = configs.entrySet().iterator().next();
                 ConfigResource res = entry.getKey();
-                Config config = entry.getValue();
+                Collection<AlterConfigOp> config = entry.getValue();
                 assertEquals(ConfigResource.Type.BROKER, res.type());
-                config.entries().forEach(e -> brokerConfigs.put(e.name(), e.value()));
+                config.forEach(e -> brokerConfigs.put(e.configEntry().name(), e.configEntry().value()));
                 return alterResult;
             }
         };
@@ -1115,9 +1114,9 @@ public class ConfigCommandTest {
                 assertEquals(3, alterConfigOps.size());
 
                 List<AlterConfigOp> expectedConfigOps = Arrays.asList(
-                    new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", 
"DEBUG"), AlterConfigOp.OpType.SET),
                     new AlterConfigOp(new 
ConfigEntry("kafka.server.ReplicaManager", ""), AlterConfigOp.OpType.DELETE),
-                    new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApi", 
""), AlterConfigOp.OpType.DELETE)
+                    new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApi", 
""), AlterConfigOp.OpType.DELETE),
+                    new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", 
"DEBUG"), AlterConfigOp.OpType.SET)
                 );
                 assertEquals(expectedConfigOps.size(), alterConfigOps.size());
                 Iterator<AlterConfigOp> alterConfigOpsIter = 
alterConfigOps.iterator();
@@ -1248,9 +1247,9 @@ public class ConfigCommandTest {
                 assertEquals(3, alterConfigOps.size());
 
                 List<AlterConfigOp> expectedConfigOps = Arrays.asList(
+                    new AlterConfigOp(new ConfigEntry("interval.ms", ""), 
AlterConfigOp.OpType.DELETE),
                     new AlterConfigOp(new ConfigEntry("match", 
"client_software_name=kafka.python,client_software_version=1\\.2\\..*"), 
AlterConfigOp.OpType.SET),
-                    new AlterConfigOp(new ConfigEntry("metrics", 
"org.apache.kafka.consumer."), AlterConfigOp.OpType.SET),
-                    new AlterConfigOp(new ConfigEntry("interval.ms", ""), 
AlterConfigOp.OpType.DELETE)
+                    new AlterConfigOp(new ConfigEntry("metrics", 
"org.apache.kafka.consumer."), AlterConfigOp.OpType.SET)
                 );
                 assertEquals(expectedConfigOps.size(), alterConfigOps.size());
                 Iterator<AlterConfigOp> alterConfigOpsIter = 
alterConfigOps.iterator();
@@ -1358,8 +1357,8 @@ public class ConfigCommandTest {
                 assertEquals(2, alterConfigOps.size());
 
                 List<AlterConfigOp> expectedConfigOps = Arrays.asList(
-                    new AlterConfigOp(new ConfigEntry("consumer.heartbeat.interval.ms", "6000"), AlterConfigOp.OpType.SET),
-                    new AlterConfigOp(new ConfigEntry("consumer.session.timeout.ms", ""), AlterConfigOp.OpType.DELETE)
+                    new AlterConfigOp(new ConfigEntry("consumer.session.timeout.ms", ""), AlterConfigOp.OpType.DELETE),
+                    new AlterConfigOp(new ConfigEntry("consumer.heartbeat.interval.ms", "6000"), AlterConfigOp.OpType.SET)
                 );
                 assertEquals(expectedConfigOps.size(), alterConfigOps.size());
                 Iterator<AlterConfigOp> alterConfigOpsIter = alterConfigOps.iterator();
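
The reordered expected lists in these hunks reflect that the tool now emits DELETE ops ahead of SET ops, mirroring (deleteEntries ++ addEntries) in ConfigCommand.scala. A small self-contained sketch of the resulting order (the config names are illustrative):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;

    public class OpOrderSketch {
        // Deletes first, then sets, as the rewritten ConfigCommand builds them.
        static List<AlterConfigOp> buildOps() {
            return Arrays.asList(
                    new AlterConfigOp(new ConfigEntry("consumer.session.timeout.ms", ""), AlterConfigOp.OpType.DELETE),
                    new AlterConfigOp(new ConfigEntry("consumer.heartbeat.interval.ms", "6000"), AlterConfigOp.OpType.SET));
        }

        public static void main(String[] args) {
            buildOps().forEach(op -> System.out.println(op.opType() + " " + op.configEntry().name()));
        }
    }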
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 5a7a13f71fd..cc7fcf9b2dd 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -126,6 +126,10 @@
                        <li>The <code>--broker-list</code> option was removed from the <code>kafka-verifiable-consumer</code> command line tool.
                             Please use <code>--bootstrap-server</code> instead.
                         </li>
+                        <li>kafka-configs.sh now uses the incrementalAlterConfigs API to alter broker configurations instead of the deprecated alterConfigs API,
+                            and it will fail directly if the broker doesn't support the incrementalAlterConfigs API, which means the broker version is prior to 2.3.x.
+                            See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1011%3A+Use+incrementalAlterConfigs+when+updating+broker+configs+by+kafka-configs.sh">KIP-1011</a> for more details.
+                        </li>
                     </ul>
                 </li>
                 <li><b>Connect</b>
diff --git a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index ee1bf34a5d1..24df147b21e 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -174,7 +174,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
 
             // reduce log cleaner offset map memory usage
-            props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
+            props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
 
             // Add associated broker node property overrides
             if (brokerNode != null) {
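
The put to putIfAbsent switch above matters because a value supplied earlier (for example the dedupe-buffer override used by the new tests) should win over this built-in default. A standalone illustration of the semantics (the values are illustrative):

    import java.util.Properties;

    public class PutIfAbsentSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // A test sets its own cleaner buffer size first...
            props.put("log.cleaner.dedupe.buffer.size", "2097154");
            // ...so the test kit's default no longer clobbers it.
            props.putIfAbsent("log.cleaner.dedupe.buffer.size", "2097152");
            System.out.println(props.getProperty("log.cleaner.dedupe.buffer.size")); // prints 2097154
        }
    }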

Reply via email to