This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3a1465e14c1 KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) (#16982)
3a1465e14c1 is described below

commit 3a1465e14c19560cbfaf6829209eb42cd5a45f70
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 00:20:55 2024 +0800

    KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) (#16982)
    
    Reviewers: Lianet Magrans <[email protected]>, TaiJuWu <[email protected]>, Kirk True <[email protected]>, TengYao Chi <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 86 +++++++++++++++-------
 1 file changed, 58 insertions(+), 28 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 77c0264849a..8d3e955e799 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1068,12 +1068,20 @@ public class KafkaConsumerTest {
         assertThrows(UnsupportedVersionException.class, () -> 
setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0)));
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol 
implementation.
-    //       The bug will be investigated and fixed so this test can use both 
group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
-    public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) {
-        assertThrows(UnsupportedVersionException.class, () -> 
setupThrowableConsumer(groupProtocol).poll(Duration.ZERO));
+    @EnumSource(GroupProtocol.class)
+    public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) 
throws InterruptedException {
+        setupThrowableConsumer(groupProtocol);
+        TestUtils.waitForCondition(() -> {
+            try {
+                // For CONSUMER protocol, the offset request is sent in the 
background thread,
+                // so we need to wait until the offset request is sent.
+                consumer.poll(Duration.ZERO);
+                return false;
+            } catch (UnsupportedVersionException e) {
+                return true;
+            }
+        }, "Failed to throw UnsupportedVersionException in poll");
     }
 
     @ParameterizedTest
@@ -2466,12 +2474,10 @@ public class KafkaConsumerTest {
         assertThrows(IllegalStateException.class, consumer::groupMetadata);
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol 
implementation.
-    //       The bug will be investigated and fixed so this test can use both 
group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @EnumSource(GroupProtocol.class)
     @SuppressWarnings("unchecked")
-    public void testCurrentLag(GroupProtocol groupProtocol) {
+    public void testCurrentLag(GroupProtocol groupProtocol) throws 
InterruptedException {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
@@ -2486,33 +2492,49 @@ public class KafkaConsumerTest {
 
         // poll once to update with the current metadata
         consumer.poll(Duration.ofMillis(0));
+        TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FIND_COORDINATOR),
+                "No metadata requests sent");
         client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
 
         // no error for no current position
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
-        assertEquals(0, client.inFlightRequestCount());
-
+        if (groupProtocol == GroupProtocol.CLASSIC) {
+            // Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+            // different from the new async consumer, that will send the 
LIST_OFFSETS request in the background thread
+            // on the next background thread poll.
+            assertEquals(0, client.inFlightRequestCount());
+        }
         // poll once again, which should send the list-offset request
         consumer.seek(tp0, 50L);
         consumer.poll(Duration.ofMillis(0));
         // requests: list-offset, fetch
-        assertEquals(2, client.inFlightRequestCount());
+        TestUtils.waitForCondition(() -> {
+            boolean hasListOffsetRequest = requestGenerated(client, 
ApiKeys.LIST_OFFSETS);
+            boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
+            return hasListOffsetRequest && hasFetchRequest;
+        }, "No list-offset & fetch request sent");
 
         // no error for no end offset (so unknown lag)
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
 
         // poll once again, which should return the list-offset response
         // and hence next call would return correct lag result
-        client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+        ClientRequest listOffsetRequest = findRequest(client, 
ApiKeys.LIST_OFFSETS);
+        client.respondToRequest(listOffsetRequest, 
listOffsetsResponse(singletonMap(tp0, 90L)));
         consumer.poll(Duration.ofMillis(0));
 
-        assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+        // For AsyncKafkaConsumer, subscription state is updated in 
background, so the result will eventually be updated.
+        TestUtils.waitForCondition(() -> {
+            OptionalLong result = consumer.currentLag(tp0);
+            return result.isPresent() && result.getAsLong() == 40L;
+        }, "Subscription state is not updated");
         // requests: fetch
-        assertEquals(1, client.inFlightRequestCount());
+        TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FETCH), "No fetch request sent");
 
         // one successful fetch should update the log end offset and the 
position
+        ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
         final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
-        client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
+        client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, 
fetchInfo)));
 
         final ConsumerRecords<String, String> records = 
(ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
         assertEquals(5, records.count());
@@ -2522,32 +2544,40 @@ public class KafkaConsumerTest {
         assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol 
implementation.
-    //       The bug will be investigated and fixed so this test can use both 
group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
-    public void testListOffsetShouldUpdateSubscriptions(GroupProtocol 
groupProtocol) {
+    @EnumSource(GroupProtocol.class)
+    public void testListOffsetShouldUpdateSubscriptions(GroupProtocol 
groupProtocol) throws InterruptedException {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, singletonMap(topic, 1));
 
-        consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
-
+        consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, false,
+                null, groupInstanceId, false);
         consumer.assign(singleton(tp0));
-
-        // poll once to update with the current metadata
-        consumer.poll(Duration.ofMillis(0));
-        client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
-
         consumer.seek(tp0, 50L);
-        client.prepareResponse(listOffsetsResponse(singletonMap(tp0, 90L)));
 
+        // For AsyncKafkaConsumer, FetchRequestManager sends FetchRequest in 
background thread.
+        // Wait for the first fetch request to avoid ListOffsetResponse 
mismatch.
+        TestUtils.waitForCondition(() -> groupProtocol == 
GroupProtocol.CLASSIC || requestGenerated(client, ApiKeys.FETCH),
+                "No fetch request sent");
+
+        client.prepareResponse(request -> request instanceof 
ListOffsetsRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
         assertEquals(singletonMap(tp0, 90L), 
consumer.endOffsets(Collections.singleton(tp0)));
         // correct lag result should be returned as well
         assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
     }
 
+    private ClientRequest findRequest(MockClient client, ApiKeys apiKey) {
+        Optional<ClientRequest> request = client.requests().stream().filter(r 
-> r.requestBuilder().apiKey().equals(apiKey)).findFirst();
+        assertTrue(request.isPresent(), "No " + apiKey + " request was 
submitted to the client");
+        return request.get();
+    }
+
+    private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
+        return client.requests().stream().anyMatch(request -> 
request.requestBuilder().apiKey().equals(apiKey));
+    }
+
     private KafkaConsumer<String, String> 
consumerWithPendingAuthenticationError(GroupProtocol groupProtocol,
                                                                                
  final Time time) {
         ConsumerMetadata metadata = createMetadata(subscription);

Reply via email to