This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fd951a9c0a KAFKA-17610 Drop alterConfigs (#18002)
6fd951a9c0a is described below

commit 6fd951a9c0aa773060cd6bbf8a8b8c47ee9d2965
Author: TengYao Chi <[email protected]>
AuthorDate: Mon Dec 2 23:26:06 2024 +0800

    KAFKA-17610 Drop alterConfigs (#18002)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  35 ------
 .../kafka/clients/admin/AlterConfigsOptions.java   |   2 +-
 .../kafka/clients/admin/AlterConfigsResult.java    |   2 +-
 .../kafka/clients/admin/ForwardingAdmin.java       |   6 -
 .../kafka/clients/admin/KafkaAdminClient.java      |  68 -----------
 .../kafka/clients/admin/MockAdminClient.java       |   6 -
 .../FakeForwardingAdminWithLocalMetadata.java      |  21 ----
 .../test/java/kafka/admin/ConfigCommandTest.java   |   5 -
 .../AdminClientWithPoliciesIntegrationTest.scala   |  61 +++++----
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 136 ++++++---------------
 .../server/DynamicBrokerReconfigurationTest.scala  |  37 +++---
 .../kafka/server/KRaftClusterTest.scala            |  78 ------------
 .../kafka/server/DynamicConfigChangeTest.scala     |  18 +--
 docs/upgrade.html                                  |   8 ++
 .../TestingMetricsInterceptingAdminClient.java     |   7 --
 15 files changed, 97 insertions(+), 393 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 6db43732872..7f47bfbd8ba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -486,41 +486,6 @@ public interface Admin extends AutoCloseable {
      */
     DescribeConfigsResult describeConfigs(Collection<ConfigResource> 
resources, DescribeConfigsOptions options);
 
-    /**
-     * Update the configuration for the specified resources with the default 
options.
-     * <p>
-     * This is a convenience method for {@link #alterConfigs(Map, 
AlterConfigsOptions)} with default options.
-     * See the overload for more details.
-     * <p>
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param configs The resources with their configs (topic is the only 
resource type with configs that can
-     *                be updated currently)
-     * @return The AlterConfigsResult
-     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}.
-     */
-    @Deprecated
-    default AlterConfigsResult alterConfigs(Map<ConfigResource, Config> 
configs) {
-        return alterConfigs(configs, new AlterConfigsOptions());
-    }
-
-    /**
-     * Update the configuration for the specified resources with the default 
options.
-     * <p>
-     * Updates are not transactional so they may succeed for some resources 
while fail for others. The configs for
-     * a particular resource are updated atomically.
-     * <p>
-     * This operation is supported by brokers with version 0.11.0.0 or higher.
-     *
-     * @param configs The resources with their configs (topic is the only 
resource type with configs that can
-     *                be updated currently)
-     * @param options The options to use when describing configs
-     * @return The AlterConfigsResult
-     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, 
AlterConfigsOptions)}.
-     */
-    @Deprecated
-    AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, 
AlterConfigsOptions options);
-
     /**
      * Incrementally updates the configuration for the specified resources 
with default options.
      * <p>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
index 198a4eab62a..b2e85b9e813 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Map;
 
 /**
- * Options for {@link Admin#incrementalAlterConfigs(Map)} and {@link 
Admin#alterConfigs(Map)}.
+ * Options for {@link Admin#incrementalAlterConfigs(Map)}.
  *
  * The API of this class is evolving, see {@link Admin} for details.
  */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
index 29056ce2940..007b4422b36 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.config.ConfigResource;
 import java.util.Map;
 
 /**
- * The result of the {@link Admin#alterConfigs(Map)} call.
+ * The result of the {@link Admin#incrementalAlterConfigs(Map, 
AlterConfigsOptions)} call.
  *
  * The API of this class is evolving, see {@link Admin} for details.
  */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index 932a1f2b74d..88a693934e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -103,12 +103,6 @@ public class ForwardingAdmin implements Admin {
         return delegate.describeConfigs(resources, options);
     }
 
-    @Deprecated
-    @Override
-    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> 
configs, AlterConfigsOptions options) {
-        return delegate.alterConfigs(configs, options);
-    }
-
     @Override
     public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, 
Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
         return delegate.incrementalAlterConfigs(configs, options);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 4fdc8a45bc9..e6986c8a378 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -182,8 +182,6 @@ import org.apache.kafka.common.requests.AddRaftVoterRequest;
 import org.apache.kafka.common.requests.AddRaftVoterResponse;
 import org.apache.kafka.common.requests.AlterClientQuotasRequest;
 import org.apache.kafka.common.requests.AlterClientQuotasResponse;
-import org.apache.kafka.common.requests.AlterConfigsRequest;
-import org.apache.kafka.common.requests.AlterConfigsResponse;
 import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
 import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
@@ -2868,72 +2866,6 @@ public class KafkaAdminClient extends AdminClient {
         return configSource;
     }
 
-    @Override
-    @Deprecated
-    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> 
configs, final AlterConfigsOptions options) {
-        final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new 
HashMap<>();
-        // We must make a separate AlterConfigs request for every BROKER 
resource we want to alter
-        // and send the request to that specific node. Other resources are 
grouped together into
-        // a single request that may be sent to any node.
-        final Collection<ConfigResource> unifiedRequestResources = new 
ArrayList<>();
-
-        for (ConfigResource resource : configs.keySet()) {
-            Integer node = nodeFor(resource);
-            if (node != null) {
-                NodeProvider nodeProvider = new 
ConstantBrokerOrActiveKController(node);
-                allFutures.putAll(alterConfigs(configs, options, 
Collections.singleton(resource), nodeProvider));
-            } else
-                unifiedRequestResources.add(resource);
-        }
-        if (!unifiedRequestResources.isEmpty())
-          allFutures.putAll(alterConfigs(configs, options, 
unifiedRequestResources, new LeastLoadedBrokerOrActiveKController()));
-        return new AlterConfigsResult(new HashMap<>(allFutures));
-    }
-
-    private Map<ConfigResource, KafkaFutureImpl<Void>> 
alterConfigs(Map<ConfigResource, Config> configs,
-                                                                    final 
AlterConfigsOptions options,
-                                                                    
Collection<ConfigResource> resources,
-                                                                    
NodeProvider nodeProvider) {
-        final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new 
HashMap<>();
-        final Map<ConfigResource, AlterConfigsRequest.Config> requestMap = new 
HashMap<>(resources.size());
-        for (ConfigResource resource : resources) {
-            List<AlterConfigsRequest.ConfigEntry> configEntries = new 
ArrayList<>();
-            for (ConfigEntry configEntry: configs.get(resource).entries())
-                configEntries.add(new 
AlterConfigsRequest.ConfigEntry(configEntry.name(), configEntry.value()));
-            requestMap.put(resource, new 
AlterConfigsRequest.Config(configEntries));
-            futures.put(resource, new KafkaFutureImpl<>());
-        }
-
-        final long now = time.milliseconds();
-        runnable.call(new Call("alterConfigs", calcDeadlineMs(now, 
options.timeoutMs()), nodeProvider) {
-
-            @Override
-            public AlterConfigsRequest.Builder createRequest(int timeoutMs) {
-                return new AlterConfigsRequest.Builder(requestMap, 
options.shouldValidateOnly());
-            }
-
-            @Override
-            public void handleResponse(AbstractResponse abstractResponse) {
-                AlterConfigsResponse response = (AlterConfigsResponse) 
abstractResponse;
-                for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : 
futures.entrySet()) {
-                    KafkaFutureImpl<Void> future = entry.getValue();
-                    ApiException exception = 
response.errors().get(entry.getKey()).exception();
-                    if (exception != null) {
-                        future.completeExceptionally(exception);
-                    } else {
-                        future.complete(null);
-                    }
-                }
-            }
-
-            @Override
-            void handleFailure(Throwable throwable) {
-                completeAllExceptionally(futures.values(), throwable);
-            }
-        }, now);
-        return futures;
-    }
-
     @Override
     public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, 
