This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a24fedfba00 KAFKA-18817:[1/N] ShareGroupHeartbeat and
ShareGroupDescribe API must check topic describe (#19055)
a24fedfba00 is described below
commit a24fedfba00d249bbacb1f1e139771ed196614e8
Author: DL1231 <[email protected]>
AuthorDate: Mon Mar 3 17:49:37 2025 +0800
KAFKA-18817:[1/N] ShareGroupHeartbeat and ShareGroupDescribe API must check
topic describe (#19055)
1、Client support for TopicAuthException in DescribeShareGroup and HB
path
2、ShareConsumerImpl#sendAcknowledgementsAndLeaveGroup swallow
TopicAuthorizationException and GroupAuthorizationException
Reviewers: ShivsundarR <[email protected]>, Andrew Schofield
<[email protected]>
---
.../internals/DescribeShareGroupsHandler.java | 2 ++
.../consumer/internals/ShareConsumerImpl.java | 27 +++++++++++++----
.../requests/ShareGroupDescribeResponse.java | 1 +
.../requests/ShareGroupHeartbeatResponse.java | 1 +
.../common/message/ShareGroupDescribeResponse.json | 1 +
.../message/ShareGroupHeartbeatResponse.json | 3 +-
.../consumer/internals/ShareConsumerImplTest.java | 35 ++++++++++++++++++----
7 files changed, 57 insertions(+), 13 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
index 1c79225b083..1aa37bbf776 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
@@ -159,7 +159,9 @@ public class DescribeShareGroupsHandler extends
AdminApiHandler.Batched<Coordina
Set<CoordinatorKey> groupsToUnmap) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
+ case TOPIC_AUTHORIZATION_FAILED:
log.debug("`DescribeShareGroups` request for group id {}
failed due to error {}", groupId.idValue, error);
+ // The topic auth response received on DescribeShareGroup is a
generic one not including topic names, so we just pass it on unchanged here.
failed.put(groupId, error.exception(errorMsg));
break;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index e808ecf5896..4dc8cf4c697 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -54,9 +54,11 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
@@ -92,6 +94,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
@@ -914,7 +917,10 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
ShareUnsubscribeEvent unsubscribeEvent = new
ShareUnsubscribeEvent(calculateDeadlineMs(timer));
applicationEventHandler.add(unsubscribeEvent);
try {
- processBackgroundEvents(unsubscribeEvent.future(), timer);
+ // If users have fatal error, they will get some exceptions in the
background queue.
+ // When running unsubscribe, these exceptions should be ignored,
or users can't unsubscribe successfully.
+ processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e
instanceof GroupAuthorizationException
+ || e instanceof TopicAuthorizationException));
log.info("Completed releasing assignment and leaving group to
close consumer.");
} catch (TimeoutException e) {
log.warn("Consumer triggered an unsubscribe event to leave the
group but couldn't " +
@@ -1107,18 +1113,27 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
* Each iteration gives the application thread an opportunity to process
background events, which may be
* necessary to complete the overall processing.
*
- * @param future Event that contains a {@link CompletableFuture};
it is on this future that the
- * application thread will wait for completion
- * @param timer Overall timer that bounds how long to wait for
the event to complete
+ * @param future Event that contains a {@link
CompletableFuture}; it is on this future that the
+ * application thread will wait for
completion
+ * @param timer Overall timer that bounds how long to
wait for the event to complete
+ * @param ignoreErrorEventException Predicate to ignore background errors.
+ * Any exceptions found while processing
background events that match the predicate won't be propagated.
* @return {@code true} if the event completed within the timeout, {@code
false} otherwise
*/
// Visible for testing
<T> T processBackgroundEvents(final Future<T> future,
- final Timer timer) {
+ final Timer timer,
+ final Predicate<Exception>
ignoreErrorEventException) {
log.trace("Will wait up to {} ms for future {} to complete",
timer.remainingMs(), future);
do {
- boolean hadEvents = processBackgroundEvents();
+ boolean hadEvents = false;
+ try {
+ hadEvents = processBackgroundEvents();
+ } catch (Exception e) {
+ if (!ignoreErrorEventException.test(e))
+ throw e;
+ }
try {
if (future.isDone()) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
index 95dd371eedf..fc25658d703 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
@@ -35,6 +35,7 @@ import java.util.Map;
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#INVALID_GROUP_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
+ * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
*/
public class ShareGroupDescribeResponse extends AbstractResponse {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
index de05d44aebe..08ce09e1f4b 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
@@ -35,6 +35,7 @@ import java.util.Map;
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#UNKNOWN_MEMBER_ID}
* - {@link Errors#GROUP_MAX_SIZE_REACHED}
+ * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
*/
public class ShareGroupHeartbeatResponse extends AbstractResponse {
private final ShareGroupHeartbeatResponseData data;
diff --git
a/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json
b/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json
index c093b788bfc..e90e431f64e 100644
--- a/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json
+++ b/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json
@@ -27,6 +27,7 @@
// - INVALID_REQUEST (version 0+)
// - INVALID_GROUP_ID (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
+ // - TOPIC_AUTHORIZATION_FAILED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
diff --git
a/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json
b/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json
index e0ff5a93d54..75aa62b76f4 100644
--- a/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json
@@ -21,12 +21,13 @@
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
- // - NOT_COORDINATOR (version 0+)
+ // - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_MEMBER_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
+ // - TOPIC_AUTHORIZATION_FAILED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 6f5e78451b1..688547a3975 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -53,12 +54,14 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
@@ -271,6 +274,26 @@ public class ShareConsumerImplTest {
assertEquals("This consumer has already been closed.",
res.getMessage());
}
+ @Test
+ public void testUnsubscribeWithTopicAuthorizationException() {
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(subscriptions);
+
+ backgroundEventQueue.add(new ErrorEvent(new
TopicAuthorizationException(Set.of("test-topic"))));
+ completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
+ assertDoesNotThrow(() -> consumer.unsubscribe());
+ assertDoesNotThrow(() -> consumer.close());
+ }
+
+ @Test
+ public void testCloseWithTopicAuthorizationException() {
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(subscriptions);
+
+ completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
+ assertDoesNotThrow(() -> consumer.close());
+ }
+
@Test
public void testVerifyApplicationEventOnShutdown() {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
@@ -502,7 +525,7 @@ public class ShareConsumerImplTest {
}
/**
- * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer)
processBackgroundEvents}
+ * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
* handles the case where the {@link Future} takes a bit of time to
complete, but does within the timeout.
*/
@Test
@@ -529,14 +552,14 @@ public class ShareConsumerImplTest {
return null;
}).when(future).get(any(Long.class), any(TimeUnit.class));
- consumer.processBackgroundEvents(future, timer);
+ consumer.processBackgroundEvents(future, timer, e -> false);
// 800 is the 1000 ms timeout (above) minus the 200 ms delay for the
two incremental timeouts/retries.
assertEquals(800, timer.remainingMs());
}
/**
- * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer)
processBackgroundEvents}
+ * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
* handles the case where the {@link Future} is already complete when
invoked, so it doesn't have to wait.
*/
@Test
@@ -548,7 +571,7 @@ public class ShareConsumerImplTest {
// Create a future that is already completed.
CompletableFuture<?> future = CompletableFuture.completedFuture(null);
- consumer.processBackgroundEvents(future, timer);
+ consumer.processBackgroundEvents(future, timer, e -> false);
// Because we didn't need to perform a timed get, we should still have
every last millisecond
// of our initial timeout.
@@ -556,7 +579,7 @@ public class ShareConsumerImplTest {
}
/**
- * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer)
processBackgroundEvents}
+ * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
* handles the case where the {@link Future} does not complete within the
timeout.
*/
@Test
@@ -572,7 +595,7 @@ public class ShareConsumerImplTest {
throw new java.util.concurrent.TimeoutException("Intentional
timeout");
}).when(future).get(any(Long.class), any(TimeUnit.class));
- assertThrows(TimeoutException.class, () ->
consumer.processBackgroundEvents(future, timer));
+ assertThrows(TimeoutException.class, () ->
consumer.processBackgroundEvents(future, timer, e -> false));
// Because we forced our mocked future to continuously time out, we
should have no time remaining.
assertEquals(0, timer.remainingMs());