This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 19e8c447a63 KAFKA-17137 Ensure Admin APIs are properly tested (consumer group and quota) (#16717)
19e8c447a63 is described below

commit 19e8c447a63c6090da2d8fa303b095ce0438cc1d
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Sep 4 12:08:38 2024 +0800

    KAFKA-17137 Ensure Admin APIs are properly tested (consumer group and quota) (#16717)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 63 +++++++++++++++++++---
 .../kafka/server/DynamicConfigChangeTest.scala     | 10 +++-
 2 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c3d0c4de80a..0a97a776f65 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -33,7 +33,7 @@ import kafka.utils.{Log4jController, TestUtils}
 import org.apache.kafka.clients.HostResolver
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer, ShareConsumer}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer, OffsetAndMetadata, ShareConsumer}
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, 
AclBindingFilter, AclOperation, AclPermissionType}
@@ -45,7 +45,7 @@ import 
org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourceType}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{ConsumerGroupState, ElectionType, 
IsolationLevel, ShareGroupState, TopicCollection, TopicPartition, 
TopicPartitionInfo, TopicPartitionReplica, Uuid}
+import org.apache.kafka.common.{ConsumerGroupState, ElectionType, GroupType, 
IsolationLevel, ShareGroupState, TopicCollection, TopicPartition, 
TopicPartitionInfo, TopicPartitionReplica, Uuid}
 import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.network.SocketServerConfigs
@@ -1826,6 +1826,23 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
             matching.size == 1
           }, s"Expected to be able to list $testGroupId")
 
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListConsumerGroupsOptions().withTypes(Set(GroupType.CLASSIC).asJava)
+            val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+                group.groupId == testGroupId &&
+                group.state.get == ConsumerGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in group type Classic")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListConsumerGroupsOptions().withTypes(Set(GroupType.CLASSIC).asJava)
+              .inStates(Set(ConsumerGroupState.STABLE).asJava)
+            val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ConsumerGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in group type Classic 
and state Stable")
+
           TestUtils.waitUntilTrue(() => {
             val options = new 
ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.STABLE).asJava)
             val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
@@ -1876,13 +1893,33 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           // Test that all() returns 2 results
           assertEquals(2, describeWithFakeGroupResult.all().get().size())
 
+          val testTopicPart0 = new TopicPartition(testTopicName, 0)
+
           // Test listConsumerGroupOffsets
           TestUtils.waitUntilTrue(() => {
             val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
-            val part = new TopicPartition(testTopicName, 0)
-            parts.containsKey(part) && (parts.get(part).offset() == 1)
+            parts.containsKey(testTopicPart0) && 
(parts.get(testTopicPart0).offset() == 1)
           }, s"Expected the offset for partition 0 to eventually become 1.")
 
+          // Test listConsumerGroupOffsets with requireStable true
+          val options = new 
ListConsumerGroupOffsetsOptions().requireStable(true)
+          var parts = client.listConsumerGroupOffsets(testGroupId, options)
+            .partitionsToOffsetAndMetadata().get()
+          assertTrue(parts.containsKey(testTopicPart0))
+          assertEquals(1, parts.get(testTopicPart0).offset())
+
+          // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
+          val groupSpecs = Collections.singletonMap(testGroupId,
+            new 
ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singleton(new 
TopicPartition(testTopicName, 0))))
+          parts = 
client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get()
+          assertTrue(parts.containsKey(testTopicPart0))
+          assertEquals(1, parts.get(testTopicPart0).offset())
+
+          // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec 
and requireStable option
+          parts = client.listConsumerGroupOffsets(groupSpecs, 
options).partitionsToOffsetAndMetadata().get()
+          assertTrue(parts.containsKey(testTopicPart0))
+          assertEquals(1, parts.get(testTopicPart0).offset())
+
           // Test delete non-exist consumer instance
           val invalidInstanceId = "invalid-instance-id"
           var removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, new 
RemoveMembersFromConsumerGroupOptions(
@@ -1908,9 +1945,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
             classOf[GroupNotEmptyException])
 
           // Test delete one correct static member
-          removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, new 
RemoveMembersFromConsumerGroupOptions(
-            Collections.singleton(new MemberToRemove(testInstanceId1))
-          ))
+          val removeOptions = new 
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new 
MemberToRemove(testInstanceId1)))
+          removeOptions.reason("test remove")
+          removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
 
           assertNull(removeMembersResult.all().get())
           val validMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId1))
@@ -1942,6 +1979,18 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
           assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
           assertNull(deleteResult.deletedGroups().get(testGroupId).get())
+
+          // Test alterConsumerGroupOffsets
+          val alterConsumerGroupOffsetsResult = 
client.alterConsumerGroupOffsets(testGroupId,
+            Collections.singletonMap(testTopicPart0, new 
OffsetAndMetadata(0L)))
+          assertNull(alterConsumerGroupOffsetsResult.all().get())
+          
assertNull(alterConsumerGroupOffsetsResult.partitionResult(testTopicPart0).get())
+
+          // Verify alterConsumerGroupOffsets success
+          TestUtils.waitUntilTrue(() => {
+            val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+            parts.containsKey(testTopicPart0) && 
(parts.get(testTopicPart0).offset() == 0)
+          }, s"Expected the offset for partition 0 to eventually become 0.")
       } finally {
         consumerThreads.foreach {
           case consumerThread =>
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 10eafcb51a7..818f0478f88 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -26,7 +26,7 @@ import kafka.utils._
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config, 
ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, 
AlterConfigOp, Config, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{InvalidRequestException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.metrics.Quota
@@ -211,6 +211,14 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
         new ClientQuotaAlteration(entity, util.Arrays.asList(
           new Op(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, null),
           new Op(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, null))))
+
+      // validate only
+      admin.alterClientQuotas(removals, new 
AlterClientQuotasOptions().validateOnly(true)).all().get()
+      assertEquals(Quota.upperBound(1000),
+        quotaManagers.produce.quota(user, clientId), s"User $user clientId 
$clientId must have same producer quota of 1000")
+      assertEquals(Quota.upperBound(2000),
+        quotaManagers.fetch.quota(user, clientId), s"User $user clientId 
$clientId must have same consumer quota of 2000")
+
       admin.alterClientQuotas(removals).all().get()
       TestUtils.retry(10000) {
         val producerQuota = quotaManagers.produce.quota(user, clientId)

Reply via email to