Collection<AlterConfigOp>> configs,
                                                       final 
AlterConfigsOptions options) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 1ec9d6b5433..3be5dc7b3e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -871,12 +871,6 @@ public class MockAdminClient extends AdminClient {
         return new Config(configEntries);
     }
 
-    @Override
-    @Deprecated
-    public synchronized AlterConfigsResult alterConfigs(Map<ConfigResource, 
Config> configs, AlterConfigsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
     @Override
     public synchronized AlterConfigsResult incrementalAlterConfigs(
             Map<ConfigResource, Collection<AlterConfigOp>> configs,
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
index 1f2f56166a2..406cc280d59 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
@@ -17,9 +17,6 @@
 
 package org.apache.kafka.connect.mirror.clients.admin;
 
-import org.apache.kafka.clients.admin.AlterConfigsOptions;
-import org.apache.kafka.clients.admin.AlterConfigsResult;
-import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.CreateAclsOptions;
 import org.apache.kafka.clients.admin.CreateAclsResult;
 import org.apache.kafka.clients.admin.CreatePartitionsOptions;
@@ -30,7 +27,6 @@ import org.apache.kafka.clients.admin.ForwardingAdmin;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.TopicExistsException;
 
 import org.slf4j.Logger;
@@ -79,23 +75,6 @@ public class FakeForwardingAdminWithLocalMetadata extends 
ForwardingAdmin {
         return createPartitionsResult;
     }
 
-    @Deprecated
-    @Override
-    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> 
configs, AlterConfigsOptions options) {
-        AlterConfigsResult alterConfigsResult = super.alterConfigs(configs, 
options);
-        configs.forEach((configResource, newConfigs) -> 
alterConfigsResult.values().get(configResource).whenComplete((ignored, error) 
-> {
-            if (error == null) {
-                if (configResource.type() == ConfigResource.Type.TOPIC) {
-                    
FakeLocalMetadataStore.updateTopicConfig(configResource.name(), newConfigs);
-                }
-            } else {
-                log.error("Unable to intercept admin client operation", error);
-            }
-        }));
-        return alterConfigsResult;
-    }
-
-
     @Override
     public CreateAclsResult createAcls(Collection<AclBinding> acls, 
CreateAclsOptions options) {
         CreateAclsResult aclsResult = super.createAcls(acls, options);
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java 
b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 308777417be..70b1faee43b 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -1459,11 +1459,6 @@ public class ConfigCommandTest {
             return mock(AlterConfigsResult.class);
         }
 
-        @Override
-        public synchronized AlterConfigsResult 
alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
-            return mock(AlterConfigsResult.class);
-        }
-
         @Override
         public DescribeClientQuotasResult 
describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions 
options) {
             return mock(DescribeClientQuotasResult.class);
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 6f902b2db3b..562fa12d036 100644
--- 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -14,13 +14,13 @@
 package kafka.api
 
 import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigOp, AlterConfigsOptions, Config, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigOp, AlterConfigsOptions, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, SslConfigs, TopicConfig}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, 
