This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 19e8c447a63 KAFKA-17137 Ensure Admin APIs are properly tested
(consumer group and quota) (#16717)
19e8c447a63 is described below
commit 19e8c447a63c6090da2d8fa303b095ce0438cc1d
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Sep 4 12:08:38 2024 +0800
KAFKA-17137 Ensure Admin APIs are properly tested (consumer group and
quota) (#16717)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 63 +++++++++++++++++++---
.../kafka/server/DynamicConfigChangeTest.scala | 10 +++-
2 files changed, 65 insertions(+), 8 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c3d0c4de80a..0a97a776f65 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -33,7 +33,7 @@ import kafka.utils.{Log4jController, TestUtils}
import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, ShareConsumer}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding,
AclBindingFilter, AclOperation, AclPermissionType}
@@ -45,7 +45,7 @@ import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourceType}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer}
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{ConsumerGroupState, ElectionType, IsolationLevel, ShareGroupState, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
+import org.apache.kafka.common.{ConsumerGroupState, ElectionType, GroupType, IsolationLevel, ShareGroupState, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
@@ -1826,6 +1826,23 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
matching.size == 1
}, s"Expected to be able to list $testGroupId")
+ TestUtils.waitUntilTrue(() => {
+ val options = new
ListConsumerGroupsOptions().withTypes(Set(GroupType.CLASSIC).asJava)
+ val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+ group.groupId == testGroupId &&
+ group.state.get == ConsumerGroupState.STABLE)
+ matching.size == 1
+ }, s"Expected to be able to list $testGroupId in group type Classic")
+
+ TestUtils.waitUntilTrue(() => {
+ val options = new
ListConsumerGroupsOptions().withTypes(Set(GroupType.CLASSIC).asJava)
+ .inStates(Set(ConsumerGroupState.STABLE).asJava)
+ val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+ group.groupId == testGroupId &&
+ group.state.get == ConsumerGroupState.STABLE)
+ matching.size == 1
+ }, s"Expected to be able to list $testGroupId in group type Classic
and state Stable")
+
TestUtils.waitUntilTrue(() => {
val options = new
ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.STABLE).asJava)
val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
@@ -1876,13 +1893,33 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// Test that all() returns 2 results
assertEquals(2, describeWithFakeGroupResult.all().get().size())
+ val testTopicPart0 = new TopicPartition(testTopicName, 0)
+
// Test listConsumerGroupOffsets
TestUtils.waitUntilTrue(() => {
val parts =
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
- val part = new TopicPartition(testTopicName, 0)
- parts.containsKey(part) && (parts.get(part).offset() == 1)
+ parts.containsKey(testTopicPart0) &&
(parts.get(testTopicPart0).offset() == 1)
}, s"Expected the offset for partition 0 to eventually become 1.")
+ // Test listConsumerGroupOffsets with requireStable true
+ val options = new
ListConsumerGroupOffsetsOptions().requireStable(true)
+ var parts = client.listConsumerGroupOffsets(testGroupId, options)
+ .partitionsToOffsetAndMetadata().get()
+ assertTrue(parts.containsKey(testTopicPart0))
+ assertEquals(1, parts.get(testTopicPart0).offset())
+
+ // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
+ val groupSpecs = Collections.singletonMap(testGroupId,
+ new
ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singleton(new
TopicPartition(testTopicName, 0))))
+ parts =
client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get()
+ assertTrue(parts.containsKey(testTopicPart0))
+ assertEquals(1, parts.get(testTopicPart0).offset())
+
+ // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
and requireStable option
+ parts = client.listConsumerGroupOffsets(groupSpecs,
options).partitionsToOffsetAndMetadata().get()
+ assertTrue(parts.containsKey(testTopicPart0))
+ assertEquals(1, parts.get(testTopicPart0).offset())
+
// Test delete non-exist consumer instance
val invalidInstanceId = "invalid-instance-id"
var removeMembersResult =
client.removeMembersFromConsumerGroup(testGroupId, new
RemoveMembersFromConsumerGroupOptions(
@@ -1908,9 +1945,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
classOf[GroupNotEmptyException])
// Test delete one correct static member
- removeMembersResult =
client.removeMembersFromConsumerGroup(testGroupId, new
RemoveMembersFromConsumerGroupOptions(
- Collections.singleton(new MemberToRemove(testInstanceId1))
- ))
+ val removeOptions = new
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new
MemberToRemove(testInstanceId1)))
+ removeOptions.reason("test remove")
+ removeMembersResult =
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
assertNull(removeMembersResult.all().get())
val validMemberFuture = removeMembersResult.memberResult(new
MemberToRemove(testInstanceId1))
@@ -1942,6 +1979,18 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
assertNull(deleteResult.deletedGroups().get(testGroupId).get())
+
+ // Test alterConsumerGroupOffsets
+ val alterConsumerGroupOffsetsResult =
client.alterConsumerGroupOffsets(testGroupId,
+ Collections.singletonMap(testTopicPart0, new
OffsetAndMetadata(0L)))
+ assertNull(alterConsumerGroupOffsetsResult.all().get())
+
assertNull(alterConsumerGroupOffsetsResult.partitionResult(testTopicPart0).get())
+
+ // Verify alterConsumerGroupOffsets success
+ TestUtils.waitUntilTrue(() => {
+ val parts =
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+ parts.containsKey(testTopicPart0) &&
(parts.get(testTopicPart0).offset() == 0)
+ }, s"Expected the offset for partition 0 to eventually become 0.")
} finally {
consumerThreads.foreach {
case consumerThread =>
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 10eafcb51a7..818f0478f88 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -26,7 +26,7 @@ import kafka.utils._
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, Config, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{InvalidRequestException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.metrics.Quota
@@ -211,6 +211,14 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
new ClientQuotaAlteration(entity, util.Arrays.asList(
new Op(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, null),
new Op(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, null))))
+
+ // validate only
+ admin.alterClientQuotas(removals, new
AlterClientQuotasOptions().validateOnly(true)).all().get()
+ assertEquals(Quota.upperBound(1000),
+ quotaManagers.produce.quota(user, clientId), s"User $user clientId
$clientId must have same producer quota of 1000")
+ assertEquals(Quota.upperBound(2000),
+ quotaManagers.fetch.quota(user, clientId), s"User $user clientId
$clientId must have same consumer quota of 2000")
+
admin.alterClientQuotas(removals).all().get()
TestUtils.retry(10000) {
val producerQuota = quotaManagers.produce.quota(user, clientId)