This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 05d05e1b5e3 KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid
topics (#17244)
05d05e1b5e3 is described below
commit 05d05e1b5e382e76209b0a9536bbe5eb27ffc292
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 22:56:22 2024 +0800
KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid topics (#17244)
Reviewers: Kirk True <[email protected]>, Lianet Magrans
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 30 ++++++++++++++-----
.../consumer/internals/AsyncKafkaConsumerTest.java | 35 +++++++++++++++++-----
.../api/PlaintextConsumerSubscriptionTest.scala | 25 ++++++++++++++--
3 files changed, 72 insertions(+), 18 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index b88a25f907b..f3bb9589e1a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -74,6 +74,7 @@ import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
@@ -115,6 +116,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -1279,7 +1281,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
UnsubscribeEvent unsubscribeEvent = new
UnsubscribeEvent(calculateDeadlineMs(timer));
applicationEventHandler.add(unsubscribeEvent);
try {
- processBackgroundEvents(unsubscribeEvent.future(), timer);
+ // If users subscribe to an invalid topic name, they will get
InvalidTopicException in error events,
+ // because network thread keeps trying to send MetadataRequest in
the background.
+ // Ignore it to avoid unsubscribe failed.
+ processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e
instanceof InvalidTopicException);
log.info("Completed releasing assignment and sending leave group
to close consumer");
} catch (TimeoutException e) {
log.warn("Consumer triggered an unsubscribe event to leave the
group but couldn't " +
@@ -1476,7 +1481,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
subscriptions.assignedPartitions());
try {
- processBackgroundEvents(unsubscribeEvent.future(), timer);
+ // If users subscribe to an invalid topic name, they will get
InvalidTopicException in error events,
+ // because network thread keeps trying to send MetadataRequest
in the background.
+ // Ignore it to avoid unsubscribe failed.
+ processBackgroundEvents(unsubscribeEvent.future(), timer, e ->
e instanceof InvalidTopicException);
log.info("Unsubscribed all topics or patterns and assigned
partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to
complete");
@@ -1805,15 +1813,23 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* execution of the rebalancing logic. The rebalancing logic cannot
complete until the
* {@link ConsumerRebalanceListener} callback is performed.
*
- * @param future Event that contains a {@link CompletableFuture};
it is on this future that the
- * application thread will wait for completion
- * @param timer Overall timer that bounds how long to wait for
the event to complete
+ * @param future Event that contains a {@link
CompletableFuture}; it is on this future that the
+ * application thread will wait for
completion
+ * @param timer Overall timer that bounds how long to
wait for the event to complete
+ * @param ignoreErrorEventException Predicate to ignore background errors.
+ * Any exceptions found while processing
background events that match the predicate won't be propagated.
* @return {@code true} if the event completed within the timeout, {@code
false} otherwise
*/
// Visible for testing
- <T> T processBackgroundEvents(Future<T> future, Timer timer) {
+ <T> T processBackgroundEvents(Future<T> future, Timer timer,
Predicate<Exception> ignoreErrorEventException) {
do {
- boolean hadEvents = processBackgroundEvents();
+ boolean hadEvents = false;
+ try {
+ hadEvents = processBackgroundEvents();
+ } catch (Exception e) {
+ if (!ignoreErrorEventException.test(e))
+ throw e;
+ }
try {
if (future.isDone()) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index ce7cd17c996..cec65754ed9 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -59,6 +59,7 @@ import
org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
@@ -105,6 +106,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -248,6 +250,23 @@ public class AsyncKafkaConsumerTest {
assertEquals("This consumer has already been closed.",
res.getMessage());
}
+ @Test
+ public void testUnsubscribeWithInvalidTopicException() {
+ consumer = newConsumer();
+ backgroundEventQueue.add(new ErrorEvent(new
InvalidTopicException("Invalid topic name")));
+ completeUnsubscribeApplicationEventSuccessfully();
+ assertDoesNotThrow(() -> consumer.unsubscribe());
+ assertDoesNotThrow(() -> consumer.close());
+ }
+
+ @Test
+ public void testCloseWithInvalidTopicException() {
+ consumer = newConsumer();
+ backgroundEventQueue.add(new ErrorEvent(new
InvalidTopicException("Invalid topic name")));
+ completeUnsubscribeApplicationEventSuccessfully();
+ assertDoesNotThrow(() -> consumer.close());
+ }
+
@Test
public void testCommitAsyncWithNullCallback() {
consumer = newConsumer();
@@ -845,7 +864,7 @@ public class AsyncKafkaConsumerTest {
subscriptions,
"group-id",
"client-id"));
- doThrow(new
KafkaException()).when(consumer).processBackgroundEvents(any(), any());
+ doThrow(new
KafkaException()).when(consumer).processBackgroundEvents(any(), any(), any());
assertThrows(KafkaException.class, () ->
consumer.close(Duration.ZERO));
verifyUnsubscribeEvent(subscriptions);
// Close operation should carry on even if the unsubscribe fails
@@ -1750,7 +1769,7 @@ public class AsyncKafkaConsumerTest {
}
/**
- * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer)
processBackgroundEvents}
+ * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
* handles the case where the {@link Future} takes a bit of time to
complete, but does within the timeout.
*/
@Test
@@ -1776,14 +1795,14 @@ public class AsyncKafkaConsumerTest {
return null;
}).when(future).get(any(Long.class), any(TimeUnit.class));
- consumer.processBackgroundEvents(future, timer);
+ consumer.processBackgroundEvents(future, timer, e -> false);
// 800 is the 1000 ms timeout (above) minus the 200 ms delay for the
two incremental timeouts/retries.
assertEquals(800, timer.remainingMs());
}
/**
- * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer)
processBackgroundEvents}
+ * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
* handles the case where the {@link Future} is already complete when
invoked, so it doesn't have to wait.
*/
@Test
@@ -1794,7 +1813,7 @@ public class AsyncKafkaConsumerTest {
// Create a future that is already completed.
CompletableFuture<?> future = CompletableFuture.completedFuture(null);
- consumer.processBackgroundEvents(future, timer);
+ consumer.processBackgroundEvents(future, timer, e -> false);
// Because we didn't need to perform a timed get, we should still have
every last millisecond
// of our initial timeout.
@@ -1802,7 +1821,7 @@ public class AsyncKafkaConsumerTest {
}
/**
- * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer)
processBackgroundEvents}
+ * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
* handles the case where the {@link Future} does not complete within the
timeout.
*/
@Test
@@ -1817,7 +1836,7 @@ public class AsyncKafkaConsumerTest {
throw new java.util.concurrent.TimeoutException("Intentional
timeout");
}).when(future).get(any(Long.class), any(TimeUnit.class));
- assertThrows(TimeoutException.class, () ->
consumer.processBackgroundEvents(future, timer));
+ assertThrows(TimeoutException.class, () ->
consumer.processBackgroundEvents(future, timer, e -> false));
// Because we forced our mocked future to continuously time out, we
should have no time remaining.
assertEquals(0, timer.remainingMs());
@@ -1889,7 +1908,7 @@ public class AsyncKafkaConsumerTest {
// Check that an unsubscribe event was generated, and that the
consumer waited for it to
// complete processing background events.
verify(applicationEventHandler).add(any(UnsubscribeEvent.class));
- verify(consumer).processBackgroundEvents(any(), any());
+ verify(consumer).processBackgroundEvents(any(), any(), any());
// The consumer should not clear the assignment in the app thread. The
unsubscribe
// event is the one responsible for updating the assignment in the
background when it
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index 997ef6c8b5d..de1d3bccca3 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidTopicException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
@@ -227,11 +228,29 @@ class PlaintextConsumerSubscriptionTest extends
AbstractConsumerTest {
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
- def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit =
{
- // Invalid topic name due to space
- val invalidTopicName = "topic abc"
+ def testSubscribeInvalidTopicCanUnsubscribe(quorum: String, groupProtocol:
String): Unit = {
+ val consumer = createConsumer()
+
+ setupSubscribeInvalidTopic(consumer)
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.unsubscribe()
+ })
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testSubscribeInvalidTopicCanClose(quorum: String, groupProtocol:
String): Unit = {
val consumer = createConsumer()
+ setupSubscribeInvalidTopic(consumer)
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.close()
+ })
+ }
+
+ def setupSubscribeInvalidTopic(consumer: Consumer[Array[Byte],
Array[Byte]]): Unit = {
+ // Invalid topic name due to space
+ val invalidTopicName = "topic abc"
consumer.subscribe(List(invalidTopicName).asJava)
var exception : InvalidTopicException = null