This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cc20e784745 KAFKA-17648: AsyncKafkaConsumer#unsubscribe and #close swallow TopicAuthorizationException and GroupAuthorizationException (#17516)
cc20e784745 is described below

commit cc20e7847450d7a3bd5af85f821697e17761cc60
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Nov 15 22:15:26 2024 +0800

    KAFKA-17648: AsyncKafkaConsumer#unsubscribe and #close swallow TopicAuthorizationException and GroupAuthorizationException (#17516)
    
    Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 12 ++--
 .../consumer/internals/AsyncKafkaConsumerTest.java | 18 +++++
 .../kafka/api/GroupAuthorizerIntegrationTest.scala | 79 ++++++++++++++++++++++
 3 files changed, 105 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index fcf688b7f8e..da3f3f2f25b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -76,10 +76,12 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
@@ -1571,10 +1573,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                     subscriptions.assignedPartitions());
 
             try {
-                // If users subscribe to an invalid topic name, they will get 
InvalidTopicException in error events,
-                // because network thread keeps trying to send MetadataRequest 
in the background.
-                // Ignore it to avoid unsubscribe failed.
-                processBackgroundEvents(unsubscribeEvent.future(), timer, e -> 
e instanceof InvalidTopicException);
+                // If users subscribe to a topic with invalid name or without 
permission, they will get some exceptions.
+                // Because network thread keeps trying to send MetadataRequest 
or ConsumerGroupHeartbeatRequest in the background,
+                // there will be some error events in the background queue.
+                // When running unsubscribe, these exceptions should be 
ignored, or users can't unsubscribe successfully.
+                processBackgroundEvents(unsubscribeEvent.future(), timer,
+                    e -> e instanceof InvalidTopicException || e instanceof 
TopicAuthorizationException || e instanceof GroupAuthorizationException);
                 log.info("Unsubscribed all topics or patterns and assigned 
partitions");
             } catch (TimeoutException e) {
                 log.error("Failed while waiting for the unsubscribe event to 
complete");
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index eaf974b91c4..8eb8ec4c85b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -63,6 +63,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -282,6 +283,23 @@ public class AsyncKafkaConsumerTest {
         assertDoesNotThrow(() -> consumer.close());
     }
 
+    @Test
+    public void testUnsubscribeWithTopicAuthorizationException() {
+        consumer = newConsumer();
+        backgroundEventQueue.add(new ErrorEvent(new 
TopicAuthorizationException(Set.of("test-topic"))));
+        completeUnsubscribeApplicationEventSuccessfully();
+        assertDoesNotThrow(() -> consumer.unsubscribe());
+        assertDoesNotThrow(() -> consumer.close());
+    }
+
+    @Test
+    public void testCloseWithTopicAuthorizationException() {
+        consumer = newConsumer();
+        backgroundEventQueue.add(new ErrorEvent(new 
TopicAuthorizationException(Set.of("test-topic"))));
+        completeUnsubscribeApplicationEventSuccessfully();
+        assertDoesNotThrow(() -> consumer.close());
+    }
+
     @Test
     public void testCommitAsyncWithNullCallback() {
         consumer = newConsumer();
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 46b34f4efda..e9a0644a26c 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
 import org.apache.kafka.server.config.ServerConfigs
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
@@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, 
groupProtocol: String): Unit = {
+    val topic = "topic"
+
+    createTopic(topic, listenerName = interBrokerListenerName)
+
+    // allow topic read/write permission to poll/send record
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), 
createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+    )
+    val producer = createProducer()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"message".getBytes)).get()
+    producer.close()
+
+    // allow group read permission to join group
+    val group = "group"
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    val props = new Properties()
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    val consumer = createConsumer(configOverrides = props)
+    consumer.subscribe(List(topic).asJava)
+    TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
+
+    removeAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.unsubscribe()
+    })
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: 
String): Unit = {
+    val topic = "topic"
+    createTopic(topic, listenerName = interBrokerListenerName)
+
+    // allow topic read/write permission to poll/send record
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), 
createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+    )
+    val producer = createProducer()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"message".getBytes)).get()
+
+    // allow group read permission to join group
+    val group = "group"
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    val props = new Properties()
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    val consumer = createConsumer(configOverrides = props)
+    consumer.subscribe(List(topic).asJava)
+    TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
+
+    removeAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.close()
+    })
+  }
+
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testAuthorizedProduceAndConsume(quorum: String, groupProtocol: String): 
Unit = {

Reply via email to