This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new f026b1cd587 KAFKA-18813: [2/N] Client support for TopicAuthException in HB path (#18986)
f026b1cd587 is described below

commit f026b1cd58790ed71eef5e6d4cd4e8218d25a6b9
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Feb 21 08:45:20 2025 -0500

    KAFKA-18813: [2/N] Client support for TopicAuthException in HB path (#18986)
    
    Reviewers: David Jacot <[email protected]>
---
 .../consumer/internals/AbstractHeartbeatRequestManager.java       | 8 ++++++++
 .../kafka/clients/consumer/internals/AsyncKafkaConsumer.java      | 3 ++-
 .../consumer/internals/ConsumerHeartbeatRequestManagerTest.java   | 5 +++++
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index fe8926abbf8..608434e524c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -403,6 +403,14 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
                 handleFatalFailure(error.exception(exception.getMessage()));
                 break;
 
+            case TOPIC_AUTHORIZATION_FAILED:
+                logger.error("{} failed for member {} with state {} due to {}: 
{}", heartbeatRequestName(),
+                        membershipManager().memberId, 
membershipManager().state, error, errorMessage);
+                // Propagate auth error received in HB so that it's returned on poll.
+                // Member should stay in its current state so it can recover if ever the missing ACLs are added.
+                backgroundEventHandler.add(new ErrorEvent(error.exception()));
+                break;
+
             case INVALID_REQUEST:
             case GROUP_MAX_SIZE_REACHED:
             case UNSUPPORTED_ASSIGNOR:
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e19c37c954f..38a4bd2d0cc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -85,6 +85,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -1621,7 +1622,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             try {
                 // If users have fatal error, they will get some exceptions in the background queue.
                 // When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
-                processBackgroundEvents(unsubscribeEvent.future(), timer, e -> 
e instanceof GroupAuthorizationException);
+                processBackgroundEvents(unsubscribeEvent.future(), timer, e -> 
(e instanceof GroupAuthorizationException || e instanceof 
TopicAuthorizationException));
                 log.info("Unsubscribed all topics or patterns and assigned 
partitions");
             } catch (TimeoutException e) {
                 log.error("Failed while waiting for the unsubscribe event to 
complete");
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 9bc5d66d9c3..7bf35f22617 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -582,6 +582,11 @@ public class ConsumerHeartbeatRequestManagerTest {
                 verify(backgroundEventHandler, never()).add(any());
                 assertNextHeartbeatTiming(0);
                 break;
+            case TOPIC_AUTHORIZATION_FAILED:
+                verify(backgroundEventHandler).add(any(ErrorEvent.class));
+                assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS);
+                verify(membershipManager, never()).transitionToFatal();
+                break;
             default:
                 if (isFatal) {
                     
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

Reply via email to