This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cc0f06554ba KAFKA-19042 Move GroupAuthorizerIntegrationTest to clients-integration-tests module (#19685)
cc0f06554ba is described below

commit cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b
Author: Nick Guo <[email protected]>
AuthorDate: Sat May 31 02:34:56 2025 +0800

    KAFKA-19042 Move GroupAuthorizerIntegrationTest to clients-integration-tests module (#19685)
    
    move GroupAuthorizerIntegrationTest to clients-integration-tests module
    
    Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, keemsisi <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../security/GroupAuthorizerIntegrationTest.java   | 402 +++++++++++++++++++++
 .../kafka/api/GroupAuthorizerIntegrationTest.scala | 239 ------------
 2 files changed, 402 insertions(+), 239 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
new file mode 100644
index 00000000000..725c0f53786
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.security;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.AuthenticationContext;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.config.ServerConfigs;
+
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@ClusterTestDefaults(serverProperties = {
+    @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "Group:broker"),
+    @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"),
+    @ClusterConfigProperty(key = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, value = "org.apache.kafka.clients.security.GroupAuthorizerIntegrationTest$GroupPrincipalBuilder"),
+})
+public class GroupAuthorizerIntegrationTest {
+    private static final KafkaPrincipal BROKER_PRINCIPAL = new KafkaPrincipal("Group", "broker");
+    private static final KafkaPrincipal CLIENT_PRINCIPAL = new KafkaPrincipal("Group", "client");
+
+    private static final String BROKER_LISTENER_NAME = "BROKER";
+    private static final String CLIENT_LISTENER_NAME = "EXTERNAL";
+    private static final String CONTROLLER_LISTENER_NAME = "CONTROLLER";
+
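+    // Looks up the authorizer plugin on the first controller that has one configured.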
+    private Authorizer getAuthorizer(ClusterInstance clusterInstance) {
+        return clusterInstance.controllers().values().stream()
+                .filter(server -> server.authorizerPlugin().isDefined())
+                .map(server -> server.authorizerPlugin().get().get()).findFirst().get();
+    }
+
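+    // Shared setup: allow inter-broker requests via CLUSTER_ACTION, let the client
+    // principal create the offsets topic, then create __consumer_offsets and wait for it.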
+    private void setup(ClusterInstance clusterInstance) throws InterruptedException {
+        // Allow inter-broker communication
+        addAndVerifyAcls(
+                Set.of(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, BROKER_PRINCIPAL)),
+                new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
+                clusterInstance
+        );
+        addAndVerifyAcls(
+                Set.of(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
+                new ResourcePattern(ResourceType.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME, PatternType.LITERAL),
+                clusterInstance
+        );
+
+        NewTopic offsetTopic = new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, (short) 1);
+        try (Admin admin = clusterInstance.admin(Map.of(
+                AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true))
+        ) {
+            admin.createTopics(Collections.singleton(offsetTopic));
+            clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1);
+        }
+    }
+
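+    // Maps each connection to a fixed principal by listener: broker and controller
+    // listeners resolve to the super user, the external listener to the client principal.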
+    public static class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder {
+        public GroupPrincipalBuilder() {
+            super(null, null);
+        }
+
+        @Override
+        public KafkaPrincipal build(AuthenticationContext context) {
+            String listenerName = context.listenerName();
+            return switch (listenerName) {
+                case BROKER_LISTENER_NAME, CONTROLLER_LISTENER_NAME -> BROKER_PRINCIPAL;
+                case CLIENT_LISTENER_NAME -> CLIENT_PRINCIPAL;
+                default -> throw new IllegalArgumentException("No principal mapped to listener " + listenerName);
+            };
+        }
+    }
+
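+    // Builds an ACL entry for the given principal and operation, bound to the wildcard host.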
+    private AccessControlEntry createAcl(AclOperation aclOperation, AclPermissionType aclPermissionType, KafkaPrincipal principal) {
+        return new AccessControlEntry(
+                principal.toString(),
+                WILDCARD_HOST,
+                aclOperation,
+                aclPermissionType
+        );
+    }
+
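+    // Creates the ACL bindings directly through the controller's authorizer and
+    // blocks until the cluster reports them as visible.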
+    private void addAndVerifyAcls(Set<AccessControlEntry> acls, ResourcePattern resource, ClusterInstance clusterInstance) throws InterruptedException {
+        List<AclBinding> aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).toList();
+        Authorizer authorizer = getAuthorizer(clusterInstance);
+        authorizer.createAcls(ANONYMOUS_CONTEXT, aclBindings)
+                .forEach(future -> {
+                    try {
+                        future.toCompletableFuture().get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        throw new RuntimeException("Failed to create ACLs", e);
+                    }
+                });
+        AclBindingFilter aclBindingFilter = new AclBindingFilter(resource.toFilter(), AccessControlEntryFilter.ANY);
+        clusterInstance.waitAcls(aclBindingFilter, acls);
+    }
+
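+    // Minimal request context for calling Authorizer#createAcls directly; only the
+    // anonymous principal and PLAINTEXT protocol are meaningful here.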
+    static final AuthorizableRequestContext ANONYMOUS_CONTEXT = new AuthorizableRequestContext() {
+        @Override
+        public String listenerName() {
+            return "";
+        }
+
+        @Override
+        public SecurityProtocol securityProtocol() {
+            return SecurityProtocol.PLAINTEXT;
+        }
+
+        @Override
+        public KafkaPrincipal principal() {
+            return KafkaPrincipal.ANONYMOUS;
+        }
+
+        @Override
+        public InetAddress clientAddress() {
+            return null;
+        }
+
+        @Override
+        public int requestType() {
+            return 0;
+        }
+
+        @Override
+        public int requestVersion() {
+            return 0;
+        }
+
+        @Override
+        public String clientId() {
+            return "";
+        }
+
+        @Override
+        public int correlationId() {
+            return 0;
+        }
+    };
+
+    @ClusterTest
+    public void testUnauthorizedProduceAndConsumeWithClassicConsumer(ClusterInstance clusterInstance) throws InterruptedException {
+        testUnauthorizedProduceAndConsume(clusterInstance, GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testUnauthorizedProduceAndConsumeWithAsyncConsumer(ClusterInstance clusterInstance) throws InterruptedException {
+        testUnauthorizedProduceAndConsume(clusterInstance, GroupProtocol.CONSUMER);
+    }
+
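+    // With only CREATE on the topic and READ on the group, both produce and consume
+    // should fail with a TopicAuthorizationException naming the topic.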
+    public void testUnauthorizedProduceAndConsume(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException {
+        setup(clusterInstance);
+        String topic = "topic";
+        String group = "group";
+
+        addAndVerifyAcls(
+                Set.of(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
+                new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
+                clusterInstance
+        );
+        addAndVerifyAcls(
+                Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
+                new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
+                clusterInstance
+        );
+
+        Producer<byte[], byte[]> producer = clusterInstance.producer();
+        Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
+                GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT),
+                ConsumerConfig.GROUP_ID_CONFIG, group
+        ));
+
+        try {
+            clusterInstance.createTopic(topic, 1, (short) 1);
+            ExecutionException produceException = assertThrows(
+                ExecutionException.class,
+                () -> producer.send(new ProducerRecord<>(topic, "message".getBytes())).get()
+            );
+            Throwable cause = produceException.getCause();
+            assertInstanceOf(TopicAuthorizationException.class, cause);
+            TopicAuthorizationException topicAuthException = (TopicAuthorizationException) cause;
+            assertEquals(Set.of(topic), topicAuthException.unauthorizedTopics());
+
+            TopicPartition topicPartition = new TopicPartition(topic, 0);
+            consumer.assign(Collections.singletonList(topicPartition));
+            TopicAuthorizationException consumeException = assertThrows(
+                TopicAuthorizationException.class,
+                () -> consumer.poll(Duration.ofSeconds(15))
+            );
+            assertEquals(consumeException.unauthorizedTopics(), topicAuthException.unauthorizedTopics());
+        } finally {
+            producer.close(Duration.ZERO);
+            consumer.close();
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
+        testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumeUnsubscribeWithoutGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
+        testConsumeUnsubscribeWithGroupPermission(clusterInstance, GroupProtocol.CONSUMER);
+    }
+
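+    // With topic CREATE/WRITE/READ and group READ granted, the consumer should poll
+    // one record and then unsubscribe without error.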
+    private void testConsumeUnsubscribeWithGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+        setup(clusterInstance);
+        String topic = "topic";
+        String group = "group";
+
+        // allow topic read/write permission to poll/send record
+        Set<AccessControlEntry> acls = new HashSet<>();
+        acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
+        acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
+        acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
+        addAndVerifyAcls(
+            acls,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
+            clusterInstance
+        );
+        addAndVerifyAcls(
+                Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
+                new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
+                clusterInstance
+        );
+
+        try (Producer<byte[], byte[]> producer = clusterInstance.producer();
+            Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
+                ConsumerConfig.GROUP_ID_CONFIG, group,
+                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
+                GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT)))
+        ) {
+            clusterInstance.createTopic(topic, 1, (short) 1);
+            producer.send(new ProducerRecord<>(topic, "message".getBytes())).get();
+            consumer.subscribe(Collections.singletonList(topic));
+            TestUtils.waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(15));
+                return records.count() == 1;
+            }, "consumer failed to receive message");
+            assertDoesNotThrow(consumer::unsubscribe);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumeCloseWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
+        testConsumeCloseWithGroupPermission(clusterInstance, GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumeCloseWithGroupPermission(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
+        testConsumeCloseWithGroupPermission(clusterInstance, GroupProtocol.CONSUMER);
+    }
+
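+    // With topic CREATE/WRITE/READ and group READ granted, the consumer should poll
+    // one record and then close without error.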
+    private void testConsumeCloseWithGroupPermission(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+        setup(clusterInstance);
+        String topic = "topic";
+        String group = "group";
+
+        // allow topic read/write permission to poll/send record
+        Set<AccessControlEntry> acls = new HashSet<>();
+        acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
+        acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
+        acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
+        addAndVerifyAcls(
+                acls,
+                new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
+                clusterInstance
+        );
+        addAndVerifyAcls(
+                Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
+                new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
+                clusterInstance
+        );
+
+        Producer<Object, Object> producer = clusterInstance.producer();
+        Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
+                ConsumerConfig.GROUP_ID_CONFIG, group,
+                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
+                GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT)));
+
+        try {
+            clusterInstance.createTopic(topic, 1, (short) 1);
+            producer.send(new ProducerRecord<>(topic, "message".getBytes())).get();
+            consumer.subscribe(List.of(topic));
+            TestUtils.waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(15));
+                return records.count() == 1;
+            }, "consumer failed to receive message");
+        } finally {
+            producer.close();
+            assertDoesNotThrow(() -> consumer.close());
+        }
+    }
+
+    @ClusterTest
+    public void testAuthorizedProduceAndConsumeWithClassic(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
+        testAuthorizedProduceAndConsume(clusterInstance, GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAuthorizedProduceAndConsumeWithAsync(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
+        testAuthorizedProduceAndConsume(clusterInstance, GroupProtocol.CONSUMER);
+    }
+
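+    // Happy path: with topic CREATE/WRITE/READ and group READ granted, a produced
+    // record should round-trip to the consumer.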
+    private void testAuthorizedProduceAndConsume(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+        setup(clusterInstance);
+        String topic = "topic";
+        String group = "group";
+
+        Set<AccessControlEntry> acls = new HashSet<>();
+        acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
+        acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
+        acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL));
+        addAndVerifyAcls(
+                acls,
+                new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
+                clusterInstance
+        );
+        addAndVerifyAcls(
+                Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
+                new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
+                clusterInstance
+        );
+
+        try (Producer<byte[], byte[]> producer = clusterInstance.producer();
+             Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
+                     ConsumerConfig.GROUP_ID_CONFIG, group,
+                     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
+                     GROUP_PROTOCOL_CONFIG, groupProtocol.name.toLowerCase(Locale.ROOT)))
+        ) {
+            clusterInstance.createTopic(topic, 1, (short) 1);
+            producer.send(new ProducerRecord<>(topic, "message".getBytes())).get();
+            TopicPartition topicPartition = new TopicPartition(topic, 0);
+            consumer.assign(List.of(topicPartition));
+            TestUtils.waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(15));
+                return records.count() == 1;
+            }, "consumer failed to receive message");
+        }
+    }
+
+}
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
deleted file mode 100644
index 01d18114a04..00000000000
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package kafka.api
-
-import java.util.Properties
-import java.util.concurrent.ExecutionException
-import kafka.api.GroupAuthorizerIntegrationTest._
-import kafka.server.BaseRequestTest
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType}
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.errors.TopicAuthorizationException
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
-import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
-import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.metadata.authorizer.StandardAuthorizer
-import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
-import org.apache.kafka.server.config.ServerConfigs
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import scala.jdk.CollectionConverters._
-
-object GroupAuthorizerIntegrationTest {
-  val BrokerPrincipal = new KafkaPrincipal("Group", "broker")
-  val ClientPrincipal = new KafkaPrincipal("Group", "client")
-
-  val BrokerListenerName = "BROKER"
-  val ClientListenerName = "CLIENT"
-  val ControllerListenerName = "CONTROLLER"
-
-  class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
-    override def build(context: AuthenticationContext): KafkaPrincipal = {
-      context.listenerName match {
-        case BrokerListenerName | ControllerListenerName => BrokerPrincipal
-        case ClientListenerName => ClientPrincipal
-      case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName")
-      }
-    }
-  }
-}
-
-class GroupAuthorizerIntegrationTest extends BaseRequestTest {
-
-  val brokerId: Integer = 0
-
-  override def brokerCount: Int = 1
-  override def interBrokerListenerName: ListenerName = new ListenerName(BrokerListenerName)
-  override def listenerName: ListenerName = new ListenerName(ClientListenerName)
-
-  def brokerPrincipal: KafkaPrincipal = BrokerPrincipal
-  def clientPrincipal: KafkaPrincipal = ClientPrincipal
-
-  override def kraftControllerConfigs(testInfo: TestInfo): collection.Seq[Properties] = {
-    val controllerConfigs = super.kraftControllerConfigs(testInfo)
-    controllerConfigs.foreach(addNodeProperties)
-    controllerConfigs
-  }
-
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(ServerConfigs.BROKER_ID_CONFIG, brokerId.toString)
-    addNodeProperties(properties)
-  }
-
-  private def addNodeProperties(properties: Properties): Unit = {
-    properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
-    properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
-
-    properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
-    properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
-    properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1")
-    properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
-    properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
-    properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName)
-  }
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    doSetup(testInfo, createOffsetsTopic = false)
-
-    // Allow inter-broker communication
-    addAndVerifyAcls(
-      Set(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, principal = BrokerPrincipal)),
-      new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
-    )
-
-    createOffsetsTopic(interBrokerListenerName)
-  }
-
-  private def createAcl(aclOperation: AclOperation,
-                        aclPermissionType: AclPermissionType,
-                        principal: KafkaPrincipal = ClientPrincipal): AccessControlEntry = {
-    new AccessControlEntry(principal.toString, WILDCARD_HOST, aclOperation, aclPermissionType)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testUnauthorizedProduceAndConsume(groupProtocol: String): Unit = {
-    val topic = "topic"
-    val topicPartition = new TopicPartition("topic", 0)
-
-    createTopic(topic, listenerName = interBrokerListenerName)
-
-    val producer = createProducer()
-    val produceException = assertThrows(classOf[ExecutionException],
-      () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause
-    assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
-    assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
-
-    val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-    consumer.assign(java.util.List.of(topicPartition))
-    val consumeException = assertThrows(classOf[TopicAuthorizationException],
-      () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
-    assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  @Timeout(60)
-  def testConsumeUnsubscribeWithoutGroupPermission(groupProtocol: String): Unit = {
-    val topic = "topic"
-
-    createTopic(topic, listenerName = interBrokerListenerName)
-
-    // allow topic read/write permission to poll/send record
-    addAndVerifyAcls(
-      Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
-    )
-    val producer = createProducer()
-    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
-    producer.close()
-
-    // allow group read permission to join group
-    val group = "group"
-    addAndVerifyAcls(
-      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
-    )
-
-    val props = new Properties()
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-    val consumer = createConsumer(configOverrides = props)
-    consumer.subscribe(java.util.List.of(topic))
-    TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
-
-    removeAndVerifyAcls(
-      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
-    )
-
-    assertDoesNotThrow(new Executable {
-      override def execute(): Unit = consumer.unsubscribe()
-    })
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testConsumeCloseWithoutGroupPermission(groupProtocol: String): Unit = {
-    val topic = "topic"
-    createTopic(topic, listenerName = interBrokerListenerName)
-
-    // allow topic read/write permission to poll/send record
-    addAndVerifyAcls(
-      Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
-    )
-    val producer = createProducer()
-    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
-
-    // allow group read permission to join group
-    val group = "group"
-    addAndVerifyAcls(
-      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
-    )
-
-    val props = new Properties()
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-    val consumer = createConsumer(configOverrides = props)
-    consumer.subscribe(java.util.List.of(topic))
-    TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
-
-    removeAndVerifyAcls(
-      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
-    )
-
-    assertDoesNotThrow(new Executable {
-      override def execute(): Unit = consumer.close()
-    })
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testAuthorizedProduceAndConsume(groupProtocol: String): Unit = {
-    val topic = "topic"
-    val topicPartition = new TopicPartition("topic", 0)
-
-    createTopic(topic, listenerName = interBrokerListenerName)
-
-    addAndVerifyAcls(
-      Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
-    )
-    val producer = createProducer()
-    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
-
-    addAndVerifyAcls(
-      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
-      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
-    )
-    val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-    consumer.assign(java.util.List.of(topicPartition))
-    TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
-  }
-
-}