InvalidRequestException, PolicyViolationException}
 import org.apache.kafka.common.utils.Utils
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, 
TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -88,15 +87,17 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     val topic1 = "describe-alter-configs-topic-1"
     val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
     val topicConfig1 = new Properties
-    topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "500000")
-    topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, "60000000")
+    val maxMessageBytes = "500000"
+    val retentionMs = "60000000"
+    topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 
maxMessageBytes)
+    topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, retentionMs)
     createTopic(topic1, 1, 1, topicConfig1)
 
     val topic2 = "describe-alter-configs-topic-2"
     val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
     createTopic(topic2)
 
-    PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this, 
topicResource1, topicResource2)
+    PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this, 
topicResource1, topicResource2, maxMessageBytes, retentionMs)
   }
 
   @ParameterizedTest
@@ -106,7 +107,6 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
   }
 
-  @nowarn("cat=deprecation")
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = {
@@ -127,30 +127,32 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
 
     // Set a mutable broker config
     val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
brokers.head.config.brokerId.toString)
-    val brokerConfigs = Seq(new 
ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000")).asJava
-    val alterResult1 = client.alterConfigs(Map(brokerResource -> new 
Config(brokerConfigs)).asJava)
-    alterResult1.all.get
+    var alterResult = 
client.incrementalAlterConfigs(Collections.singletonMap(brokerResource,
+      util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000"), OpType.SET))))
+    alterResult.all.get
     assertEquals(Set(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG), 
validationsForResource(brokerResource).head.configs().keySet().asScala)
     validations.clear()
 
-    val topicConfigEntries1 = Seq(
-      new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"),
-      new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") // policy 
doesn't allow this
-    ).asJava
+    val alterConfigs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
+    alterConfigs.put(topicResource1, util.Arrays.asList(
+      new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET),
+      new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"), OpType.SET)
+    ))
 
-    var topicConfigEntries2 = Seq(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8")).asJava
+    alterConfigs.put(topicResource2, util.Arrays.asList(
+      new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8"), OpType.SET),
+    ))
 
-    val topicConfigEntries3 = Seq(new 
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1")).asJava
+    alterConfigs.put(topicResource3, util.Arrays.asList(
+      new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1"), OpType.SET),
+    ))
 
-    val brokerConfigEntries = Seq(new 
ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313")).asJava
+    alterConfigs.put(brokerResource, util.Arrays.asList(
+      new AlterConfigOp(new 
ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313"), OpType.SET),
+    ))
 
     // Alter configs: second is valid, the others are invalid
-    var alterResult = client.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2),
-      topicResource3 -> new Config(topicConfigEntries3),
-      brokerResource -> new Config(brokerConfigEntries)
-    ).asJava)
+    alterResult = client.incrementalAlterConfigs(alterConfigs)
 
     assertEquals(Set(topicResource1, topicResource2, topicResource3, 
brokerResource).asJava, alterResult.values.keySet)
     assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[PolicyViolationException])
@@ -175,14 +177,11 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     
assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value)
 
     // Alter configs with validateOnly = true: only second is valid
-    topicConfigEntries2 = Seq(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7")).asJava
-
-    alterResult = client.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2),
-      brokerResource -> new Config(brokerConfigEntries),
-      topicResource3 -> new Config(topicConfigEntries3)
-    ).asJava, new AlterConfigsOptions().validateOnly(true))
+    alterConfigs.put(topicResource2, util.Arrays.asList(
+      new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7"), OpType.SET),
+    ))
+
+    alterResult = client.incrementalAlterConfigs(alterConfigs, new 
AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, topicResource3, 
brokerResource).asJava, alterResult.values.keySet)
     assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[PolicyViolationException])
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 0c840e4bf64..2eb773c0614 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -33,6 +33,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
 import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.HostResolver
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
@@ -53,7 +54,7 @@ import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, 
ServerLogConfigs}
+import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, 
ServerLogConfigs, ZkConfigs}
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogFileUtils}
 import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
 import org.apache.log4j.PropertyConfigurator
