This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 05d05e1b5e3 KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid 
topics (#17244)
05d05e1b5e3 is described below

commit 05d05e1b5e382e76209b0a9536bbe5eb27ffc292
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 22:56:22 2024 +0800

    KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid topics (#17244)
    
    Reviewers: Kirk True <[email protected]>, Lianet Magrans 
<[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 30 ++++++++++++++-----
 .../consumer/internals/AsyncKafkaConsumerTest.java | 35 +++++++++++++++++-----
 .../api/PlaintextConsumerSubscriptionTest.scala    | 25 ++++++++++++++--
 3 files changed, 72 insertions(+), 18 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index b88a25f907b..f3bb9589e1a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -74,6 +74,7 @@ import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Metrics;
@@ -115,6 +116,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -1279,7 +1281,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
         applicationEventHandler.add(unsubscribeEvent);
         try {
-            processBackgroundEvents(unsubscribeEvent.future(), timer);
+            // If users subscribe to an invalid topic name, they will get 
InvalidTopicException in error events,
+            // because the network thread keeps trying to send MetadataRequest in 
the background.
+            // Ignore it so that the unsubscribe does not fail.
+            processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e 
instanceof InvalidTopicException);
             log.info("Completed releasing assignment and sending leave group 
to close consumer");
         } catch (TimeoutException e) {
             log.warn("Consumer triggered an unsubscribe event to leave the 
group but couldn't " +
@@ -1476,7 +1481,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     subscriptions.assignedPartitions());
 
             try {
-                processBackgroundEvents(unsubscribeEvent.future(), timer);
+                // If users subscribe to an invalid topic name, they will get 
InvalidTopicException in error events,
+                // because the network thread keeps trying to send MetadataRequest 
in the background.
+                // Ignore it so that the unsubscribe does not fail.
+                processBackgroundEvents(unsubscribeEvent.future(), timer, e -> 
e instanceof InvalidTopicException);
                 log.info("Unsubscribed all topics or patterns and assigned 
partitions");
             } catch (TimeoutException e) {
                 log.error("Failed while waiting for the unsubscribe event to 
complete");
@@ -1805,15 +1813,23 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * execution of the rebalancing logic. The rebalancing logic cannot 
complete until the
      * {@link ConsumerRebalanceListener} callback is performed.
      *
-     * @param future         Event that contains a {@link CompletableFuture}; 
it is on this future that the
-     *                       application thread will wait for completion
-     * @param timer          Overall timer that bounds how long to wait for 
the event to complete
+     * @param future                    Event that contains a {@link 
CompletableFuture}; it is on this future that the
+     *                                  application thread will wait for 
completion
+     * @param timer                     Overall timer that bounds how long to 
wait for the event to complete
+     * @param ignoreErrorEventException Predicate to ignore background errors.
+     *                                  Any exceptions found while processing 
background events that match the predicate won't be propagated.
      * @return {@code true} if the event completed within the timeout, {@code 
false} otherwise
      */
     // Visible for testing
-    <T> T processBackgroundEvents(Future<T> future, Timer timer) {
+    <T> T processBackgroundEvents(Future<T> future, Timer timer, 
Predicate<Exception> ignoreErrorEventException) {
         do {
-            boolean hadEvents = processBackgroundEvents();
+            boolean hadEvents = false;
+            try {
+                hadEvents = processBackgroundEvents();
+            } catch (Exception e) {
+                if (!ignoreErrorEventException.test(e))
+                    throw e;
+            }
 
             try {
                 if (future.isDone()) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index ce7cd17c996..cec65754ed9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -59,6 +59,7 @@ import 
org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -105,6 +106,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
@@ -248,6 +250,23 @@ public class AsyncKafkaConsumerTest {
         assertEquals("This consumer has already been closed.", 
res.getMessage());
     }
 
+    @Test
+    public void testUnsubscribeWithInvalidTopicException() {
+        consumer = newConsumer();
+        backgroundEventQueue.add(new ErrorEvent(new 
InvalidTopicException("Invalid topic name")));
+        completeUnsubscribeApplicationEventSuccessfully();
+        assertDoesNotThrow(() -> consumer.unsubscribe());
+        assertDoesNotThrow(() -> consumer.close());
+    }
+
+    @Test
+    public void testCloseWithInvalidTopicException() {
+        consumer = newConsumer();
+        backgroundEventQueue.add(new ErrorEvent(new 
InvalidTopicException("Invalid topic name")));
+        completeUnsubscribeApplicationEventSuccessfully();
+        assertDoesNotThrow(() -> consumer.close());
+    }
+
     @Test
     public void testCommitAsyncWithNullCallback() {
         consumer = newConsumer();
@@ -845,7 +864,7 @@ public class AsyncKafkaConsumerTest {
             subscriptions,
             "group-id",
             "client-id"));
-        doThrow(new 
KafkaException()).when(consumer).processBackgroundEvents(any(), any());
+        doThrow(new 
KafkaException()).when(consumer).processBackgroundEvents(any(), any(), any());
         assertThrows(KafkaException.class, () -> 
consumer.close(Duration.ZERO));
         verifyUnsubscribeEvent(subscriptions);
         // Close operation should carry on even if the unsubscribe fails
@@ -1750,7 +1769,7 @@ public class AsyncKafkaConsumerTest {
     }
 
     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer) 
processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate) processBackgroundEvents}
      * handles the case where the {@link Future} takes a bit of time to 
complete, but does within the timeout.
      */
     @Test
@@ -1776,14 +1795,14 @@ public class AsyncKafkaConsumerTest {
             return null;
         }).when(future).get(any(Long.class), any(TimeUnit.class));
 
-        consumer.processBackgroundEvents(future, timer);
+        consumer.processBackgroundEvents(future, timer, e -> false);
 
         // 800 is the 1000 ms timeout (above) minus the 200 ms delay for the 
two incremental timeouts/retries.
         assertEquals(800, timer.remainingMs());
     }
 
     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer) 
processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate) processBackgroundEvents}
      * handles the case where the {@link Future} is already complete when 
invoked, so it doesn't have to wait.
      */
     @Test
@@ -1794,7 +1813,7 @@ public class AsyncKafkaConsumerTest {
         // Create a future that is already completed.
         CompletableFuture<?> future = CompletableFuture.completedFuture(null);
 
-        consumer.processBackgroundEvents(future, timer);
+        consumer.processBackgroundEvents(future, timer, e -> false);
 
         // Because we didn't need to perform a timed get, we should still have 
every last millisecond
         // of our initial timeout.
@@ -1802,7 +1821,7 @@ public class AsyncKafkaConsumerTest {
     }
 
     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer) 
processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate) processBackgroundEvents}
      * handles the case where the {@link Future} does not complete within the 
timeout.
      */
     @Test
@@ -1817,7 +1836,7 @@ public class AsyncKafkaConsumerTest {
             throw new java.util.concurrent.TimeoutException("Intentional 
timeout");
         }).when(future).get(any(Long.class), any(TimeUnit.class));
 
-        assertThrows(TimeoutException.class, () -> 
consumer.processBackgroundEvents(future, timer));
+        assertThrows(TimeoutException.class, () -> 
consumer.processBackgroundEvents(future, timer, e -> false));
 
         // Because we forced our mocked future to continuously time out, we 
should have no time remaining.
         assertEquals(0, timer.remainingMs());
@@ -1889,7 +1908,7 @@ public class AsyncKafkaConsumerTest {
         // Check that an unsubscribe event was generated, and that the 
consumer waited for it to
         // complete processing background events.
         verify(applicationEventHandler).add(any(UnsubscribeEvent.class));
-        verify(consumer).processBackgroundEvents(any(), any());
+        verify(consumer).processBackgroundEvents(any(), any(), any());
 
         // The consumer should not clear the assignment in the app thread. The 
unsubscribe
         // event is the one responsible for updating the assignment in the 
background when it
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index 997ef6c8b5d..de1d3bccca3 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
@@ -227,11 +228,29 @@ class PlaintextConsumerSubscriptionTest extends 
AbstractConsumerTest {
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{
-    // Invalid topic name due to space
-    val invalidTopicName = "topic abc"
+  def testSubscribeInvalidTopicCanUnsubscribe(quorum: String, groupProtocol: 
String): Unit = {
+    val consumer = createConsumer()
+
+    setupSubscribeInvalidTopic(consumer)
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.unsubscribe()
+    })
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeInvalidTopicCanClose(quorum: String, groupProtocol: 
String): Unit = {
     val consumer = createConsumer()
 
+    setupSubscribeInvalidTopic(consumer)
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.close()
+    })
+  }
+
+  def setupSubscribeInvalidTopic(consumer: Consumer[Array[Byte], 
Array[Byte]]): Unit = {
+    // Invalid topic name due to space
+    val invalidTopicName = "topic abc"
     consumer.subscribe(List(invalidTopicName).asJava)
 
     var exception : InvalidTopicException = null

Reply via email to