This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cc20e784745 KAFKA-17648: AsyncKafkaConsumer#unsubscribe and #close
swallow TopicAuthorizationException and GroupAuthorizationException (#17516)
cc20e784745 is described below
commit cc20e7847450d7a3bd5af85f821697e17761cc60
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Nov 15 22:15:26 2024 +0800
KAFKA-17648: AsyncKafkaConsumer#unsubscribe and #close swallow
TopicAuthorizationException and GroupAuthorizationException (#17516)
Reviewers: Lianet Magrans <[email protected]>, Kirk True
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 12 ++--
.../consumer/internals/AsyncKafkaConsumerTest.java | 18 +++++
.../kafka/api/GroupAuthorizerIntegrationTest.scala | 79 ++++++++++++++++++++++
3 files changed, 105 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index fcf688b7f8e..da3f3f2f25b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -76,10 +76,12 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
@@ -1571,10 +1573,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
subscriptions.assignedPartitions());
try {
- // If users subscribe to an invalid topic name, they will get
InvalidTopicException in error events,
- // because network thread keeps trying to send MetadataRequest
in the background.
- // Ignore it to avoid unsubscribe failed.
- processBackgroundEvents(unsubscribeEvent.future(), timer, e ->
e instanceof InvalidTopicException);
+ // If users subscribe to a topic with invalid name or without
permission, they will get some exceptions.
+ // Because network thread keeps trying to send MetadataRequest
or ConsumerGroupHeartbeatRequest in the background,
+ // there will be some error events in the background queue.
+ // When running unsubscribe, these exceptions should be
ignored, or users can't unsubscribe successfully.
+ processBackgroundEvents(unsubscribeEvent.future(), timer,
+ e -> e instanceof InvalidTopicException || e instanceof
TopicAuthorizationException || e instanceof GroupAuthorizationException);
log.info("Unsubscribed all topics or patterns and assigned
partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to
complete");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index eaf974b91c4..8eb8ec4c85b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -63,6 +63,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -282,6 +283,23 @@ public class AsyncKafkaConsumerTest {
assertDoesNotThrow(() -> consumer.close());
}
+ @Test
+ public void testUnsubscribeWithTopicAuthorizationException() {
+ consumer = newConsumer();
+ backgroundEventQueue.add(new ErrorEvent(new
TopicAuthorizationException(Set.of("test-topic"))));
+ completeUnsubscribeApplicationEventSuccessfully();
+ assertDoesNotThrow(() -> consumer.unsubscribe());
+ assertDoesNotThrow(() -> consumer.close());
+ }
+
+ @Test
+ public void testCloseWithTopicAuthorizationException() {
+ consumer = newConsumer();
+ backgroundEventQueue.add(new ErrorEvent(new
TopicAuthorizationException(Set.of("test-topic"))));
+ completeUnsubscribeApplicationEventSuccessfully();
+ assertDoesNotThrow(() -> consumer.close());
+ }
+
@Test
public void testCommitAsyncWithNullCallback() {
consumer = newConsumer();
diff --git
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 46b34f4efda..e9a0644a26c 100644
---
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends
BaseRequestTest {
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testConsumeUnsubscribeWithoutGroupPermission(quorum: String,
groupProtocol: String): Unit = {
+ val topic = "topic"
+
+ createTopic(topic, listenerName = interBrokerListenerName)
+
+ // allow topic read/write permission to poll/send record
+ addAndVerifyAcls(
+ Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW),
createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+ new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+ )
+ val producer = createProducer()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()
+ producer.close()
+
+ // allow group read permission to join group
+ val group = "group"
+ addAndVerifyAcls(
+ Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+ new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+ )
+
+ val props = new Properties()
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ val consumer = createConsumer(configOverrides = props)
+ consumer.subscribe(List(topic).asJava)
+ TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
+
+ removeAndVerifyAcls(
+ Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+ new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+ )
+
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.unsubscribe()
+ })
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol:
String): Unit = {
+ val topic = "topic"
+ createTopic(topic, listenerName = interBrokerListenerName)
+
+ // allow topic read/write permission to poll/send record
+ addAndVerifyAcls(
+ Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW),
createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+ new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+ )
+ val producer = createProducer()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()
+
+ // allow group read permission to join group
+ val group = "group"
+ addAndVerifyAcls(
+ Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+ new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+ )
+
+ val props = new Properties()
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ val consumer = createConsumer(configOverrides = props)
+ consumer.subscribe(List(topic).asJava)
+ TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
+
+ removeAndVerifyAcls(
+ Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+ new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+ )
+
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.close()
+ })
+ }
+
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testAuthorizedProduceAndConsume(quorum: String, groupProtocol: String):
Unit = {