@@ -64,7 +65,6 @@ import org.junit.jupiter.params.provider.{MethodSource, 
ValueSource}
 import org.slf4j.LoggerFactory
 
 import java.util.AbstractMap.SimpleImmutableEntry
-import scala.annotation.nowarn
 import scala.collection.Seq
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
@@ -122,29 +122,6 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     } finally brokenClient.close(time.Duration.ZERO)
   }
 
-  @ParameterizedTest
-  @Timeout(30)
-  @ValueSource(strings = Array("kraft"))
-  def testAlterConfigsWithOptionTimeoutMs(quorum: String): Unit = {
-    client = createAdminClient
-    val config = createConfig
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
-    val brokenClient = Admin.create(config)
-
-    try {
-      val alterLogLevelsEntries = Seq(
-        new ConfigEntry("kafka.server.ControllerServer", 
LogLevelConfig.INFO_LOG_LEVEL)
-      ).asJavaCollection
-
-      val exception = assertThrows(classOf[ExecutionException], () => {
-        brokenClient.alterConfigs(
-        Map(brokerLoggerConfigResource -> new 
Config(alterLogLevelsEntries)).asJava,
-          new AlterConfigsOptions().timeoutMs(0)).all().get()
-      })
-      assertInstanceOf(classOf[TimeoutException], exception.getCause)
-    } finally brokenClient.close(time.Duration.ZERO)
-  }
-
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testCreatePartitionWithOptionRetryOnQuotaViolation(quorum: String): Unit 
= {
@@ -993,8 +970,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val topic1 = "describe-alter-configs-topic-1"
     val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
     val topicConfig1 = new Properties
-    topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "500000")
-    topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, "60000000")
+    val maxMessageBytes = "500000"
+    val retentionMs = "60000000"
+    topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 
maxMessageBytes)
+    topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, retentionMs)
     createTopic(topic1, numPartitions = 1, replicationFactor = 1, topicConfig1)
 
     val topic2 = "describe-alter-configs-topic-2"
@@ -1064,7 +1043,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(brokers(2).config.logCleanerThreads.toString,
       
configs.get(brokerResource2).get(CleanerConfig.LOG_CLEANER_THREADS_PROP).value)
 
-    checkValidAlterConfigs(client, this, topicResource1, topicResource2)
+    checkValidAlterConfigs(client, this, topicResource1, topicResource2, 
maxMessageBytes, retentionMs)
   }
 
   @ParameterizedTest
@@ -3709,22 +3688,6 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertLogLevelDidNotChange()
   }
 
