This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc650fbd737 MINOR: add UT for consumer.poll (#17047)
dc650fbd737 is described below

commit dc650fbd737791e39f54cd76a5f1f80245c9816a
Author: TaiJuWu <[email protected]>
AuthorDate: Sun Sep 1 10:09:18 2024 +0800

    MINOR: add UT for consumer.poll (#17047)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 27 +++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b2420f29bda..1062d7882c7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -3396,7 +3396,32 @@ public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
         assertEquals("Timeout of 1000ms expired before the last committed offset for partitions [test-0] could be determined. " +
             "Try tuning default.api.timeout.ms larger to relax the threshold.", timeoutException.getMessage());
     }
-    
+
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class)
+    public void testPreventMultiThread(GroupProtocol groupProtocol) throws InterruptedException {
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
+                new RoundRobinAssignor(), true, groupInstanceId);
+        consumer.subscribe(singletonList(topic));
+
+
+        client.enableBlockingUntilWakeup(1);
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        service.execute(() -> consumer.poll(Duration.ofSeconds(5)));
+        try {
+            TimeUnit.SECONDS.sleep(1);
+            assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ZERO));
+            client.wakeup();
+            consumer.wakeup();
+        } finally {
+            service.shutdown();
+            assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+        }
+    }
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
     public static class DeserializerForClientId implements Deserializer<byte[]> {
         @Override

Reply via email to