This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new f026b1cd587 KAFKA-18813: [2/N] Client support for TopicAuthException in HB path (#18986)
f026b1cd587 is described below
commit f026b1cd58790ed71eef5e6d4cd4e8218d25a6b9
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Feb 21 08:45:20 2025 -0500
KAFKA-18813: [2/N] Client support for TopicAuthException in HB path (#18986)
Reviewers: David Jacot <[email protected]>
---
.../consumer/internals/AbstractHeartbeatRequestManager.java | 8 ++++++++
.../kafka/clients/consumer/internals/AsyncKafkaConsumer.java | 3 ++-
.../consumer/internals/ConsumerHeartbeatRequestManagerTest.java | 5 +++++
3 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index fe8926abbf8..608434e524c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -403,6 +403,14 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
handleFatalFailure(error.exception(exception.getMessage()));
break;
+ case TOPIC_AUTHORIZATION_FAILED:
+ logger.error("{} failed for member {} with state {} due to {}: {}", heartbeatRequestName(),
+ membershipManager().memberId, membershipManager().state, error, errorMessage);
+ // Propagate auth error received in HB so that it's returned on poll.
+ // Member should stay in its current state so it can recover if ever the missing ACLs are added.
+ backgroundEventHandler.add(new ErrorEvent(error.exception()));
+ break;
+
case INVALID_REQUEST:
case GROUP_MAX_SIZE_REACHED:
case UNSUPPORTED_ASSIGNOR:
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e19c37c954f..38a4bd2d0cc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -85,6 +85,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
@@ -1621,7 +1622,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
try {
// If users have fatal error, they will get some exceptions in the background queue.
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
- processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof GroupAuthorizationException);
+ processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException));
log.info("Unsubscribed all topics or patterns and assigned partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to complete");
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 9bc5d66d9c3..7bf35f22617 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -582,6 +582,11 @@ public class ConsumerHeartbeatRequestManagerTest {
verify(backgroundEventHandler, never()).add(any());
assertNextHeartbeatTiming(0);
break;
+ case TOPIC_AUTHORIZATION_FAILED:
+ verify(backgroundEventHandler).add(any(ErrorEvent.class));
+ assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS);
+ verify(membershipManager, never()).transitionToFatal();
+ break;
default:
if (isFatal) {
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());