-  /**
-    * The AlterConfigs API is deprecated and should not support altering log 
levels
-    */
-  @nowarn("cat=deprecation")
-  @ParameterizedTest
-  @ValueSource(strings = Array("kraft"))
-  def testAlterConfigsForLog4jLogLevelsDoesNotWork(quorum: String): Unit = {
-    client = createAdminClient
-
-    val alterLogLevelsEntries = Seq(
-      new ConfigEntry("kafka.server.ControllerServer", 
LogLevelConfig.INFO_LOG_LEVEL)
-    ).asJavaCollection
-    val alterResult = client.alterConfigs(Map(brokerLoggerConfigResource -> 
new Config(alterLogLevelsEntries)).asJava)
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerLoggerConfigResource).get).getCause.isInstanceOf[InvalidRequestException])
-  }
-
   def alterBrokerLoggers(entries: util.Collection[AlterConfigOp], 
validateOnly: Boolean = false): Unit = {
     client.incrementalAlterConfigs(Map(brokerLoggerConfigResource -> 
entries).asJava, new AlterConfigsOptions().validateOnly(validateOnly))
       .values.get(brokerLoggerConfigResource).get()
@@ -3890,27 +3853,21 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
 object PlaintextAdminIntegrationTest {
 
-  @nowarn("cat=deprecation")
   def checkValidAlterConfigs(
     admin: Admin,
     test: KafkaServerTestHarness,
     topicResource1: ConfigResource,
-    topicResource2: ConfigResource
-  ): Unit = {
+    topicResource2: ConfigResource,
+    maxMessageBytes: String,
+    retentionMs: String): Unit = {
     // Alter topics
-    var topicConfigEntries1 = Seq(
-      new ConfigEntry(TopicConfig.FLUSH_MS_CONFIG, "1000")
-    ).asJava
-
-    var topicConfigEntries2 = Seq(
-      new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"),
-      new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
-    ).asJava
-
-    var alterResult = admin.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2)
-    ).asJava)
+    val alterConfigs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
+    alterConfigs.put(topicResource1, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(TopicConfig.FLUSH_MS_CONFIG, "1000"), OpType.SET)))
+    alterConfigs.put(topicResource2, util.Arrays.asList(
+      new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET),
+      new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, 
"lz4"), OpType.SET)
+    ))
+    var alterResult = admin.incrementalAlterConfigs(alterConfigs)
 
     assertEquals(Set(topicResource1, topicResource2).asJava, 
alterResult.values.keySet)
     alterResult.all.get
@@ -3924,26 +3881,16 @@ object PlaintextAdminIntegrationTest {
     assertEquals(2, configs.size)
 
     assertEquals("1000", 
configs.get(topicResource1).get(TopicConfig.FLUSH_MS_CONFIG).value)
-    assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES.toString,
-      
configs.get(topicResource1).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value)
-    assertEquals(LogConfig.DEFAULT_RETENTION_MS.toString, 
configs.get(topicResource1).get(TopicConfig.RETENTION_MS_CONFIG).value)
+    assertEquals(maxMessageBytes, 
configs.get(topicResource1).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value)
+    assertEquals(retentionMs, 
configs.get(topicResource1).get(TopicConfig.RETENTION_MS_CONFIG).value)
 
     assertEquals("0.9", 
configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
     assertEquals("lz4", 
configs.get(topicResource2).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)
 
     // Alter topics with validateOnly=true
-    topicConfigEntries1 = Seq(
-      new ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "10")
-    ).asJava
-
-    topicConfigEntries2 = Seq(
-      new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.3")
-    ).asJava
-
-    alterResult = admin.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2)
-    ).asJava, new AlterConfigsOptions().validateOnly(true))
+    alterConfigs.put(topicResource1, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "10"), OpType.SET)))
+    alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.3"), OpType.SET)))
+    alterResult = admin.incrementalAlterConfigs(alterConfigs, new 
AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2).asJava, 
alterResult.values.keySet)
     alterResult.all.get
@@ -3955,12 +3902,10 @@ object PlaintextAdminIntegrationTest {
 
     assertEquals(2, configs.size)
 
-    assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES.toString,
-      
configs.get(topicResource1).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value)
+    assertEquals(maxMessageBytes, 
configs.get(topicResource1).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value)
     assertEquals("0.9", 
configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
   }
 
