This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 72e72e3537d KAFKA-16865: Add IncludeTopicAuthorizedOperations option for DescribeTopicPartitionsRequest (#16136)
72e72e3537d is described below

commit 72e72e3537de863d90d437184cf3711887ac7e57
Author: Gantigmaa Selenge <[email protected]>
AuthorDate: Wed Jun 12 16:04:24 2024 +0100

    KAFKA-16865: Add IncludeTopicAuthorizedOperations option for DescribeTopicPartitionsRequest (#16136)
    
    
    Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>, Calvin Liu <[email protected]>, Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  8 ++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 73 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5c8d9ebb799..55646e27820 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2246,7 +2246,7 @@ public class KafkaAdminClient extends AdminClient {
                         continue;
                     }
 
-                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes, options.includeAuthorizedOperations());
 
                     if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) {
                         // Add the partitions for the cursor topic of the previous batch.
@@ -2409,14 +2409,16 @@ public class KafkaAdminClient extends AdminClient {
 
     private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic(
         DescribeTopicPartitionsResponseTopic topic,
-        Map<Integer, Node> nodes
+        Map<Integer, Node> nodes,
+        boolean includeAuthorizedOperations
     ) {
         List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions();
         List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
         for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) {
             partitions.add(DescribeTopicPartitionsResponse.partitionToTopicPartitionInfo(partitionInfo, nodes));
         }
-        return new TopicDescription(topic.name(), topic.isInternal(), partitions, validAclOperations(topic.topicAuthorizedOperations()), topic.topicId());
+        Set<AclOperation> authorisedOperations = includeAuthorizedOperations ? validAclOperations(topic.topicAuthorizedOperations()) : null;
+        return new TopicDescription(topic.name(), topic.isInternal(), partitions, authorisedOperations, topic.topicId());
     }
 
     private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String topicName, Uuid topicId,
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ea1305e533b..e04de635f71 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1464,6 +1464,7 @@ public class KafkaAdminClientTest {
                 assertEquals(0, 
topicDescription.partitions().get(0).partition());
                 assertEquals(1, 
topicDescription.partitions().get(1).partition());
                 topicDescription = topicDescriptions.get(topicName1);
+                assertNull(topicDescription.authorizedOperations());
                 assertEquals(1, topicDescription.partitions().size());
             } catch (Exception e) {
                 fail("describe using DescribeTopics API should not fail", e);
@@ -1471,6 +1472,77 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDescribeTopicPartitionsApiWithAuthorizedOps() throws 
ExecutionException, InterruptedException {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String topicName0 = "test-0";
+            Uuid topicId =  Uuid.randomUuid();
+
+            int authorisedOperations = 
Utils.to32BitField(Utils.mkSet(AclOperation.DESCRIBE.code(), 
AclOperation.ALTER.code()));
+            env.kafkaClient().prepareResponse(
+                    prepareDescribeClusterResponse(0,
+                            env.cluster().nodes(),
+                            env.cluster().clusterResource().clusterId(),
+                            2,
+                            authorisedOperations)
+            );
+
+            DescribeTopicPartitionsResponseData responseData = new 
DescribeTopicPartitionsResponseData();
+            responseData.topics().add(new 
DescribeTopicPartitionsResponseTopic()
+                    .setErrorCode((short) 0)
+                    .setTopicId(topicId)
+                    .setName(topicName0)
+                    .setIsInternal(false)
+                    .setTopicAuthorizedOperations(authorisedOperations));
+            env.kafkaClient().prepareResponse(new 
DescribeTopicPartitionsResponse(responseData));
+
+            DescribeTopicsResult result = env.adminClient().describeTopics(
+                    singletonList(topicName0), new 
DescribeTopicsOptions().includeAuthorizedOperations(true)
+            );
+
+            Map<String, TopicDescription> topicDescriptions = 
result.allTopicNames().get();
+            TopicDescription topicDescription = 
topicDescriptions.get(topicName0);
+            assertEquals(new HashSet<>(asList(AclOperation.DESCRIBE, 
AclOperation.ALTER)),
+                    topicDescription.authorizedOperations());
+        }
+    }
+
+    @Test
+    public void testDescribeTopicPartitionsApiWithoutAuthorizedOps() throws 
ExecutionException, InterruptedException {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String topicName0 = "test-0";
+            Uuid topicId =  Uuid.randomUuid();
+
+            int authorisedOperations = 
Utils.to32BitField(Utils.mkSet(AclOperation.DESCRIBE.code(), 
AclOperation.ALTER.code()));
+            env.kafkaClient().prepareResponse(
+                    prepareDescribeClusterResponse(0,
+                            env.cluster().nodes(),
+                            env.cluster().clusterResource().clusterId(),
+                            2,
+                            authorisedOperations)
+            );
+
+            DescribeTopicPartitionsResponseData responseData = new 
DescribeTopicPartitionsResponseData();
+            responseData.topics().add(new 
DescribeTopicPartitionsResponseTopic()
+                    .setErrorCode((short) 0)
+                    .setTopicId(topicId)
+                    .setName(topicName0)
+                    .setIsInternal(false)
+                    .setTopicAuthorizedOperations(authorisedOperations));
+            env.kafkaClient().prepareResponse(new 
DescribeTopicPartitionsResponse(responseData));
+
+            DescribeTopicsResult result = env.adminClient().describeTopics(
+                    singletonList(topicName0), new 
DescribeTopicsOptions().includeAuthorizedOperations(false)
+            );
+
+            Map<String, TopicDescription> topicDescriptions = 
result.allTopicNames().get();
+            TopicDescription topicDescription = 
topicDescriptions.get(topicName0);
+            assertNull(topicDescription.authorizedOperations());
+        }
+    }
+
     @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
     @Test
     public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() {
@@ -1547,6 +1619,7 @@ public class KafkaAdminClientTest {
                 assertEquals(2, topicDescription.partitions().size());
                 topicDescription = topicDescriptions.get(topicName2);
                 assertEquals(2, topicDescription.partitions().size());
+                assertNull(topicDescription.authorizedOperations());
             } catch (Exception e) {
                 fail("describe using DescribeTopics API should not fail", e);
             }

Reply via email to