This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0815d705926 KAFKA-18160 Interrupting or waking up onPartitionsAssigned 
in AsyncConsumer can cause the ConsumerRebalanceListenerCallbackCompletedEvent 
to be skipped (#18089)
0815d705926 is described below

commit 0815d705926a534e15ceb1d42b9d41aed2636b45
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sun Dec 15 10:47:30 2024 +0800

    KAFKA-18160 Interrupting or waking up onPartitionsAssigned in AsyncConsumer 
can cause the ConsumerRebalanceListenerCallbackCompletedEvent to be skipped 
(#18089)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../internals/AbstractMembershipManager.java       |  10 +-
 .../consumer/internals/AsyncKafkaConsumer.java     |  31 ++--
 .../consumer/internals/SubscriptionState.java      |   4 +-
 .../internals/ConsumerMembershipManagerTest.java   |  31 +++-
 .../AsyncKafkaConsumerIntegrationTest.java         |  73 --------
 .../clients/consumer/ConsumerIntegrationTest.java  | 188 +++++++++++++++++++++
 6 files changed, 240 insertions(+), 97 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index c6aa70d805e..90c2b3a647d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -1175,10 +1175,16 @@ public abstract class AbstractMembershipManager<R 
extends AbstractResponse> impl
 
         // Invoke user call back.
         CompletableFuture<Void> result = 
signalPartitionsAssigned(addedPartitions);
+        // Enable newly added partitions to start fetching and updating 
positions for them.
         result.whenComplete((__, exception) -> {
             if (exception == null) {
-                // Enable newly added partitions to start fetching and 
updating positions for them.
-                
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
+                // Enable assigned partitions to start fetching and updating 
positions for them.
+                // We use assignedPartitions here instead of addedPartitions 
because there's a chance that the callback
+                // might throw an exception, leaving addedPartitions empty. 
This would result in the poll operation
+                // returning no records, as no topic partitions are marked as 
fetchable. In contrast, with the classic consumer,
+                // if the first callback fails but the next one succeeds, 
polling can still retrieve data. To align with
+                // this behavior, we rely on assignedPartitions to avoid such 
scenarios.
+                
subscriptions.enablePartitionsAwaitingCallback(toTopicPartitionSet(assignedPartitions));
             } else {
                 // Keeping newly added partitions as non-fetchable after the 
callback failure.
                 // They will be retried on the next reconciliation loop, until 
it succeeds or the
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index ff679e5542d..f5e12407be5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -84,6 +84,7 @@ import 
org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
@@ -2072,23 +2073,27 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                                                                                
     ConsumerRebalanceListenerMethodName methodName,
                                                                                
     SortedSet<TopicPartition> partitions,
                                                                                
     CompletableFuture<Void> future) {
-        final Exception e;
+        Exception e;
 
-        switch (methodName) {
-            case ON_PARTITIONS_REVOKED:
-                e = 
rebalanceListenerInvoker.invokePartitionsRevoked(partitions);
-                break;
+        try {
+            switch (methodName) {
+                case ON_PARTITIONS_REVOKED:
+                    e = 
rebalanceListenerInvoker.invokePartitionsRevoked(partitions);
+                    break;
 
-            case ON_PARTITIONS_ASSIGNED:
-                e = 
rebalanceListenerInvoker.invokePartitionsAssigned(partitions);
-                break;
+                case ON_PARTITIONS_ASSIGNED:
+                    e = 
rebalanceListenerInvoker.invokePartitionsAssigned(partitions);
+                    break;
 
-            case ON_PARTITIONS_LOST:
-                e = rebalanceListenerInvoker.invokePartitionsLost(partitions);
-                break;
+                case ON_PARTITIONS_LOST:
+                    e = 
rebalanceListenerInvoker.invokePartitionsLost(partitions);
+                    break;
 
-            default:
-                throw new IllegalArgumentException("The method " + 
methodName.fullyQualifiedMethodName() + " to invoke was not expected");
+                default:
+                    throw new IllegalArgumentException("The method " + 
methodName.fullyQualifiedMethodName() + " to invoke was not expected");
+            }
+        } catch (WakeupException | InterruptException ex) {
+            e = ex;
         }
 
         final Optional<KafkaException> error;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index f700b8706ca..bd45e71c884 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -898,8 +898,8 @@ public class SubscriptionState {
     }
 
     /**
-     * Enable fetching and updating positions for the given partitions that 
were added to the
-     * assignment, but waiting for the onPartitionsAssigned callback to 
complete. This is
+     * Enable fetching and updating positions for the given partitions that 
were assigned to the
+     * consumer, but waiting for the onPartitionsAssigned callback to 
complete. This is
      * expected to be used by the async consumer.
      */
     public synchronized void 
enablePartitionsAwaitingCallback(Collection<TopicPartition> partitions) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 9517e04e054..d42d81d7ce4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment;
 import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
@@ -96,6 +98,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("ClassDataAbstractionCoupling")
 public class ConsumerMembershipManagerTest {
 
     private static final String GROUP_ID = "test-group";
@@ -1738,6 +1741,12 @@ public class ConsumerMembershipManagerTest {
 
     @Test
     public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+        testErrorsOnPartitionsRevoked(new WakeupException());
+        testErrorsOnPartitionsRevoked(new InterruptException("Intentional 
onPartitionsRevoked() error"));
+        testErrorsOnPartitionsRevoked(new 
IllegalArgumentException("Intentional onPartitionsRevoked() error"));
+    }
+
+    private void testErrorsOnPartitionsRevoked(RuntimeException error) {
         // Step 1: set up mocks
         String topicName = "topic1";
         Uuid topicId = Uuid.randomUuid();
@@ -1745,7 +1754,7 @@ public class ConsumerMembershipManagerTest {
         ConsumerMembershipManager membershipManager = 
createMemberInStableState();
         mockOwnedPartition(membershipManager, topicId, topicName);
         CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener(
-                Optional.of(new IllegalArgumentException("Intentional 
onPartitionsRevoked() error")),
+                Optional.ofNullable(error),
                 Optional.empty(),
                 Optional.empty()
         );
@@ -1792,6 +1801,12 @@ public class ConsumerMembershipManagerTest {
 
     @Test
     public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
+        testErrorsOnPartitionsAssigned(new WakeupException());
+        testErrorsOnPartitionsAssigned(new InterruptException("Intentional 
error"));
+        testErrorsOnPartitionsAssigned(new 
IllegalArgumentException("Intentional error"));
+    }
+
+    private void testErrorsOnPartitionsAssigned(RuntimeException error) {
         // Step 1: set up mocks
         ConsumerMembershipManager membershipManager = 
createMemberInStableState();
         String topicName = "topic1";
@@ -1799,7 +1814,7 @@ public class ConsumerMembershipManagerTest {
         mockOwnedPartition(membershipManager, topicId, topicName);
         CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener(
                 Optional.empty(),
-                Optional.of(new IllegalArgumentException("Intentional 
onPartitionsAssigned() error")),
+                Optional.ofNullable(error),
                 Optional.empty()
         );
         ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
@@ -1879,7 +1894,7 @@ public class ConsumerMembershipManagerTest {
             true
         );
 
-        
verify(subscriptionState).enablePartitionsAwaitingCallback(addedPartitions);
+        
verify(subscriptionState).enablePartitionsAwaitingCallback(assignedPartitions);
     }
 
     @Test
@@ -1915,12 +1930,14 @@ public class ConsumerMembershipManagerTest {
 
     @Test
     public void testOnPartitionsLostNoError() {
-        testOnPartitionsLost(Optional.empty());
+        testOnPartitionsLost(null);
     }
 
     @Test
     public void testOnPartitionsLostError() {
-        testOnPartitionsLost(Optional.of(new KafkaException("Intentional error 
for test")));
+        testOnPartitionsLost(new KafkaException("Intentional error for test"));
+        testOnPartitionsLost(new WakeupException());
+        testOnPartitionsLost(new InterruptException("Intentional error for 
test"));
     }
 
     private void 
assertLeaveGroupDueToExpiredPollAndTransitionToStale(ConsumerMembershipManager 
membershipManager) {
@@ -2054,7 +2071,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Arrays.asList(partitionOwned, 
partitionAdded), membershipManager);
     }
 
-    private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
+    private void testOnPartitionsLost(RuntimeException lostError) {
         // Step 1: set up mocks
         ConsumerMembershipManager membershipManager = 
createMemberInStableState();
         String topicName = "topic1";
@@ -2063,7 +2080,7 @@ public class ConsumerMembershipManagerTest {
         CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener(
                 Optional.empty(),
                 Optional.empty(),
-                lostError
+                Optional.ofNullable(lostError)
         );
         ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
 
diff --git 
a/core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java
 
b/core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java
deleted file mode 100644
index 7e1c062c0fc..00000000000
--- 
a/core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.clients.consumer;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.GroupProtocol;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import 
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.test.TestUtils;
-import org.apache.kafka.common.test.api.ClusterConfigProperty;
-import org.apache.kafka.common.test.api.ClusterInstance;
-import org.apache.kafka.common.test.api.ClusterTest;
-import org.apache.kafka.common.test.api.ClusterTestExtensions;
-import org.apache.kafka.common.test.api.ClusterTests;
-
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Map;
-
-@ExtendWith(ClusterTestExtensions.class)
-public class AsyncKafkaConsumerIntegrationTest {
-
-    @ClusterTests({
-        @ClusterTest(serverProperties = {
-            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value 
= "1"),
-            @ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
-            @ClusterConfigProperty(key = "group.coordinator.new.enable", value 
= "false")
-        }),
-        @ClusterTest(serverProperties = {
-            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value 
= "1"),
-            @ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
-            @ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic")
-        })
-    })
-    public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance 
clusterInstance) throws Exception {
-        String topic = "test-topic";
-        clusterInstance.createTopic(topic, 1, (short) 1);
-        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(Map.of(
-            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers(),
-            ConsumerConfig.GROUP_ID_CONFIG, "test-group",
-            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName(),
-            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName(),
-            ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()))) {
-            consumer.subscribe(Collections.singletonList(topic));
-            TestUtils.waitForCondition(() -> {
-                try {
-                    consumer.poll(Duration.ofMillis(1000));
-                    return false;
-                } catch (UnsupportedVersionException e) {
-                    return 
e.getMessage().equals(AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG);
-                }
-            }, "Should get UnsupportedVersionException and how to revert to 
classic protocol");
-        }
-    }
-}
diff --git 
a/core/src/test/java/kafka/clients/consumer/ConsumerIntegrationTest.java 
b/core/src/test/java/kafka/clients/consumer/ConsumerIntegrationTest.java
new file mode 100644
index 00000000000..00ba1fbfcad
--- /dev/null
+++ b/core/src/test/java/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import 
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestExtensions;
+import org.apache.kafka.common.test.api.ClusterTests;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(ClusterTestExtensions.class)
+public class ConsumerIntegrationTest {
+
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value 
= "1"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+            @ClusterConfigProperty(key = "group.coordinator.new.enable", value 
= "false")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value 
= "1"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+            @ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic")
+        })
+    })
+    public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance 
clusterInstance) throws Exception {
+        String topic = "test-topic";
+        clusterInstance.createTopic(topic, 1, (short) 1);
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(Map.of(
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers(),
+            ConsumerConfig.GROUP_ID_CONFIG, "test-group",
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName(),
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName(),
+            ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()))) {
+            consumer.subscribe(Collections.singletonList(topic));
+            TestUtils.waitForCondition(() -> {
+                try {
+                    consumer.poll(Duration.ofMillis(1000));
+                    return false;
+                } catch (UnsupportedVersionException e) {
+                    return 
e.getMessage().equals(AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG);
+                }
+            }, "Should get UnsupportedVersionException and how to revert to 
classic protocol");
+        }
+    }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+    })
+    public void 
testFetchPartitionsAfterFailedListenerWithGroupProtocolClassic(ClusterInstance 
clusterInstance)
+            throws InterruptedException {
+        testFetchPartitionsAfterFailedListener(clusterInstance, 
GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+    })
+    public void 
testFetchPartitionsAfterFailedListenerWithGroupProtocolConsumer(ClusterInstance 
clusterInstance)
+            throws InterruptedException {
+        testFetchPartitionsAfterFailedListener(clusterInstance, 
GroupProtocol.CONSUMER);
+    }
+
+    private static void testFetchPartitionsAfterFailedListener(ClusterInstance 
clusterInstance, GroupProtocol groupProtocol)
+            throws InterruptedException {
+        var topic = "topic";
+        try (var producer = clusterInstance.producer(Map.of(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class))) {
+            producer.send(new ProducerRecord<>(topic, "key".getBytes(), 
"value".getBytes()));
+        }
+
+        try (var consumer = clusterInstance.consumer(Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()))) {
+            consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() 
{
+                private int count = 0;
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                }
+
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                    count++;
+                    if (count == 1) throw new 
IllegalArgumentException("temporary error");
+                }
+            });
+
+            TestUtils.waitForCondition(() -> 
consumer.poll(Duration.ofSeconds(1)).count() == 1,
+                    5000,
+                    "failed to poll data");
+        }
+    }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+    })
+    public void 
testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolClassic(ClusterInstance
 clusterInstance)
+            throws InterruptedException {
+        testFetchPartitionsWithAlwaysFailedListener(clusterInstance, 
GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+    })
+    public void 
testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolConsumer(ClusterInstance
 clusterInstance)
+            throws InterruptedException {
+        testFetchPartitionsWithAlwaysFailedListener(clusterInstance, 
GroupProtocol.CONSUMER);
+    }
+
+    private static void 
testFetchPartitionsWithAlwaysFailedListener(ClusterInstance clusterInstance, 
GroupProtocol groupProtocol)
+            throws InterruptedException {
+        var topic = "topic";
+        try (var producer = clusterInstance.producer(Map.of(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class))) {
+            producer.send(new ProducerRecord<>(topic, "key".getBytes(), 
"value".getBytes()));
+        }
+
+        try (var consumer = clusterInstance.consumer(Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()))) {
+            consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() 
{
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                }
+
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                    throw new IllegalArgumentException("always failed");
+                }
+            });
+
+            long startTimeMillis = System.currentTimeMillis();
+            long currentTimeMillis = System.currentTimeMillis();
+            while (currentTimeMillis < startTimeMillis + 3000) {
+                currentTimeMillis = System.currentTimeMillis();
+                try {
+                    // In the async consumer, there is a possibility that the 
ConsumerRebalanceListenerCallbackCompletedEvent
+                    // has not yet reached the application thread. And a poll 
operation might still succeed, but it
+                    // should not return any records since none of the 
assigned topic partitions are marked as fetchable.
+                    assertEquals(0, 
consumer.poll(Duration.ofSeconds(1)).count());
+                } catch (KafkaException ex) {
+                    assertEquals("User rebalance callback throws an error", 
ex.getMessage());
+                }
+                Thread.sleep(300);
+            }
+        }
+    }
+}

Reply via email to