This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a24fedfba00 KAFKA-18817:[1/N] ShareGroupHeartbeat and ShareGroupDescribe API must check topic describe (#19055)
a24fedfba00 is described below

commit a24fedfba00d249bbacb1f1e139771ed196614e8
Author: DL1231 <[email protected]>
AuthorDate: Mon Mar 3 17:49:37 2025 +0800

    KAFKA-18817:[1/N] ShareGroupHeartbeat and ShareGroupDescribe API must check topic describe (#19055)
    
    1. Client support for TopicAuthorizationException in the DescribeShareGroups and heartbeat paths
    2. ShareConsumerImpl#sendAcknowledgementsAndLeaveGroup now swallows TopicAuthorizationException and GroupAuthorizationException
    
    Reviewers: ShivsundarR <[email protected]>, Andrew Schofield <[email protected]>
---
 .../internals/DescribeShareGroupsHandler.java      |  2 ++
 .../consumer/internals/ShareConsumerImpl.java      | 27 +++++++++++++----
 .../requests/ShareGroupDescribeResponse.java       |  1 +
 .../requests/ShareGroupHeartbeatResponse.java      |  1 +
 .../common/message/ShareGroupDescribeResponse.json |  1 +
 .../message/ShareGroupHeartbeatResponse.json       |  3 +-
 .../consumer/internals/ShareConsumerImplTest.java  | 35 ++++++++++++++++++----
 7 files changed, 57 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
index 1c79225b083..1aa37bbf776 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
@@ -159,7 +159,9 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
             Set<CoordinatorKey> groupsToUnmap) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
+            case TOPIC_AUTHORIZATION_FAILED:
                 log.debug("`DescribeShareGroups` request for group id {} failed due to error {}", groupId.idValue, error);
+                // The topic auth response received on DescribeShareGroup is a generic one not including topic names, so we just pass it on unchanged here.
                 failed.put(groupId, error.exception(errorMsg));
                 break;
 
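
The hunk above simply adds TOPIC_AUTHORIZATION_FAILED to the same terminal branch as GROUP_AUTHORIZATION_FAILED, so an unauthorized group lookup fails immediately rather than being retried or remapped to another coordinator. A minimal, self-contained sketch of that fall-through classification pattern (hypothetical names, not the actual DescribeShareGroupsHandler code):

    import java.util.HashMap;
    import java.util.Map;

    // Simplified sketch of the error-classification idea: both authorization errors
    // are terminal for the group, so they share one "fail this group" branch.
    public class ErrorClassificationSketch {

        enum ApiError { GROUP_AUTHORIZATION_FAILED, TOPIC_AUTHORIZATION_FAILED, NOT_COORDINATOR }

        static void handleError(String groupId, ApiError error, Map<String, ApiError> failed) {
            switch (error) {
                case GROUP_AUTHORIZATION_FAILED:
                case TOPIC_AUTHORIZATION_FAILED:
                    // Terminal: record the failure for this group and do not retry.
                    failed.put(groupId, error);
                    break;
                case NOT_COORDINATOR:
                    // Retriable: the real handler would unmap the coordinator and retry.
                    break;
            }
        }

        public static void main(String[] args) {
            Map<String, ApiError> failed = new HashMap<>();
            handleError("share-group-1", ApiError.TOPIC_AUTHORIZATION_FAILED, failed);
            System.out.println(failed); // {share-group-1=TOPIC_AUTHORIZATION_FAILED}
        }
    }
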
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index e808ecf5896..4dc8cf4c697 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -54,9 +54,11 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
@@ -92,6 +94,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
@@ -914,7 +917,10 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         ShareUnsubscribeEvent unsubscribeEvent = new ShareUnsubscribeEvent(calculateDeadlineMs(timer));
         applicationEventHandler.add(unsubscribeEvent);
         try {
-            processBackgroundEvents(unsubscribeEvent.future(), timer);
+            // If users have fatal error, they will get some exceptions in the background queue.
+            // When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
+            processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException
+                || e instanceof TopicAuthorizationException));
             log.info("Completed releasing assignment and leaving group to 
close consumer.");
         } catch (TimeoutException e) {
             log.warn("Consumer triggered an unsubscribe event to leave the 
group but couldn't " +
@@ -1107,18 +1113,27 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
      * Each iteration gives the application thread an opportunity to process background events, which may be
      * necessary to complete the overall processing.
      *
-     * @param future         Event that contains a {@link CompletableFuture}; it is on this future that the
-     *                       application thread will wait for completion
-     * @param timer          Overall timer that bounds how long to wait for the event to complete
+     * @param future                    Event that contains a {@link CompletableFuture}; it is on this future that the
+     *                                  application thread will wait for completion
+     * @param timer                     Overall timer that bounds how long to wait for the event to complete
+     * @param ignoreErrorEventException Predicate to ignore background errors.
+     *                                  Any exceptions found while processing background events that match the predicate won't be propagated.
      * @return {@code true} if the event completed within the timeout, {@code false} otherwise
      */
     // Visible for testing
     <T> T processBackgroundEvents(final Future<T> future,
-                                  final Timer timer) {
+                                  final Timer timer,
+                                  final Predicate<Exception> ignoreErrorEventException) {
         log.trace("Will wait up to {} ms for future {} to complete", timer.remainingMs(), future);
 
         do {
-            boolean hadEvents = processBackgroundEvents();
+            boolean hadEvents = false;
+            try {
+                hadEvents = processBackgroundEvents();
+            } catch (Exception e) {
+                if (!ignoreErrorEventException.test(e))
+                    throw e;
+            }
 
             try {
                 if (future.isDone()) {
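
The new Predicate<Exception> parameter is the heart of this change: while the application thread waits for the unsubscribe future, any error event drained from the background queue that matches the predicate is swallowed instead of rethrown, so the consumer can still leave the group. A minimal sketch of that ignore-predicate idea, using hypothetical types in place of the real background event machinery:

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.function.Predicate;

    // Sketch only: background errors matching the predicate are ignored so that a
    // shutdown-style operation can complete; anything else is propagated to the caller.
    public class IgnorePredicateSketch {

        static void drainBackgroundErrors(Queue<RuntimeException> backgroundErrors,
                                          Predicate<Exception> ignoreErrorEventException) {
            RuntimeException error;
            while ((error = backgroundErrors.poll()) != null) {
                if (!ignoreErrorEventException.test(error)) {
                    throw error; // fatal for the caller
                }
                // Matched the predicate, e.g. an authorization failure during unsubscribe: ignore.
            }
        }

        public static void main(String[] args) {
            Queue<RuntimeException> errors = new ArrayDeque<>();
            errors.add(new SecurityException("not authorized")); // stand-in for TopicAuthorizationException
            drainBackgroundErrors(errors, e -> e instanceof SecurityException);
            System.out.println("unsubscribe completed despite a background error");
        }
    }
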
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
index 95dd371eedf..fc25658d703 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
@@ -35,6 +35,7 @@ import java.util.Map;
  * - {@link Errors#INVALID_REQUEST}
  * - {@link Errors#INVALID_GROUP_ID}
  * - {@link Errors#GROUP_ID_NOT_FOUND}
+ * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
  */
 public class ShareGroupDescribeResponse extends AbstractResponse {
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
index de05d44aebe..08ce09e1f4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
@@ -35,6 +35,7 @@ import java.util.Map;
  * - {@link Errors#INVALID_REQUEST}
  * - {@link Errors#UNKNOWN_MEMBER_ID}
  * - {@link Errors#GROUP_MAX_SIZE_REACHED}
+ * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
  */
 public class ShareGroupHeartbeatResponse extends AbstractResponse {
     private final ShareGroupHeartbeatResponseData data;
diff --git a/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json b/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json
index c093b788bfc..e90e431f64e 100644
--- a/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json
+++ b/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json
@@ -27,6 +27,7 @@
   // - INVALID_REQUEST (version 0+)
   // - INVALID_GROUP_ID (version 0+)
   // - GROUP_ID_NOT_FOUND (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
diff --git a/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json
index e0ff5a93d54..75aa62b76f4 100644
--- a/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json
@@ -21,12 +21,13 @@
   "flexibleVersions": "0+",
   // Supported errors:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
-  // - NOT_COORDINATOR (version 0+) 
+  // - NOT_COORDINATOR (version 0+)
   // - COORDINATOR_NOT_AVAILABLE (version 0+)
   // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
   // - INVALID_REQUEST (version 0+)
   // - UNKNOWN_MEMBER_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 6f5e78451b1..688547a3975 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -53,12 +54,14 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
@@ -271,6 +274,26 @@ public class ShareConsumerImplTest {
         assertEquals("This consumer has already been closed.", 
res.getMessage());
     }
 
+    @Test
+    public void testUnsubscribeWithTopicAuthorizationException() {
+        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(subscriptions);
+
+        backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
+        completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
+        assertDoesNotThrow(() -> consumer.unsubscribe());
+        assertDoesNotThrow(() -> consumer.close());
+    }
+
+    @Test
+    public void testCloseWithTopicAuthorizationException() {
+        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(subscriptions);
+
+        completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
+        assertDoesNotThrow(() -> consumer.close());
+    }
+
     @Test
     public void testVerifyApplicationEventOnShutdown() {
         SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
@@ -502,7 +525,7 @@ public class ShareConsumerImplTest {
     }
 
     /**
-     * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer) processBackgroundEvents}
+     * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
      * handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.
      */
     @Test
@@ -529,14 +552,14 @@ public class ShareConsumerImplTest {
             return null;
         }).when(future).get(any(Long.class), any(TimeUnit.class));
 
-        consumer.processBackgroundEvents(future, timer);
+        consumer.processBackgroundEvents(future, timer, e -> false);
 
         // 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
         assertEquals(800, timer.remainingMs());
     }
 
     /**
-     * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer) processBackgroundEvents}
+     * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
      * handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
      */
     @Test
@@ -548,7 +571,7 @@ public class ShareConsumerImplTest {
         // Create a future that is already completed.
         CompletableFuture<?> future = CompletableFuture.completedFuture(null);
 
-        consumer.processBackgroundEvents(future, timer);
+        consumer.processBackgroundEvents(future, timer, e -> false);
 
         // Because we didn't need to perform a timed get, we should still have every last millisecond
         // of our initial timeout.
@@ -556,7 +579,7 @@ public class ShareConsumerImplTest {
     }
 
     /**
-     * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer) processBackgroundEvents}
+     * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
      * handles the case where the {@link Future} does not complete within the timeout.
      */
     @Test
@@ -572,7 +595,7 @@ public class ShareConsumerImplTest {
             throw new java.util.concurrent.TimeoutException("Intentional timeout");
         }).when(future).get(any(Long.class), any(TimeUnit.class));
 
-        assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer));
+        assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false));
 
         // Because we forced our mocked future to continuously time out, we should have no time remaining.
         assertEquals(0, timer.remainingMs());
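
From an application's point of view, the behaviour covered by the new tests is that leaving the share group no longer fails just because the member is not authorized to describe one of its topics. A hedged usage sketch, assuming the KafkaShareConsumer client from KIP-932; the broker address, group id and topic name are placeholders:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaShareConsumer;
    import org.apache.kafka.common.errors.TopicAuthorizationException;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ShareConsumerShutdownSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-share-group");      // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
            try {
                consumer.subscribe(List.of("example-topic"));                      // placeholder
                consumer.poll(Duration.ofMillis(100));
            } catch (TopicAuthorizationException e) {
                // The fetch/heartbeat path may still surface the authorization failure here.
                System.err.println("Not authorized for topic(s): " + e.unauthorizedTopics());
            } finally {
                // With this change, leaving the group during close() should no longer rethrow
                // the authorization error that was queued on the heartbeat path.
                consumer.close();
            }
        }
    }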
