This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dc650fbd737 MINOR: add UT for consumer.poll (#17047)
dc650fbd737 is described below
commit dc650fbd737791e39f54cd76a5f1f80245c9816a
Author: TaiJuWu <[email protected]>
AuthorDate: Sun Sep 1 10:09:18 2024 +0800
MINOR: add UT for consumer.poll (#17047)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/consumer/KafkaConsumerTest.java | 27 +++++++++++++++++++++-
1 file changed, 26 insertions(+), 1 deletion(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b2420f29bda..1062d7882c7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -3396,7 +3396,32 @@ public void
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
assertEquals("Timeout of 1000ms expired before the last committed
offset for partitions [test-0] could be determined. " +
"Try tuning default.api.timeout.ms larger to relax the
threshold.", timeoutException.getMessage());
}
-
+
+    /**
+     * Verifies that {@code poll} rejects concurrent use: while a background thread is
+     * inside {@code consumer.poll}, a second {@code poll} issued from the test thread
+     * must fail with {@link ConcurrentModificationException}.
+     */
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class)
+    public void testPreventMultiThread(GroupProtocol groupProtocol) throws InterruptedException {
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
+            new RoundRobinAssignor(), true, groupInstanceId);
+        consumer.subscribe(singletonList(topic));
+
+        // NOTE(review): presumably makes the mock client's next poll block until
+        // client.wakeup() is called, keeping the background thread inside poll() - confirm.
+        client.enableBlockingUntilWakeup(1);
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        service.execute(() -> consumer.poll(Duration.ofSeconds(5)));
+        try {
+            // Give the background thread a chance to enter poll() before the second call.
+            TimeUnit.SECONDS.sleep(1);
+            assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ZERO));
+        } finally {
+            // Release the background poll even when the assertion above fails or the sleep
+            // is interrupted; otherwise the worker may stay blocked and the termination
+            // check below could fail and hide the original error.
+            client.wakeup();
+            consumer.wakeup();
+            service.shutdown();
+            assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+        }
+    }
+
private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements
Deserializer<byte[]> {
@Override