-  @nowarn("cat=deprecation")
   def checkInvalidAlterConfigs(
     test: KafkaServerTestHarness,
     admin: Admin
@@ -3974,22 +3919,17 @@ object PlaintextAdminIntegrationTest {
     val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
     createTopicWithAdmin(admin, topic2, test.brokers, test.controllerServers, 
numPartitions = 1, replicationFactor = 1)
 
-    val topicConfigEntries1 = Seq(
-      new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), // 
this value is invalid as it's above 1.0
-      new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
-    ).asJava
-
-    var topicConfigEntries2 = Seq(new 
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")).asJava
-
     val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
test.brokers.head.config.brokerId.toString)
-    val brokerConfigEntries = Seq(new 
ConfigEntry(ServerConfigs.BROKER_ID_CONFIG, "10")).asJava
 
     // Alter configs: first and third are invalid, second is valid
-    var alterResult = admin.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2),
-      brokerResource -> new Config(brokerConfigEntries)
-    ).asJava)
+    val alterConfigs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
+    alterConfigs.put(topicResource1, util.Arrays.asList(
+      new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), OpType.SET),
+      new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, 
"lz4"), OpType.SET)
+    ))
+    alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET)))
+    alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
+    var alterResult = admin.incrementalAlterConfigs(alterConfigs)
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
     assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[InvalidConfigurationException])
@@ -4012,13 +3952,13 @@ object PlaintextAdminIntegrationTest {
     assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, 
configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
 
     // Alter configs with validateOnly = true: first and third are invalid, 
second is valid
-    topicConfigEntries2 = Seq(new 
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip")).asJava
-
-    alterResult = admin.alterConfigs(Map(
-      topicResource1 -> new Config(topicConfigEntries1),
-      topicResource2 -> new Config(topicConfigEntries2),
-      brokerResource -> new Config(brokerConfigEntries)
-    ).asJava, new AlterConfigsOptions().validateOnly(true))
+    alterConfigs.put(topicResource1, util.Arrays.asList(
+      new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), OpType.SET),
+      new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, 
"lz4"), OpType.SET)
+    ))
+    alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET)))
+    alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
+    alterResult = admin.incrementalAlterConfigs(alterConfigs, new 
AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
     assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[InvalidConfigurationException])
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 2d11963c846..f2ff811e47f 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -69,7 +69,6 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
 import java.util.concurrent.atomic.AtomicInteger
-import scala.annotation.nowarn
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
@@ -586,14 +585,16 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     val (producerThread, consumerThread) = startProduceConsume(retries = 0, 
groupProtocol)
 
     val props = new Properties
+    val logIndexSizeMaxBytes = "100000"
+    val logRetentionMs = TimeUnit.DAYS.toMillis(1)
     props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000")
     props.put(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, 
TimeUnit.HOURS.toMillis(2).toString)
     props.put(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, 
TimeUnit.HOURS.toMillis(1).toString)
-    props.put(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, "100000")
+    props.put(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, 
logIndexSizeMaxBytes)
     props.put(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1000")
     props.put(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, "60000")
     props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "10000000")
-    props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 
TimeUnit.DAYS.toMillis(1).toString)
+    props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 
logRetentionMs.toString)
     props.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "100000")
     props.put(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG, "10000")
     props.put(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, 
TimeUnit.DAYS.toMillis(1).toString)
@@ -674,8 +675,8 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     assertEquals(500000, 
servers.head.config.values.get(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG))
     assertEquals(TimeUnit.DAYS.toMillis(2), 
servers.head.config.values.get(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
     servers.tail.foreach { server =>
-      assertEquals(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, 
server.config.values.get(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG))
-      assertEquals(1680000000L, 
server.config.values.get(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+      assertEquals(logIndexSizeMaxBytes.toInt, 
server.config.values.get(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG))
+      assertEquals(logRetentionMs, 
server.config.values.get(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
     }
 
     // Verify that produce/consume worked throughout this test without any 
retries in producer
@@ -1174,29 +1175,27 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", 
props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
   }
 
-  @nowarn("cat=deprecation")
   private def alterConfigsOnServer(server: KafkaBroker, props: Properties): 
Unit = {
-    val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, 
v) }.toList.asJava
-    val newConfig = new Config(configEntries)
-    val configs = Map(new ConfigResource(ConfigResource.Type.BROKER, 
server.config.brokerId.toString) -> newConfig).asJava
-    adminClients.head.alterConfigs(configs).all.get
+    val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new 
ConfigEntry(k, v), OpType.SET) }.toList.asJava
+    val alterConfigs = new java.util.HashMap[ConfigResource, 
java.util.Collection[AlterConfigOp]]()
+    alterConfigs.put(new ConfigResource(ConfigResource.Type.BROKER, 
server.config.brokerId.toString), configEntries)
+    adminClients.head.incrementalAlterConfigs(alterConfigs).all.get
     props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v) 
}
   }
 
