This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3a1465e14c1 KAFKA-16792: Enable consumer unit tests that fail to fetch
offsets only for new consumer with poll(0) (#16982)
3a1465e14c1 is described below
commit 3a1465e14c19560cbfaf6829209eb42cd5a45f70
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 00:20:55 2024 +0800
KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for
new consumer with poll(0) (#16982)
Reviewers: Lianet Magrans <[email protected]>, TaiJuWu
<[email protected]>, Kirk True <[email protected]>, TengYao Chi
<[email protected]>
---
.../kafka/clients/consumer/KafkaConsumerTest.java | 86 +++++++++++++++-------
1 file changed, 58 insertions(+), 28 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 77c0264849a..8d3e955e799 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1068,12 +1068,20 @@ public class KafkaConsumerTest {
assertThrows(UnsupportedVersionException.class, () ->
setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0)));
}
- // TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
- // The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
- public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) {
- assertThrows(UnsupportedVersionException.class, () ->
setupThrowableConsumer(groupProtocol).poll(Duration.ZERO));
+ @EnumSource(GroupProtocol.class)
+ public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol)
throws InterruptedException {
+ setupThrowableConsumer(groupProtocol);
+ TestUtils.waitForCondition(() -> {
+ try {
+ // For CONSUMER protocol, the offset request is sent in the
background thread,
+ // so we need to wait until the offset request is sent.
+ consumer.poll(Duration.ZERO);
+ return false;
+ } catch (UnsupportedVersionException e) {
+ return true;
+ }
+ }, "Failed to throw UnsupportedVersionException in poll");
}
@ParameterizedTest
@@ -2466,12 +2474,10 @@ public class KafkaConsumerTest {
assertThrows(IllegalStateException.class, consumer::groupMetadata);
}
- // TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
- // The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked")
- public void testCurrentLag(GroupProtocol groupProtocol) {
+ public void testCurrentLag(GroupProtocol groupProtocol) throws
InterruptedException {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
@@ -2486,33 +2492,49 @@ public class KafkaConsumerTest {
// poll once to update with the current metadata
consumer.poll(Duration.ofMillis(0));
+ TestUtils.waitForCondition(() -> requestGenerated(client,
ApiKeys.FIND_COORDINATOR),
+ "No metadata requests sent");
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, metadata.fetch().nodes().get(0)));
// no error for no current position
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
- assertEquals(0, client.inFlightRequestCount());
-
+ if (groupProtocol == GroupProtocol.CLASSIC) {
+ // Classic consumer does not send the LIST_OFFSETS right away
(requires an explicit poll),
+ // different from the new async consumer, that will send the
LIST_OFFSETS request in the background thread
+ // on the next background thread poll.
+ assertEquals(0, client.inFlightRequestCount());
+ }
// poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0));
// requests: list-offset, fetch
- assertEquals(2, client.inFlightRequestCount());
+ TestUtils.waitForCondition(() -> {
+ boolean hasListOffsetRequest = requestGenerated(client,
ApiKeys.LIST_OFFSETS);
+ boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
+ return hasListOffsetRequest && hasFetchRequest;
+ }, "No list-offset & fetch request sent");
// no error for no end offset (so unknown lag)
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
// poll once again, which should return the list-offset response
// and hence next call would return correct lag result
- client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+ ClientRequest listOffsetRequest = findRequest(client,
ApiKeys.LIST_OFFSETS);
+ client.respondToRequest(listOffsetRequest,
listOffsetsResponse(singletonMap(tp0, 90L)));
consumer.poll(Duration.ofMillis(0));
- assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+ // For AsyncKafkaConsumer, subscription state is updated in
background, so the result will eventually be updated.
+ TestUtils.waitForCondition(() -> {
+ OptionalLong result = consumer.currentLag(tp0);
+ return result.isPresent() && result.getAsLong() == 40L;
+ }, "Subscription state is not updated");
// requests: fetch
- assertEquals(1, client.inFlightRequestCount());
+ TestUtils.waitForCondition(() -> requestGenerated(client,
ApiKeys.FETCH), "No fetch request sent");
// one successful fetch should update the log end offset and the
position
+ ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
- client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
+ client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0,
fetchInfo)));
final ConsumerRecords<String, String> records =
(ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count());
@@ -2522,32 +2544,40 @@ public class KafkaConsumerTest {
assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
}
- // TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
- // The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
- public void testListOffsetShouldUpdateSubscriptions(GroupProtocol
groupProtocol) {
+ @EnumSource(GroupProtocol.class)
+ public void testListOffsetShouldUpdateSubscriptions(GroupProtocol
groupProtocol) throws InterruptedException {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, singletonMap(topic, 1));
- consumer = newConsumer(groupProtocol, time, client, subscription,
metadata, assignor, true, groupInstanceId);
-
+ consumer = newConsumer(groupProtocol, time, client, subscription,
metadata, assignor, false,
+ null, groupInstanceId, false);
consumer.assign(singleton(tp0));
-
- // poll once to update with the current metadata
- consumer.poll(Duration.ofMillis(0));
- client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, metadata.fetch().nodes().get(0)));
-
consumer.seek(tp0, 50L);
- client.prepareResponse(listOffsetsResponse(singletonMap(tp0, 90L)));
+ // For AsyncKafkaConsumer, FetchRequestManager sends FetchRequest in
background thread.
+ // Wait for the first fetch request to avoid ListOffsetResponse
mismatch.
+ TestUtils.waitForCondition(() -> groupProtocol ==
GroupProtocol.CLASSIC || requestGenerated(client, ApiKeys.FETCH),
+ "No fetch request sent");
+
+ client.prepareResponse(request -> request instanceof
ListOffsetsRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
assertEquals(singletonMap(tp0, 90L),
consumer.endOffsets(Collections.singleton(tp0)));
// correct lag result should be returned as well
assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
}
+ private ClientRequest findRequest(MockClient client, ApiKeys apiKey) {
+ Optional<ClientRequest> request = client.requests().stream().filter(r
-> r.requestBuilder().apiKey().equals(apiKey)).findFirst();
+ assertTrue(request.isPresent(), "No " + apiKey + " request was
submitted to the client");
+ return request.get();
+ }
+
+ private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
+ return client.requests().stream().anyMatch(request ->
request.requestBuilder().apiKey().equals(apiKey));
+ }
+
private KafkaConsumer<String, String>
consumerWithPendingAuthenticationError(GroupProtocol groupProtocol,
final Time time) {
ConsumerMetadata metadata = createMetadata(subscription);