This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6fd951a9c0a KAFKA-17610 Drop alterConfigs (#18002)
6fd951a9c0a is described below
commit 6fd951a9c0aa773060cd6bbf8a8b8c47ee9d2965
Author: TengYao Chi <[email protected]>
AuthorDate: Mon Dec 2 23:26:06 2024 +0800
KAFKA-17610 Drop alterConfigs (#18002)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 35 ------
.../kafka/clients/admin/AlterConfigsOptions.java | 2 +-
.../kafka/clients/admin/AlterConfigsResult.java | 2 +-
.../kafka/clients/admin/ForwardingAdmin.java | 6 -
.../kafka/clients/admin/KafkaAdminClient.java | 68 -----------
.../kafka/clients/admin/MockAdminClient.java | 6 -
.../FakeForwardingAdminWithLocalMetadata.java | 21 ----
.../test/java/kafka/admin/ConfigCommandTest.java | 5 -
.../AdminClientWithPoliciesIntegrationTest.scala | 61 +++++----
.../kafka/api/PlaintextAdminIntegrationTest.scala | 136 ++++++---------------
.../server/DynamicBrokerReconfigurationTest.scala | 37 +++---
.../kafka/server/KRaftClusterTest.scala | 78 ------------
.../kafka/server/DynamicConfigChangeTest.scala | 18 +--
docs/upgrade.html | 8 ++
.../TestingMetricsInterceptingAdminClient.java | 7 --
15 files changed, 97 insertions(+), 393 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 6db43732872..7f47bfbd8ba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -486,41 +486,6 @@ public interface Admin extends AutoCloseable {
*/
DescribeConfigsResult describeConfigs(Collection<ConfigResource>
resources, DescribeConfigsOptions options);
- /**
- * Update the configuration for the specified resources with the default
options.
- * <p>
- * This is a convenience method for {@link #alterConfigs(Map,
AlterConfigsOptions)} with default options.
- * See the overload for more details.
- * <p>
- * This operation is supported by brokers with version 0.11.0.0 or higher.
- *
- * @param configs The resources with their configs (topic is the only
resource type with configs that can
- * be updated currently)
- * @return The AlterConfigsResult
- * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}.
- */
- @Deprecated
- default AlterConfigsResult alterConfigs(Map<ConfigResource, Config>
configs) {
- return alterConfigs(configs, new AlterConfigsOptions());
- }
-
- /**
- * Update the configuration for the specified resources with the default
options.
- * <p>
- * Updates are not transactional so they may succeed for some resources
while fail for others. The configs for
- * a particular resource are updated atomically.
- * <p>
- * This operation is supported by brokers with version 0.11.0.0 or higher.
- *
- * @param configs The resources with their configs (topic is the only
resource type with configs that can
- * be updated currently)
- * @param options The options to use when describing configs
- * @return The AlterConfigsResult
- * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map,
AlterConfigsOptions)}.
- */
- @Deprecated
- AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs,
AlterConfigsOptions options);
-
/**
* Incrementally updates the configuration for the specified resources
with default options.
* <p>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
index 198a4eab62a..b2e85b9e813 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Map;
/**
- * Options for {@link Admin#incrementalAlterConfigs(Map)} and {@link
Admin#alterConfigs(Map)}.
+ * Options for {@link Admin#incrementalAlterConfigs(Map)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
index 29056ce2940..007b4422b36 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.config.ConfigResource;
import java.util.Map;
/**
- * The result of the {@link Admin#alterConfigs(Map)} call.
+ * The result of the {@link Admin#incrementalAlterConfigs(Map,
AlterConfigsOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index 932a1f2b74d..88a693934e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -103,12 +103,6 @@ public class ForwardingAdmin implements Admin {
return delegate.describeConfigs(resources, options);
}
- @Deprecated
- @Override
- public AlterConfigsResult alterConfigs(Map<ConfigResource, Config>
configs, AlterConfigsOptions options) {
- return delegate.alterConfigs(configs, options);
- }
-
@Override
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,
Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
return delegate.incrementalAlterConfigs(configs, options);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 4fdc8a45bc9..e6986c8a378 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -182,8 +182,6 @@ import org.apache.kafka.common.requests.AddRaftVoterRequest;
import org.apache.kafka.common.requests.AddRaftVoterResponse;
import org.apache.kafka.common.requests.AlterClientQuotasRequest;
import org.apache.kafka.common.requests.AlterClientQuotasResponse;
-import org.apache.kafka.common.requests.AlterConfigsRequest;
-import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
@@ -2868,72 +2866,6 @@ public class KafkaAdminClient extends AdminClient {
return configSource;
}
- @Override
- @Deprecated
- public AlterConfigsResult alterConfigs(Map<ConfigResource, Config>
configs, final AlterConfigsOptions options) {
- final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new
HashMap<>();
- // We must make a separate AlterConfigs request for every BROKER
resource we want to alter
- // and send the request to that specific node. Other resources are
grouped together into
- // a single request that may be sent to any node.
- final Collection<ConfigResource> unifiedRequestResources = new
ArrayList<>();
-
- for (ConfigResource resource : configs.keySet()) {
- Integer node = nodeFor(resource);
- if (node != null) {
- NodeProvider nodeProvider = new
ConstantBrokerOrActiveKController(node);
- allFutures.putAll(alterConfigs(configs, options,
Collections.singleton(resource), nodeProvider));
- } else
- unifiedRequestResources.add(resource);
- }
- if (!unifiedRequestResources.isEmpty())
- allFutures.putAll(alterConfigs(configs, options,
unifiedRequestResources, new LeastLoadedBrokerOrActiveKController()));
- return new AlterConfigsResult(new HashMap<>(allFutures));
- }
-
- private Map<ConfigResource, KafkaFutureImpl<Void>>
alterConfigs(Map<ConfigResource, Config> configs,
- final
AlterConfigsOptions options,
-
Collection<ConfigResource> resources,
-
NodeProvider nodeProvider) {
- final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new
HashMap<>();
- final Map<ConfigResource, AlterConfigsRequest.Config> requestMap = new
HashMap<>(resources.size());
- for (ConfigResource resource : resources) {
- List<AlterConfigsRequest.ConfigEntry> configEntries = new
ArrayList<>();
- for (ConfigEntry configEntry: configs.get(resource).entries())
- configEntries.add(new
AlterConfigsRequest.ConfigEntry(configEntry.name(), configEntry.value()));
- requestMap.put(resource, new
AlterConfigsRequest.Config(configEntries));
- futures.put(resource, new KafkaFutureImpl<>());
- }
-
- final long now = time.milliseconds();
- runnable.call(new Call("alterConfigs", calcDeadlineMs(now,
options.timeoutMs()), nodeProvider) {
-
- @Override
- public AlterConfigsRequest.Builder createRequest(int timeoutMs) {
- return new AlterConfigsRequest.Builder(requestMap,
options.shouldValidateOnly());
- }
-
- @Override
- public void handleResponse(AbstractResponse abstractResponse) {
- AlterConfigsResponse response = (AlterConfigsResponse)
abstractResponse;
- for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry :
futures.entrySet()) {
- KafkaFutureImpl<Void> future = entry.getValue();
- ApiException exception =
response.errors().get(entry.getKey()).exception();
- if (exception != null) {
- future.completeExceptionally(exception);
- } else {
- future.complete(null);
- }
- }
- }
-
- @Override
- void handleFailure(Throwable throwable) {
- completeAllExceptionally(futures.values(), throwable);
- }
- }, now);
- return futures;
- }
-
@Override
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,
Collection<AlterConfigOp>> configs,
final
AlterConfigsOptions options) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 1ec9d6b5433..3be5dc7b3e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -871,12 +871,6 @@ public class MockAdminClient extends AdminClient {
return new Config(configEntries);
}
- @Override
- @Deprecated
- public synchronized AlterConfigsResult alterConfigs(Map<ConfigResource,
Config> configs, AlterConfigsOptions options) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
@Override
public synchronized AlterConfigsResult incrementalAlterConfigs(
Map<ConfigResource, Collection<AlterConfigOp>> configs,
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
index 1f2f56166a2..406cc280d59 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
@@ -17,9 +17,6 @@
package org.apache.kafka.connect.mirror.clients.admin;
-import org.apache.kafka.clients.admin.AlterConfigsOptions;
-import org.apache.kafka.clients.admin.AlterConfigsResult;
-import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateAclsOptions;
import org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
@@ -30,7 +27,6 @@ import org.apache.kafka.clients.admin.ForwardingAdmin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
@@ -79,23 +75,6 @@ public class FakeForwardingAdminWithLocalMetadata extends
ForwardingAdmin {
return createPartitionsResult;
}
- @Deprecated
- @Override
- public AlterConfigsResult alterConfigs(Map<ConfigResource, Config>
configs, AlterConfigsOptions options) {
- AlterConfigsResult alterConfigsResult = super.alterConfigs(configs,
options);
- configs.forEach((configResource, newConfigs) ->
alterConfigsResult.values().get(configResource).whenComplete((ignored, error)
-> {
- if (error == null) {
- if (configResource.type() == ConfigResource.Type.TOPIC) {
-
FakeLocalMetadataStore.updateTopicConfig(configResource.name(), newConfigs);
- }
- } else {
- log.error("Unable to intercept admin client operation", error);
- }
- }));
- return alterConfigsResult;
- }
-
-
@Override
public CreateAclsResult createAcls(Collection<AclBinding> acls,
CreateAclsOptions options) {
CreateAclsResult aclsResult = super.createAcls(acls, options);
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java
b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 308777417be..70b1faee43b 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -1459,11 +1459,6 @@ public class ConfigCommandTest {
return mock(AlterConfigsResult.class);
}
- @Override
- public synchronized AlterConfigsResult
alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
- return mock(AlterConfigsResult.class);
- }
-
@Override
public DescribeClientQuotasResult
describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions
options) {
return mock(DescribeClientQuotasResult.class);
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 6f902b2db3b..562fa12d036 100644
---
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -14,13 +14,13 @@
package kafka.api
import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, AlterConfigsOptions, Config, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, AlterConfigsOptions, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, SslConfigs, TopicConfig}
import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.utils.Utils
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach,
TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-import scala.annotation.nowarn
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -88,15 +87,17 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
val topic1 = "describe-alter-configs-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
val topicConfig1 = new Properties
- topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "500000")
- topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, "60000000")
+ val maxMessageBytes = "500000"
+ val retentionMs = "60000000"
+ topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,
maxMessageBytes)
+ topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, retentionMs)
createTopic(topic1, 1, 1, topicConfig1)
val topic2 = "describe-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
createTopic(topic2)
- PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this,
topicResource1, topicResource2)
+ PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this,
topicResource1, topicResource2, maxMessageBytes, retentionMs)
}
@ParameterizedTest
@@ -106,7 +107,6 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
}
- @nowarn("cat=deprecation")
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = {
@@ -127,30 +127,32 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
// Set a mutable broker config
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER,
brokers.head.config.brokerId.toString)
- val brokerConfigs = Seq(new
ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000")).asJava
- val alterResult1 = client.alterConfigs(Map(brokerResource -> new
Config(brokerConfigs)).asJava)
- alterResult1.all.get
+ var alterResult =
client.incrementalAlterConfigs(Collections.singletonMap(brokerResource,
+ util.Arrays.asList(new AlterConfigOp(new
ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000"), OpType.SET))))
+ alterResult.all.get
assertEquals(Set(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG),
validationsForResource(brokerResource).head.configs().keySet().asScala)
validations.clear()
- val topicConfigEntries1 = Seq(
- new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"),
- new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") // policy
doesn't allow this
- ).asJava
+ val alterConfigs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
+ alterConfigs.put(topicResource1, util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"), OpType.SET)
+ ))
- var topicConfigEntries2 = Seq(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8")).asJava
+ alterConfigs.put(topicResource2, util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8"), OpType.SET),
+ ))
- val topicConfigEntries3 = Seq(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1")).asJava
+ alterConfigs.put(topicResource3, util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1"), OpType.SET),
+ ))
- val brokerConfigEntries = Seq(new
ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313")).asJava
+ alterConfigs.put(brokerResource, util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313"), OpType.SET),
+ ))
// Alter configs: second is valid, the others are invalid
- var alterResult = client.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2),
- topicResource3 -> new Config(topicConfigEntries3),
- brokerResource -> new Config(brokerConfigEntries)
- ).asJava)
+ alterResult = client.incrementalAlterConfigs(alterConfigs)
assertEquals(Set(topicResource1, topicResource2, topicResource3,
brokerResource).asJava, alterResult.values.keySet)
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[PolicyViolationException])
@@ -175,14 +177,11 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value)
// Alter configs with validateOnly = true: only second is valid
- topicConfigEntries2 = Seq(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7")).asJava
-
- alterResult = client.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2),
- brokerResource -> new Config(brokerConfigEntries),
- topicResource3 -> new Config(topicConfigEntries3)
- ).asJava, new AlterConfigsOptions().validateOnly(true))
+ alterConfigs.put(topicResource2, util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7"), OpType.SET),
+ ))
+
+ alterResult = client.incrementalAlterConfigs(alterConfigs, new
AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, topicResource3,
brokerResource).asJava, alterResult.values.keySet)
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[PolicyViolationException])
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 0c840e4bf64..2eb773c0614 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -33,6 +33,7 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils._
import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.HostResolver
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig,
GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
@@ -53,7 +54,7 @@ import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs,
ServerLogConfigs}
+import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs,
ServerLogConfigs, ZkConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogFileUtils}
import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
import org.apache.log4j.PropertyConfigurator
@@ -64,7 +65,6 @@ import org.junit.jupiter.params.provider.{MethodSource,
ValueSource}
import org.slf4j.LoggerFactory
import java.util.AbstractMap.SimpleImmutableEntry
-import scala.annotation.nowarn
import scala.collection.Seq
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
@@ -122,29 +122,6 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
} finally brokenClient.close(time.Duration.ZERO)
}
- @ParameterizedTest
- @Timeout(30)
- @ValueSource(strings = Array("kraft"))
- def testAlterConfigsWithOptionTimeoutMs(quorum: String): Unit = {
- client = createAdminClient
- val config = createConfig
- config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
- val brokenClient = Admin.create(config)
-
- try {
- val alterLogLevelsEntries = Seq(
- new ConfigEntry("kafka.server.ControllerServer",
LogLevelConfig.INFO_LOG_LEVEL)
- ).asJavaCollection
-
- val exception = assertThrows(classOf[ExecutionException], () => {
- brokenClient.alterConfigs(
- Map(brokerLoggerConfigResource -> new
Config(alterLogLevelsEntries)).asJava,
- new AlterConfigsOptions().timeoutMs(0)).all().get()
- })
- assertInstanceOf(classOf[TimeoutException], exception.getCause)
- } finally brokenClient.close(time.Duration.ZERO)
- }
-
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreatePartitionWithOptionRetryOnQuotaViolation(quorum: String): Unit
= {
@@ -993,8 +970,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val topic1 = "describe-alter-configs-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
val topicConfig1 = new Properties
- topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "500000")
- topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, "60000000")
+ val maxMessageBytes = "500000"
+ val retentionMs = "60000000"
+ topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,
maxMessageBytes)
+ topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, retentionMs)
createTopic(topic1, numPartitions = 1, replicationFactor = 1, topicConfig1)
val topic2 = "describe-alter-configs-topic-2"
@@ -1064,7 +1043,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(brokers(2).config.logCleanerThreads.toString,
configs.get(brokerResource2).get(CleanerConfig.LOG_CLEANER_THREADS_PROP).value)
- checkValidAlterConfigs(client, this, topicResource1, topicResource2)
+ checkValidAlterConfigs(client, this, topicResource1, topicResource2,
maxMessageBytes, retentionMs)
}
@ParameterizedTest
@@ -3709,22 +3688,6 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertLogLevelDidNotChange()
}
- /**
- * The AlterConfigs API is deprecated and should not support altering log
levels
- */
- @nowarn("cat=deprecation")
- @ParameterizedTest
- @ValueSource(strings = Array("kraft"))
- def testAlterConfigsForLog4jLogLevelsDoesNotWork(quorum: String): Unit = {
- client = createAdminClient
-
- val alterLogLevelsEntries = Seq(
- new ConfigEntry("kafka.server.ControllerServer",
LogLevelConfig.INFO_LOG_LEVEL)
- ).asJavaCollection
- val alterResult = client.alterConfigs(Map(brokerLoggerConfigResource ->
new Config(alterLogLevelsEntries)).asJava)
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerLoggerConfigResource).get).getCause.isInstanceOf[InvalidRequestException])
- }
-
def alterBrokerLoggers(entries: util.Collection[AlterConfigOp],
validateOnly: Boolean = false): Unit = {
client.incrementalAlterConfigs(Map(brokerLoggerConfigResource ->
entries).asJava, new AlterConfigsOptions().validateOnly(validateOnly))
.values.get(brokerLoggerConfigResource).get()
@@ -3890,27 +3853,21 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
object PlaintextAdminIntegrationTest {
- @nowarn("cat=deprecation")
def checkValidAlterConfigs(
admin: Admin,
test: KafkaServerTestHarness,
topicResource1: ConfigResource,
- topicResource2: ConfigResource
- ): Unit = {
+ topicResource2: ConfigResource,
+ maxMessageBytes: String,
+ retentionMs: String): Unit = {
// Alter topics
- var topicConfigEntries1 = Seq(
- new ConfigEntry(TopicConfig.FLUSH_MS_CONFIG, "1000")
- ).asJava
-
- var topicConfigEntries2 = Seq(
- new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"),
- new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
- ).asJava
-
- var alterResult = admin.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2)
- ).asJava)
+ val alterConfigs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
+ alterConfigs.put(topicResource1, util.Arrays.asList(new AlterConfigOp(new
ConfigEntry(TopicConfig.FLUSH_MS_CONFIG, "1000"), OpType.SET)))
+ alterConfigs.put(topicResource2, util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET),
+ new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG,
"lz4"), OpType.SET)
+ ))
+ var alterResult = admin.incrementalAlterConfigs(alterConfigs)
assertEquals(Set(topicResource1, topicResource2).asJava,
alterResult.values.keySet)
alterResult.all.get
@@ -3924,26 +3881,16 @@ object PlaintextAdminIntegrationTest {
assertEquals(2, configs.size)
assertEquals("1000",
configs.get(topicResource1).get(TopicConfig.FLUSH_MS_CONFIG).value)
- assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES.toString,
-
configs.get(topicResource1).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value)
- assertEquals(LogConfig.DEFAULT_RETENTION_MS.toString,
configs.get(topicResource1).get(TopicConfig.RETENTION_MS_CONFIG).value)
+ assertEquals(maxMessageBytes,
configs.get(topicResource1).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value)
+ assertEquals(retentionMs,
configs.get(topicResource1).get(TopicConfig.RETENTION_MS_CONFIG).value)
assertEquals("0.9",
configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
assertEquals("lz4",
configs.get(topicResource2).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)
// Alter topics with validateOnly=true
- topicConfigEntries1 = Seq(
- new ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "10")
- ).asJava
-
- topicConfigEntries2 = Seq(
- new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.3")
- ).asJava
-
- alterResult = admin.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2)
- ).asJava, new AlterConfigsOptions().validateOnly(true))
+ alterConfigs.put(topicResource1, util.Arrays.asList(new AlterConfigOp(new
ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "10"), OpType.SET)))
+ alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.3"), OpType.SET)))
+ alterResult = admin.incrementalAlterConfigs(alterConfigs, new
AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2).asJava,
alterResult.values.keySet)
alterResult.all.get
@@ -3955,12 +3902,10 @@ object PlaintextAdminIntegrationTest {
assertEquals(2, configs.size)
- assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES.toString,
-
configs.get(topicResource1).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value)
+ assertEquals(maxMessageBytes,
configs.get(topicResource1).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value)
assertEquals("0.9",
configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
}
- @nowarn("cat=deprecation")
def checkInvalidAlterConfigs(
test: KafkaServerTestHarness,
admin: Admin
@@ -3974,22 +3919,17 @@ object PlaintextAdminIntegrationTest {
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
createTopicWithAdmin(admin, topic2, test.brokers, test.controllerServers,
numPartitions = 1, replicationFactor = 1)
- val topicConfigEntries1 = Seq(
- new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), //
this value is invalid as it's above 1.0
- new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
- ).asJava
-
- var topicConfigEntries2 = Seq(new
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")).asJava
-
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER,
test.brokers.head.config.brokerId.toString)
- val brokerConfigEntries = Seq(new
ConfigEntry(ServerConfigs.BROKER_ID_CONFIG, "10")).asJava
// Alter configs: first and third are invalid, second is valid
- var alterResult = admin.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2),
- brokerResource -> new Config(brokerConfigEntries)
- ).asJava)
+ val alterConfigs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
+ alterConfigs.put(topicResource1, util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), OpType.SET),
+ new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG,
"lz4"), OpType.SET)
+ ))
+ alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET)))
+ alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new
ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
+ var alterResult = admin.incrementalAlterConfigs(alterConfigs)
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava,
alterResult.values.keySet)
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[InvalidConfigurationException])
@@ -4012,13 +3952,13 @@ object PlaintextAdminIntegrationTest {
assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE,
configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
// Alter configs with validateOnly = true: first and third are invalid,
second is valid
- topicConfigEntries2 = Seq(new
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip")).asJava
-
- alterResult = admin.alterConfigs(Map(
- topicResource1 -> new Config(topicConfigEntries1),
- topicResource2 -> new Config(topicConfigEntries2),
- brokerResource -> new Config(brokerConfigEntries)
- ).asJava, new AlterConfigsOptions().validateOnly(true))
+ alterConfigs.put(topicResource1, util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), OpType.SET),
+ new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG,
"lz4"), OpType.SET)
+ ))
+ alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET)))
+ alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new
ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
+ alterResult = admin.incrementalAlterConfigs(alterConfigs, new
AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava,
alterResult.values.keySet)
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[InvalidConfigurationException])
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 2d11963c846..f2ff811e47f 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -69,7 +69,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import java.util.concurrent.atomic.AtomicInteger
-import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
@@ -586,14 +585,16 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
val (producerThread, consumerThread) = startProduceConsume(retries = 0,
groupProtocol)
val props = new Properties
+ val logIndexSizeMaxBytes = "100000"
+ val logRetentionMs = TimeUnit.DAYS.toMillis(1)
props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000")
props.put(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG,
TimeUnit.HOURS.toMillis(2).toString)
props.put(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG,
TimeUnit.HOURS.toMillis(1).toString)
- props.put(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, "100000")
+ props.put(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG,
logIndexSizeMaxBytes)
props.put(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1000")
props.put(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, "60000")
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "10000000")
- props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG,
TimeUnit.DAYS.toMillis(1).toString)
+ props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG,
logRetentionMs.toString)
props.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "100000")
props.put(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG, "10000")
props.put(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
TimeUnit.DAYS.toMillis(1).toString)
@@ -674,8 +675,8 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
assertEquals(500000,
servers.head.config.values.get(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG))
assertEquals(TimeUnit.DAYS.toMillis(2),
servers.head.config.values.get(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
servers.tail.foreach { server =>
- assertEquals(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT,
server.config.values.get(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG))
- assertEquals(1680000000L,
server.config.values.get(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+ assertEquals(logIndexSizeMaxBytes.toInt,
server.config.values.get(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG))
+ assertEquals(logRetentionMs,
server.config.values.get(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
}
// Verify that produce/consume worked throughout this test without any
retries in producer
@@ -1174,29 +1175,27 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG",
props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
}
- @nowarn("cat=deprecation")
private def alterConfigsOnServer(server: KafkaBroker, props: Properties):
Unit = {
- val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k,
v) }.toList.asJava
- val newConfig = new Config(configEntries)
- val configs = Map(new ConfigResource(ConfigResource.Type.BROKER,
server.config.brokerId.toString) -> newConfig).asJava
- adminClients.head.alterConfigs(configs).all.get
+val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new
ConfigEntry(k, v), OpType.SET) }.toList.asJava
+ val alterConfigs = new java.util.HashMap[ConfigResource,
java.util.Collection[AlterConfigOp]]()
+ alterConfigs.put(new ConfigResource(ConfigResource.Type.BROKER,
server.config.brokerId.toString), configEntries)
+ adminClients.head.incrementalAlterConfigs(alterConfigs)
props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v)
}
}
- @nowarn("cat=deprecation")
private def alterConfigs(servers: Seq[KafkaBroker], adminClient: Admin,
props: Properties,
perBrokerConfig: Boolean): AlterConfigsResult = {
- val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k,
v) }.toList.asJava
- val newConfig = new Config(configEntries)
+ val configEntries = props.asScala.map { case (k, v) => new
AlterConfigOp(new ConfigEntry(k, v), OpType.SET) }.toList.asJava
val configs = if (perBrokerConfig) {
- servers.map { server =>
- val resource = new ConfigResource(ConfigResource.Type.BROKER,
server.config.brokerId.toString)
- (resource, newConfig)
- }.toMap.asJava
+ val alterConfigs = new java.util.HashMap[ConfigResource,
java.util.Collection[AlterConfigOp]]()
+ servers.foreach(server => alterConfigs.put(new
ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString),
configEntries))
+ alterConfigs
} else {
- Map(new ConfigResource(ConfigResource.Type.BROKER, "") ->
newConfig).asJava
+ val alterConfigs = new java.util.HashMap[ConfigResource,
java.util.Collection[AlterConfigOp]]()
+ alterConfigs.put(new ConfigResource(ConfigResource.Type.BROKER, ""),
configEntries)
+ alterConfigs
}
- adminClient.alterConfigs(configs)
+ adminClient.incrementalAlterConfigs(configs)
}
private def reconfigureServers(newProps: Properties, perBrokerConfig:
Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit
= {
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 21ae6c379bd..1d3021b0bbc 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -63,7 +63,6 @@ import java.{lang, util}
import java.util.concurrent.{CompletableFuture, CompletionStage,
ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Collections, Optional, OptionalLong, Properties}
-import scala.annotation.nowarn
import scala.collection.{Seq, mutable}
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@@ -792,83 +791,6 @@ class KRaftClusterTest {
}
}
- @nowarn("cat=deprecation") // Suppress warnings about using legacy
alterConfigs
- def legacyAlter(
- admin: Admin,
- resources: Map[ConfigResource, Seq[ConfigEntry]]
- ): Seq[ApiError] = {
- val configs = new util.HashMap[ConfigResource, Config]()
- resources.foreach {
- case (resource, entries) => configs.put(resource, new
Config(entries.asJava))
- }
- val values = admin.alterConfigs(configs).values()
- resources.map {
- case (resource, _) => try {
- values.get(resource).get()
- ApiError.NONE
- } catch {
- case e: ExecutionException => ApiError.fromThrowable(e.getCause)
- case t: Throwable => ApiError.fromThrowable(t)
- }
- }.toSeq
- }
-
- @Test
- def testLegacyAlterConfigs(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(4).
- setNumControllerNodes(3).build()).build()
- try {
- cluster.format()
- cluster.startup()
- cluster.waitForReadyBrokers()
- val admin = Admin.create(cluster.clientProperties())
- try {
- val defaultBroker = new ConfigResource(Type.BROKER, "")
-
- assertEquals(Seq(ApiError.NONE), legacyAlter(admin, Map(defaultBroker
-> Seq(
- new ConfigEntry("log.roll.ms", "1234567"),
- new ConfigEntry("max.connections.per.ip", "6")))))
-
- validateConfigs(admin, Map(defaultBroker -> Seq(
- ("log.roll.ms", "1234567"),
- ("max.connections.per.ip", "6"))), exhaustive = true)
-
- assertEquals(Seq(ApiError.NONE), legacyAlter(admin, Map(defaultBroker
-> Seq(
- new ConfigEntry("log.roll.ms", "1234567")))))
-
- // Since max.connections.per.ip was left out of the previous
legacyAlter, it is removed.
- validateConfigs(admin, Map(defaultBroker -> Seq(
- ("log.roll.ms", "1234567"))), exhaustive = true)
-
- admin.createTopics(util.Arrays.asList(
- new NewTopic("foo", 2, 3.toShort),
- new NewTopic("bar", 2, 3.toShort))).all().get()
-
TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq,
"foo", 2)
-
TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq,
"bar", 2)
- assertEquals(Seq(ApiError.NONE,
- new ApiError(INVALID_CONFIG, "Unknown topic config name:
not.a.real.topic.config"),
- new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not
exist.")),
- legacyAlter(admin, Map(
- new ConfigResource(Type.TOPIC, "foo") -> Seq(
- new ConfigEntry("segment.jitter.ms", "345")),
- new ConfigResource(Type.TOPIC, "bar") -> Seq(
- new ConfigEntry("not.a.real.topic.config", "789")),
- new ConfigResource(Type.TOPIC, "baz") -> Seq(
- new ConfigEntry("segment.jitter.ms", "678")))))
-
- validateConfigs(admin, Map(new ConfigResource(Type.TOPIC, "foo") ->
Seq(
- ("segment.jitter.ms", "345"))))
-
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
@ParameterizedTest
@ValueSource(strings = Array("3.7-IV0", "3.7-IV2"))
def testCreatePartitions(metadataVersionString: String): Unit = {
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index e867f606235..eb63f4b521e 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -24,7 +24,7 @@ import kafka.utils.TestUtils.random
import kafka.utils._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions,
AlterConfigOp, Config, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions,
AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{InvalidRequestException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.metrics.Quota
@@ -49,7 +49,6 @@ import java.util
import java.util.Collections.{singletonList, singletonMap}
import java.util.concurrent.ExecutionException
import java.util.{Collections, Properties}
-import scala.annotation.nowarn
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -327,21 +326,6 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
}
}
- @nowarn("cat=deprecation")
- @ParameterizedTest
- @ValueSource(strings = Array("kraft"))
- def testAlterDefaultTopicConfig(quorum: String): Unit = {
- val admin = createAdminClient()
- try {
- val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
- val config = new Config(Collections.singleton(new
ConfigEntry(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, "200000")))
- val future = admin.alterConfigs(Map(resource -> config).asJava).all
- TestUtils.assertFutureExceptionTypeEquals(future,
classOf[InvalidRequestException])
- } finally {
- admin.close()
- }
- }
-
private def setBrokerConfigs(brokerId: String, newValue: Long): Unit =
alterBrokerConfigs(brokerId, newValue, OpType.SET)
private def deleteBrokerConfigs(brokerId: String): Unit =
alterBrokerConfigs(brokerId, 0, OpType.DELETE)
private def alterBrokerConfigs(brokerId: String, newValue: Long, op:
OpType): Unit = {
diff --git a/docs/upgrade.html b/docs/upgrade.html
index cc7fcf9b2dd..bfeb392b116 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -158,6 +158,14 @@
</li>
</ul>
</li>
+ <li><b>Admin</b>
+ <ul>
+ <li>
+ The <code>alterConfigs</code> method was removed
from the <code>org.apache.kafka.clients.admin.Admin</code>
+ Please use <code>incrementalAlterConfigs</code>
instead.
+ </li>
+ </ul>
+ </li>
</ul>
</li>
<li>Other changes:
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 68cd09a708d..b3e888c240b 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -37,7 +37,6 @@ import
org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
import org.apache.kafka.clients.admin.AlterUserScramCredentialsOptions;
import org.apache.kafka.clients.admin.AlterUserScramCredentialsResult;
-import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateAclsOptions;
import org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
@@ -214,12 +213,6 @@ public class TestingMetricsInterceptingAdminClient extends
AdminClient {
return adminDelegate.describeConfigs(resources, options);
}
- @Override
- @SuppressWarnings("deprecation")
- public AlterConfigsResult alterConfigs(final Map<ConfigResource, Config>
configs, final AlterConfigsOptions options) {
- return adminDelegate.alterConfigs(configs, options);
- }
-
@Override
public AlterConfigsResult incrementalAlterConfigs(final
Map<ConfigResource, Collection<AlterConfigOp>> configs, final
AlterConfigsOptions options) {
return adminDelegate.incrementalAlterConfigs(configs, options);