-  @nowarn("cat=deprecation")
   private def alterConfigs(servers: Seq[KafkaBroker], adminClient: Admin, 
props: Properties,
                    perBrokerConfig: Boolean): AlterConfigsResult = {
-    val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, 
v) }.toList.asJava
-    val newConfig = new Config(configEntries)
+    val configEntries = props.asScala.map { case (k, v) => new 
AlterConfigOp(new ConfigEntry(k, v), OpType.SET) }.toList.asJava
     val configs = if (perBrokerConfig) {
-      servers.map { server =>
-        val resource = new ConfigResource(ConfigResource.Type.BROKER, 
server.config.brokerId.toString)
-        (resource, newConfig)
-      }.toMap.asJava
+      val alterConfigs = new java.util.HashMap[ConfigResource, 
java.util.Collection[AlterConfigOp]]()
+      servers.foreach(server => alterConfigs.put(new 
ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString), 
configEntries))
+      alterConfigs
     } else {
-      Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> 
newConfig).asJava
+      val alterConfigs = new java.util.HashMap[ConfigResource, 
java.util.Collection[AlterConfigOp]]()
+      alterConfigs.put(new ConfigResource(ConfigResource.Type.BROKER, ""), 
configEntries)
+      alterConfigs
     }
-    adminClient.alterConfigs(configs)
+    adminClient.incrementalAlterConfigs(configs)
   }
 
   private def reconfigureServers(newProps: Properties, perBrokerConfig: 
Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit 
= {
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 21ae6c379bd..1d3021b0bbc 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -63,7 +63,6 @@ import java.{lang, util}
 import java.util.concurrent.{CompletableFuture, CompletionStage, 
ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Collections, Optional, OptionalLong, Properties}
-import scala.annotation.nowarn
 import scala.collection.{Seq, mutable}
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
@@ -792,83 +791,6 @@ class KRaftClusterTest {
     }
   }
 
-  @nowarn("cat=deprecation") // Suppress warnings about using legacy 
alterConfigs
-  def legacyAlter(
-    admin: Admin,
-    resources: Map[ConfigResource, Seq[ConfigEntry]]
-  ): Seq[ApiError] = {
-    val configs = new util.HashMap[ConfigResource, Config]()
-    resources.foreach {
-      case (resource, entries) => configs.put(resource, new 
Config(entries.asJava))
-    }
-    val values = admin.alterConfigs(configs).values()
-    resources.map {
-      case (resource, _) => try {
-        values.get(resource).get()
-        ApiError.NONE
-      } catch {
-        case e: ExecutionException => ApiError.fromThrowable(e.getCause)
-        case t: Throwable => ApiError.fromThrowable(t)
-      }
-    }.toSeq
-  }
-
-  @Test
-  def testLegacyAlterConfigs(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(4).
-        setNumControllerNodes(3).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      cluster.waitForReadyBrokers()
-      val admin = Admin.create(cluster.clientProperties())
-      try {
-        val defaultBroker = new ConfigResource(Type.BROKER, "")
-
-        assertEquals(Seq(ApiError.NONE), legacyAlter(admin, Map(defaultBroker 
-> Seq(
-          new ConfigEntry("log.roll.ms", "1234567"),
-          new ConfigEntry("max.connections.per.ip", "6")))))
-
-        validateConfigs(admin, Map(defaultBroker -> Seq(
-          ("log.roll.ms", "1234567"),
-          ("max.connections.per.ip", "6"))), exhaustive = true)
-
-        assertEquals(Seq(ApiError.NONE), legacyAlter(admin, Map(defaultBroker 
-> Seq(
-          new ConfigEntry("log.roll.ms", "1234567")))))
-
-        // Since max.connections.per.ip was left out of the previous 
legacyAlter, it is removed.
-        validateConfigs(admin, Map(defaultBroker -> Seq(
-          ("log.roll.ms", "1234567"))), exhaustive = true)
-
-        admin.createTopics(util.Arrays.asList(
-          new NewTopic("foo", 2, 3.toShort),
-          new NewTopic("bar", 2, 3.toShort))).all().get()
-        
TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq,
 "foo", 2)
-        
TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq,
 "bar", 2)
-        assertEquals(Seq(ApiError.NONE,
-            new ApiError(INVALID_CONFIG, "Unknown topic config name: 
not.a.real.topic.config"),
-            new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not 
exist.")),
-          legacyAlter(admin, Map(
-            new ConfigResource(Type.TOPIC, "foo") -> Seq(
-              new ConfigEntry("segment.jitter.ms", "345")),
-            new ConfigResource(Type.TOPIC, "bar") -> Seq(
-              new ConfigEntry("not.a.real.topic.config", "789")),
-            new ConfigResource(Type.TOPIC, "baz") -> Seq(
-              new ConfigEntry("segment.jitter.ms", "678")))))
-
-        validateConfigs(admin, Map(new ConfigResource(Type.TOPIC, "foo") -> 
Seq(
-          ("segment.jitter.ms", "345"))))
-
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
   @ParameterizedTest
   @ValueSource(strings = Array("3.7-IV0", "3.7-IV2"))
   def testCreatePartitions(metadataVersionString: String): Unit = {
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index e867f606235..eb63f4b521e 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -24,7 +24,7 @@ import kafka.utils.TestUtils.random
 import kafka.utils._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, 
AlterConfigOp, Config, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, 
AlterConfigOp, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{InvalidRequestException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.metrics.Quota
@@ -49,7 +49,6 @@ import java.util
 import java.util.Collections.{singletonList, singletonMap}
 import java.util.concurrent.ExecutionException
 import java.util.{Collections, Properties}
-import scala.annotation.nowarn
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
@@ -327,21 +326,6 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
   }
 
-  @nowarn("cat=deprecation")
-  @ParameterizedTest
-  @ValueSource(strings = Array("kraft"))
-  def testAlterDefaultTopicConfig(quorum: String): Unit = {
-    val admin = createAdminClient()
-    try {
-      val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
-      val config = new Config(Collections.singleton(new 
ConfigEntry(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, "200000")))
-      val future = admin.alterConfigs(Map(resource -> config).asJava).all
-      TestUtils.assertFutureExceptionTypeEquals(future, 
classOf[InvalidRequestException])
-    } finally {
-      admin.close()
-    }
-  }
-
   private def setBrokerConfigs(brokerId: String, newValue: Long): Unit = 
alterBrokerConfigs(brokerId, newValue, OpType.SET)
   private def deleteBrokerConfigs(brokerId: String): Unit = 
alterBrokerConfigs(brokerId, 0, OpType.DELETE)
   private def alterBrokerConfigs(brokerId: String, newValue: Long, op: 
OpType): Unit = {
diff --git a/docs/upgrade.html b/docs/upgrade.html
index cc7fcf9b2dd..bfeb392b116 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -158,6 +158,14 @@
                         </li>
                     </ul>
                 </li>
+                <li><b>Admin</b>
+                    <ul>
+                        <li>
+                            The <code>alterConfigs</code> method was removed 
from the <code>org.apache.kafka.clients.admin.Admin</code> interface.
+                            Please use <code>incrementalAlterConfigs</code> 
instead.
+                        </li>
+                    </ul>
+                </li>
             </ul>
         </li>
         <li>Other changes:
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 68cd09a708d..b3e888c240b 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -37,7 +37,6 @@ import 
org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
 import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
 import org.apache.kafka.clients.admin.AlterUserScramCredentialsOptions;
 import org.apache.kafka.clients.admin.AlterUserScramCredentialsResult;
-import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.CreateAclsOptions;
 import org.apache.kafka.clients.admin.CreateAclsResult;
 import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
@@ -214,12 +213,6 @@ public class TestingMetricsInterceptingAdminClient extends 
AdminClient {
         return adminDelegate.describeConfigs(resources, options);
     }
 
-    @Override
-    @SuppressWarnings("deprecation")
-    public AlterConfigsResult alterConfigs(final Map<ConfigResource, Config> 
configs, final AlterConfigsOptions options) {
-        return adminDelegate.alterConfigs(configs, options);
-    }
-
     @Override
     public AlterConfigsResult incrementalAlterConfigs(final 
Map<ConfigResource, Collection<AlterConfigOp>> configs, final 
AlterConfigsOptions options) {
         return adminDelegate.incrementalAlterConfigs(configs, options);

Reply via email to