This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9e424755d4d KAFKA-17439: Make polling for new records an explicit action/event in the new consumer (#17035)
9e424755d4d is described below

commit 9e424755d4d236442847b13863580f44f27e22a6
Author: Kirk True <[email protected]>
AuthorDate: Mon Oct 28 12:46:37 2024 -0700

    KAFKA-17439: Make polling for new records an explicit action/event in the new consumer (#17035)
    
    Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  69 +++++++++++++
 .../consumer/internals/FetchRequestManager.java    | 108 ++++++++++++++++-----
 .../internals/events/ApplicationEvent.java         |   2 +-
 .../events/ApplicationEventProcessor.java          |   9 ++
 .../internals/events/CreateFetchRequestsEvent.java |  33 +++++++
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   5 -
 .../consumer/internals/AsyncKafkaConsumerTest.java |   2 +
 .../internals/FetchRequestManagerTest.java         | 104 +++++++++++++++++++-
 .../events/ApplicationEventProcessorTest.java      |   2 +
 9 files changed, 303 insertions(+), 31 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index b606df2dce0..dd652e3235e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -50,6 +50,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@@ -708,6 +709,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 updateAssignmentMetadataIfNeeded(timer);
                 final Fetch<K, V> fetch = pollForFetches(timer);
                 if (!fetch.isEmpty()) {
+                    // before returning the fetched records, we can send off the next round of fetches
+                    // and avoid blocking while waiting for their responses, which enables pipelining while the user
+                    // is handling the fetched records.
+                    //
+                    // NOTE: since the consumed position has already been updated, we must not allow
+                    // wakeups or any other errors to be triggered prior to returning the fetched records.
+                    sendPrefetches(timer);
+
                     if (fetch.records().isEmpty()) {
                         log.trace("Returning empty records from `poll()` "
                             + "since the consumer's position has advanced for 
at least one topic partition");
@@ -1519,6 +1528,9 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             return fetch;
         }
 
+        // send any new fetches (won't resend pending fetches)
+        sendFetches(timer);
+
         // We do not want to be stuck blocking in poll if we are missing some 
positions
         // since the offset lookup may be backing off after a failure
 
@@ -1606,6 +1618,63 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }
 
+    /**
+     * This method signals the background thread to {@link 
CreateFetchRequestsEvent create fetch requests}.
+     *
+     * <p>
+     *
+     * This method takes the following steps to maintain compatibility with 
the {@link ClassicKafkaConsumer} method
+     * of the same name:
+     *
+     * <ul>
+     *     <li>
+     *         The method will wait for confirmation of the request creation 
before continuing.
+     *     </li>
+     *     <li>
+     *         The method will throw exceptions encountered during request 
creation to the user <b>immediately</b>.
+     *     </li>
+     *     <li>
+     *         The method will suppress {@link TimeoutException}s that occur 
while waiting for the confirmation.
+     *         Timeouts during request creation are a byproduct of this 
consumer's thread communication mechanisms.
+     *         That exception type isn't thrown in the request creation step 
of the {@link ClassicKafkaConsumer}.
+     *         Additionally, timeouts will not impact the logic of {@link 
#pollForFetches(Timer) blocking requests}
+     *         as it can handle requests that are created after the timeout.
+     *     </li>
+     * </ul>
+     *
+     * @param timer Timer used to bound how long the consumer waits for the 
requests to be created, which in practice
+     *              is used to avoid using {@link Long#MAX_VALUE} to wait 
"forever"
+     */
+    private void sendFetches(Timer timer) {
+        try {
+            applicationEventHandler.addAndGet(new 
CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
+        } catch (TimeoutException e) {
+            // Can be ignored, per above comments.
+        }
+    }
+
+    /**
+     * This method signals the background thread to {@link 
CreateFetchRequestsEvent create fetch requests} for the
+     * pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the 
pre-fetch case, the application thread
+     * will not wait for confirmation of the request creation before 
continuing.
+     *
+     * <p>
+     *
+     * At the point this method is called, {@link 
KafkaConsumer#poll(Duration)} has data ready to return to the user,
+     * which means the consumed position was already updated. In order to 
prevent potential gaps in records, this
+     * method is designed to suppress all exceptions.
+     *
+     * @param timer Provides an upper bound for the event and its {@link 
CompletableFuture future}
+     */
+    private void sendPrefetches(Timer timer) {
+        try {
+            applicationEventHandler.add(new 
CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
+        } catch (Throwable t) {
+            // Any unexpected errors will be logged for troubleshooting, but 
not thrown.
+            log.warn("An unexpected error occurred while pre-fetching data in 
Consumer.poll(), but was suppressed", t);
+        }
+    }
+
     @Override
     public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
         maybeThrowFencedInstanceException();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
index 3cef94e05f8..c52b5453e21 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
@@ -19,8 +19,10 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.Consumer;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import 
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.LogContext;
@@ -29,6 +31,7 @@ import org.apache.kafka.common.utils.Time;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -41,6 +44,7 @@ import java.util.stream.Collectors;
 public class FetchRequestManager extends AbstractFetch implements 
RequestManager {
 
     private final NetworkClientDelegate networkClientDelegate;
+    private CompletableFuture<Void> pendingFetchRequestFuture;
 
     FetchRequestManager(final LogContext logContext,
                         final Time time,
@@ -65,15 +69,42 @@ public class FetchRequestManager extends AbstractFetch 
implements RequestManager
         networkClientDelegate.maybeThrowAuthFailure(node);
     }
 
+    /**
+     * Signals that the {@link Consumer} wants requests to be created for the broker nodes to fetch the next
+     * batch of records.
+     *
+     * @see CreateFetchRequestsEvent
+     * @return Future on which the caller can wait to ensure that the requests 
have been created
+     */
+    public CompletableFuture<Void> createFetchRequests() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        if (pendingFetchRequestFuture != null) {
+            // In this case, we have an outstanding fetch request, so chain 
the newly created future to be
+            // completed when the "pending" future is completed.
+            pendingFetchRequestFuture.whenComplete((value, exception) -> {
+                if (exception != null) {
+                    future.completeExceptionally(exception);
+                } else {
+                    future.complete(value);
+                }
+            });
+        } else {
+            pendingFetchRequestFuture = future;
+        }
+
+        return future;
+    }
+
     /**
      * {@inheritDoc}
      */
     @Override
     public PollResult poll(long currentTimeMs) {
         return pollInternal(
-                prepareFetchRequests(),
-                this::handleFetchSuccess,
-                this::handleFetchFailure
+            this::prepareFetchRequests,
+            this::handleFetchSuccess,
+            this::handleFetchFailure
         );
     }
 
@@ -82,9 +113,12 @@ public class FetchRequestManager extends AbstractFetch 
implements RequestManager
      */
     @Override
     public PollResult pollOnClose(long currentTimeMs) {
+        // There needs to be a pending fetch request for pollInternal to 
create the requests.
+        createFetchRequests();
+
         // TODO: move the logic to poll to handle signal close
         return pollInternal(
-                prepareCloseFetchSessionRequests(),
+                this::prepareCloseFetchSessionRequests,
                 this::handleCloseFetchSessionSuccess,
                 this::handleCloseFetchSessionFailure
         );
@@ -94,28 +128,56 @@ public class FetchRequestManager extends AbstractFetch 
implements RequestManager
      * Creates the {@link PollResult poll result} that contains a list of zero 
or more
      * {@link FetchRequest.Builder fetch requests}.
      *
-     * @param fetchRequests  {@link Map} of {@link Node nodes} to their {@link 
FetchSessionHandler.FetchRequestData}
-     * @param successHandler {@link ResponseHandler Handler for successful 
responses}
-     * @param errorHandler   {@link ResponseHandler Handler for failure 
responses}
+     * @param fetchRequestPreparer {@link FetchRequestPreparer} to generate a 
{@link Map} of {@link Node nodes}
+     *                             to their {@link 
FetchSessionHandler.FetchRequestData}
+     * @param successHandler       {@link ResponseHandler Handler for 
successful responses}
+     * @param errorHandler         {@link ResponseHandler Handler for failure 
responses}
      * @return {@link PollResult}
      */
-    private PollResult pollInternal(Map<Node, 
FetchSessionHandler.FetchRequestData> fetchRequests,
+    private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
                                     ResponseHandler<ClientResponse> 
successHandler,
                                     ResponseHandler<Throwable> errorHandler) {
-        List<UnsentRequest> requests = 
fetchRequests.entrySet().stream().map(entry -> {
-            final Node fetchTarget = entry.getKey();
-            final FetchSessionHandler.FetchRequestData data = entry.getValue();
-            final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
-            final BiConsumer<ClientResponse, Throwable> responseHandler = 
(clientResponse, error) -> {
-                if (error != null)
-                    errorHandler.handle(fetchTarget, data, error);
-                else
-                    successHandler.handle(fetchTarget, data, clientResponse);
-            };
-
-            return new UnsentRequest(request, 
Optional.of(fetchTarget)).whenComplete(responseHandler);
-        }).collect(Collectors.toList());
-
-        return new PollResult(requests);
+        if (pendingFetchRequestFuture == null) {
+            // If no explicit request for creating fetch requests was issued, 
just short-circuit.
+            return PollResult.EMPTY;
+        }
+
+        try {
+            Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = 
fetchRequestPreparer.prepare();
+
+            List<UnsentRequest> requests = 
fetchRequests.entrySet().stream().map(entry -> {
+                final Node fetchTarget = entry.getKey();
+                final FetchSessionHandler.FetchRequestData data = 
entry.getValue();
+                final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+                final BiConsumer<ClientResponse, Throwable> responseHandler = 
(clientResponse, error) -> {
+                    if (error != null)
+                        errorHandler.handle(fetchTarget, data, error);
+                    else
+                        successHandler.handle(fetchTarget, data, 
clientResponse);
+                };
+
+                return new UnsentRequest(request, 
Optional.of(fetchTarget)).whenComplete(responseHandler);
+            }).collect(Collectors.toList());
+
+            pendingFetchRequestFuture.complete(null);
+            return new PollResult(requests);
+        } catch (Throwable t) {
+            // A "dummy" poll result is returned here rather than rethrowing 
the error because any error
+            // that is thrown from any RequestManager.poll() method interrupts 
the polling of the other
+            // request managers.
+            pendingFetchRequestFuture.completeExceptionally(t);
+            return PollResult.EMPTY;
+        } finally {
+            pendingFetchRequestFuture = null;
+        }
+    }
+
+    /**
+     * Simple functional interface to allow passing in a method reference for improved readability.
+     */
+    @FunctionalInterface
+    protected interface FetchRequestPreparer {
+
+        Map<Node, FetchSessionHandler.FetchRequestData> prepare();
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index e2ee28f5702..0d258cda2b4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -32,7 +32,7 @@ public abstract class ApplicationEvent {
         COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, 
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
         LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, 
TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
         UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
-        COMMIT_ON_CLOSE,
+        COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS,
         SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
         SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
         SHARE_ACKNOWLEDGE_ON_CLOSE,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 6f6a1714bed..2f6ca35feaf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -124,6 +124,10 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                 process((CommitOnCloseEvent) event);
                 return;
 
+            case CREATE_FETCH_REQUESTS:
+                process((CreateFetchRequestsEvent) event);
+                return;
+
             case SHARE_FETCH:
                 process((ShareFetchEvent) event);
                 return;
@@ -176,6 +180,11 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         }
     }
 
+    private void process(final CreateFetchRequestsEvent event) {
+        CompletableFuture<Void> future = 
requestManagers.fetchRequestManager.createFetchRequests();
+        future.whenComplete(complete(event.future()));
+    }
+
     private void process(final AsyncCommitEvent event) {
         if (!requestManagers.commitRequestManager.isPresent()) {
             return;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java
new file mode 100644
index 00000000000..056cd4811ab
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
+
+/**
+ * {@code CreateFetchRequestsEvent} signals that the {@link Consumer} wants to 
issue fetch requests to the nodes
+ * for the partitions to which the consumer is currently subscribed. The event 
is completed when the
+ * {@link FetchRequestManager} has finished <em>creating</em> (i.e. not 
enqueuing, sending, or receiving)
+ * fetch requests (if any) to send to the broker nodes.
+ */
+public class CreateFetchRequestsEvent extends 
CompletableApplicationEvent<Void> {
+
+    public CreateFetchRequestsEvent(final long deadlineMs) {
+        super(Type.CREATE_FETCH_REQUESTS, deadlineMs);
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9139fa16ab0..c260fa48c01 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2566,11 +2566,6 @@ public class KafkaConsumerTest {
         consumer.assign(singleton(tp0));
         consumer.seek(tp0, 50L);
 
-        // For AsyncKafkaConsumer, FetchRequestManager sends FetchRequest in 
background thread.
-        // Wait for the first fetch request to avoid ListOffsetResponse 
mismatch.
-        TestUtils.waitForCondition(() -> groupProtocol == 
GroupProtocol.CLASSIC || requestGenerated(client, ApiKeys.FETCH),
-                "No fetch request sent");
-
         client.prepareResponse(request -> request instanceof 
ListOffsetsRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
         assertEquals(singletonMap(tp0, 90L), 
consumer.endOffsets(Collections.singleton(tp0)));
         // correct lag result should be returned as well
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index f03cde308f6..7f78f6e8bba 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -40,6 +40,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplication
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@@ -1711,6 +1712,7 @@ public class AsyncKafkaConsumerTest {
         consumer.subscribe(singletonList("topic1"));
         consumer.poll(Duration.ofMillis(100));
         verify(applicationEventHandler).add(any(PollEvent.class));
+        
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
     }
 
     private Properties requiredConsumerConfigAndGroupId(final String groupId) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index c79077f11a3..3e3f70a7443 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
@@ -120,6 +121,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -134,8 +136,10 @@ import static java.util.Collections.singletonMap;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -237,8 +241,12 @@ public class FetchRequestManagerTest {
     }
 
     private int sendFetches() {
+        return sendFetches(true);
+    }
+
+    private int sendFetches(boolean requestFetch) {
         offsetFetcher.validatePositionsOnMetadataChange();
-        return fetcher.sendFetches();
+        return fetcher.sendFetches(requestFetch);
     }
 
     @Test
@@ -3386,6 +3394,71 @@ public class FetchRequestManagerTest {
         assertTrue(subscriptions.isFetchable(tp1));
     }
 
+    @Test
+    public void testPollWithoutCreateFetchRequests() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(0, sendFetches(false));
+    }
+
+    @Test
+    public void testPollWithCreateFetchRequests() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        CompletableFuture<Void> future = fetcher.createFetchRequests();
+        assertNotNull(future);
+        assertFalse(future.isDone());
+
+        assertEquals(1, sendFetches(false));
+        assertTrue(future.isDone());
+
+        assertEquals(0, sendFetches(false));
+    }
+
+    @Test
+    public void testPollWithCreateFetchRequestsError() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        fetcher.setAuthenticationException(new 
AuthenticationException("Intentional error"));
+        CompletableFuture<Void> future = fetcher.createFetchRequests();
+        assertNotNull(future);
+        assertFalse(future.isDone());
+
+        assertDoesNotThrow(() -> sendFetches(false));
+        assertFutureThrows(future, AuthenticationException.class);
+    }
+
+    @Test
+    public void testPollWithRedundantCreateFetchRequests() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            CompletableFuture<Void> future = fetcher.createFetchRequests();
+            assertNotNull(future);
+            futures.add(future);
+        }
+
+        assertEquals(0, 
futures.stream().filter(CompletableFuture::isDone).count());
+
+        assertEquals(1, sendFetches(false));
+        assertEquals(futures.size(), 
futures.stream().filter(CompletableFuture::isDone).count());
+
+    }
+
     private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
             TopicPartition topicPartition,
             Errors error,
@@ -3639,6 +3712,7 @@ public class FetchRequestManagerTest {
     private class TestableFetchRequestManager<K, V> extends 
FetchRequestManager {
 
         private final FetchCollector<K, V> fetchCollector;
+        private AuthenticationException authenticationException;
 
         public TestableFetchRequestManager(LogContext logContext,
                                            Time time,
@@ -3654,11 +3728,37 @@ public class FetchRequestManagerTest {
             this.fetchCollector = fetchCollector;
         }
 
+        public void setAuthenticationException(AuthenticationException 
authenticationException) {
+            this.authenticationException = authenticationException;
+        }
+
+        @Override
+        protected boolean isUnavailable(Node node) {
+            if (authenticationException != null)
+                return true;
+
+            return super.isUnavailable(node);
+        }
+
+        @Override
+        protected void maybeThrowAuthFailure(Node node) {
+            if (authenticationException != null) {
+                AuthenticationException e = authenticationException;
+                authenticationException = null;
+                throw e;
+            }
+
+            super.maybeThrowAuthFailure(node);
+        }
+
         private Fetch<K, V> collectFetch() {
             return fetchCollector.collectFetch(fetchBuffer);
         }
 
-        private int sendFetches() {
+        private int sendFetches(boolean requestFetch) {
+            if (requestFetch)
+                createFetchRequests();
+
             NetworkClientDelegate.PollResult pollResult = 
poll(time.milliseconds());
             networkClientDelegate.addAll(pollResult.unsentRequests);
             return pollResult.unsentRequests.size();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index e60a0a70059..27f3ae46002 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -128,6 +128,8 @@ public class ApplicationEventProcessorTest {
 
     private static Stream<Arguments> applicationEvents() {
         return Stream.of(
+                Arguments.of(new PollEvent(100)),
+                Arguments.of(new 
CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
                 Arguments.of(new AsyncCommitEvent(new HashMap<>())),
                 Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
                 Arguments.of(new CheckAndUpdatePositionsEvent(500)),

Reply via email to