This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9e424755d4d KAFKA-17439: Make polling for new records an explicit
action/event in the new consumer (#17035)
9e424755d4d is described below
commit 9e424755d4d236442847b13863580f44f27e22a6
Author: Kirk True <[email protected]>
AuthorDate: Mon Oct 28 12:46:37 2024 -0700
KAFKA-17439: Make polling for new records an explicit action/event in the
new consumer (#17035)
Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 69 +++++++++++++
.../consumer/internals/FetchRequestManager.java | 108 ++++++++++++++++-----
.../internals/events/ApplicationEvent.java | 2 +-
.../events/ApplicationEventProcessor.java | 9 ++
.../internals/events/CreateFetchRequestsEvent.java | 33 +++++++
.../kafka/clients/consumer/KafkaConsumerTest.java | 5 -
.../consumer/internals/AsyncKafkaConsumerTest.java | 2 +
.../internals/FetchRequestManagerTest.java | 104 +++++++++++++++++++-
.../events/ApplicationEventProcessorTest.java | 2 +
9 files changed, 303 insertions(+), 31 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index b606df2dce0..dd652e3235e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -50,6 +50,7 @@ import
org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@@ -708,6 +709,14 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
updateAssignmentMetadataIfNeeded(timer);
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
+ // before returning the fetched records, we can send off
the next round of fetches
+ // and avoid blocking while waiting for their responses to enable
pipelining while the user
+ // is handling the fetched records.
+ //
+ // NOTE: since the consumed position has already been
updated, we must not allow
+ // wakeups or any other errors to be triggered prior to
returning the fetched records.
+ sendPrefetches(timer);
+
if (fetch.records().isEmpty()) {
log.trace("Returning empty records from `poll()` "
+ "since the consumer's position has advanced for
at least one topic partition");
@@ -1519,6 +1528,9 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
return fetch;
}
+ // send any new fetches (won't resend pending fetches)
+ sendFetches(timer);
+
// We do not want to be stuck blocking in poll if we are missing some
positions
// since the offset lookup may be backing off after a failure
@@ -1606,6 +1618,63 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
offsetAndMetadata.leaderEpoch().ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}
+ /**
+ * This method signals the background thread to {@link
CreateFetchRequestsEvent create fetch requests}.
+ *
+ * <p>
+ *
+ * This method takes the following steps to maintain compatibility with
the {@link ClassicKafkaConsumer} method
+ * of the same name:
+ *
+ * <ul>
+ * <li>
+ * The method will wait for confirmation of the request creation
before continuing.
+ * </li>
+ * <li>
+ * The method will throw exceptions encountered during request
creation to the user <b>immediately</b>.
+ * </li>
+ * <li>
+ * The method will suppress {@link TimeoutException}s that occur
while waiting for the confirmation.
+ * Timeouts during request creation are a byproduct of this
consumer's thread communication mechanisms.
+ * That exception type isn't thrown in the request creation step
of the {@link ClassicKafkaConsumer}.
+ * Additionally, timeouts will not impact the logic of {@link
#pollForFetches(Timer) blocking requests}
+ * as it can handle requests that are created after the timeout.
+ * </li>
+ * </ul>
+ *
+ * @param timer Timer used to bound how long the consumer waits for the
requests to be created, which in practice
+ * is used to avoid using {@link Long#MAX_VALUE} to wait
"forever"
+ */
+ private void sendFetches(Timer timer) {
+ try {
+ applicationEventHandler.addAndGet(new
CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
+ } catch (TimeoutException e) {
+ // Can be ignored, per above comments.
+ }
+ }
+
+ /**
+ * This method signals the background thread to {@link
CreateFetchRequestsEvent create fetch requests} for the
+ * pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the
pre-fetch case, the application thread
+ * will not wait for confirmation of the request creation before
continuing.
+ *
+ * <p>
+ *
+ * At the point this method is called, {@link
KafkaConsumer#poll(Duration)} has data ready to return to the user,
+ * which means the consumed position was already updated. In order to
prevent potential gaps in records, this
+ * method is designed to suppress all exceptions.
+ *
+ * @param timer Provides an upper bound for the event and its {@link
CompletableFuture future}
+ */
+ private void sendPrefetches(Timer timer) {
+ try {
+ applicationEventHandler.add(new
CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
+ } catch (Throwable t) {
+ // Any unexpected errors will be logged for troubleshooting, but
not thrown.
+ log.warn("An unexpected error occurred while pre-fetching data in
Consumer.poll(), but was suppressed", t);
+ }
+ }
+
@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
maybeThrowFencedInstanceException();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
index 3cef94e05f8..c52b5453e21 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
@@ -19,8 +19,10 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.Consumer;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.LogContext;
@@ -29,6 +31,7 @@ import org.apache.kafka.common.utils.Time;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -41,6 +44,7 @@ import java.util.stream.Collectors;
public class FetchRequestManager extends AbstractFetch implements
RequestManager {
private final NetworkClientDelegate networkClientDelegate;
+ private CompletableFuture<Void> pendingFetchRequestFuture;
FetchRequestManager(final LogContext logContext,
final Time time,
@@ -65,15 +69,42 @@ public class FetchRequestManager extends AbstractFetch
implements RequestManager
networkClientDelegate.maybeThrowAuthFailure(node);
}
+ /**
+ * Signals that the {@link Consumer} wants requests to be created for the broker
nodes to fetch the next
+ * batch of records.
+ *
+ * @see CreateFetchRequestsEvent
+ * @return Future on which the caller can wait to ensure that the requests
have been created
+ */
+ public CompletableFuture<Void> createFetchRequests() {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ if (pendingFetchRequestFuture != null) {
+ // In this case, we have an outstanding fetch request, so chain
the newly created future to be
+ // completed when the "pending" future is completed.
+ pendingFetchRequestFuture.whenComplete((value, exception) -> {
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ } else {
+ future.complete(value);
+ }
+ });
+ } else {
+ pendingFetchRequestFuture = future;
+ }
+
+ return future;
+ }
+
/**
* {@inheritDoc}
*/
@Override
public PollResult poll(long currentTimeMs) {
return pollInternal(
- prepareFetchRequests(),
- this::handleFetchSuccess,
- this::handleFetchFailure
+ this::prepareFetchRequests,
+ this::handleFetchSuccess,
+ this::handleFetchFailure
);
}
@@ -82,9 +113,12 @@ public class FetchRequestManager extends AbstractFetch
implements RequestManager
*/
@Override
public PollResult pollOnClose(long currentTimeMs) {
+ // There needs to be a pending fetch request for pollInternal to
create the requests.
+ createFetchRequests();
+
// TODO: move the logic to poll to handle signal close
return pollInternal(
- prepareCloseFetchSessionRequests(),
+ this::prepareCloseFetchSessionRequests,
this::handleCloseFetchSessionSuccess,
this::handleCloseFetchSessionFailure
);
@@ -94,28 +128,56 @@ public class FetchRequestManager extends AbstractFetch
implements RequestManager
* Creates the {@link PollResult poll result} that contains a list of zero
or more
* {@link FetchRequest.Builder fetch requests}.
*
- * @param fetchRequests {@link Map} of {@link Node nodes} to their {@link
FetchSessionHandler.FetchRequestData}
- * @param successHandler {@link ResponseHandler Handler for successful
responses}
- * @param errorHandler {@link ResponseHandler Handler for failure
responses}
+ * @param fetchRequestPreparer {@link FetchRequestPreparer} to generate a
{@link Map} of {@link Node nodes}
+ * to their {@link
FetchSessionHandler.FetchRequestData}
+ * @param successHandler {@link ResponseHandler Handler for
successful responses}
+ * @param errorHandler {@link ResponseHandler Handler for failure
responses}
* @return {@link PollResult}
*/
- private PollResult pollInternal(Map<Node,
FetchSessionHandler.FetchRequestData> fetchRequests,
+ private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
ResponseHandler<ClientResponse>
successHandler,
ResponseHandler<Throwable> errorHandler) {
- List<UnsentRequest> requests =
fetchRequests.entrySet().stream().map(entry -> {
- final Node fetchTarget = entry.getKey();
- final FetchSessionHandler.FetchRequestData data = entry.getValue();
- final FetchRequest.Builder request =
createFetchRequest(fetchTarget, data);
- final BiConsumer<ClientResponse, Throwable> responseHandler =
(clientResponse, error) -> {
- if (error != null)
- errorHandler.handle(fetchTarget, data, error);
- else
- successHandler.handle(fetchTarget, data, clientResponse);
- };
-
- return new UnsentRequest(request,
Optional.of(fetchTarget)).whenComplete(responseHandler);
- }).collect(Collectors.toList());
-
- return new PollResult(requests);
+ if (pendingFetchRequestFuture == null) {
+ // If no explicit request for creating fetch requests was issued,
just short-circuit.
+ return PollResult.EMPTY;
+ }
+
+ try {
+ Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests =
fetchRequestPreparer.prepare();
+
+ List<UnsentRequest> requests =
fetchRequests.entrySet().stream().map(entry -> {
+ final Node fetchTarget = entry.getKey();
+ final FetchSessionHandler.FetchRequestData data =
entry.getValue();
+ final FetchRequest.Builder request =
createFetchRequest(fetchTarget, data);
+ final BiConsumer<ClientResponse, Throwable> responseHandler =
(clientResponse, error) -> {
+ if (error != null)
+ errorHandler.handle(fetchTarget, data, error);
+ else
+ successHandler.handle(fetchTarget, data,
clientResponse);
+ };
+
+ return new UnsentRequest(request,
Optional.of(fetchTarget)).whenComplete(responseHandler);
+ }).collect(Collectors.toList());
+
+ pendingFetchRequestFuture.complete(null);
+ return new PollResult(requests);
+ } catch (Throwable t) {
+ // A "dummy" poll result is returned here rather than rethrowing
the error because any error
+ // that is thrown from any RequestManager.poll() method interrupts
the polling of the other
+ // request managers.
+ pendingFetchRequestFuture.completeExceptionally(t);
+ return PollResult.EMPTY;
+ } finally {
+ pendingFetchRequestFuture = null;
+ }
+ }
+
+ /**
+ * Simple functional interface to allow passing in a method reference for
improved readability.
+ */
+ @FunctionalInterface
+ protected interface FetchRequestPreparer {
+
+ Map<Node, FetchSessionHandler.FetchRequestData> prepare();
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index e2ee28f5702..0d258cda2b4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -32,7 +32,7 @@ public abstract class ApplicationEvent {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS,
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET,
TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
- COMMIT_ON_CLOSE,
+ COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 6f6a1714bed..2f6ca35feaf 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -124,6 +124,10 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
process((CommitOnCloseEvent) event);
return;
+ case CREATE_FETCH_REQUESTS:
+ process((CreateFetchRequestsEvent) event);
+ return;
+
case SHARE_FETCH:
process((ShareFetchEvent) event);
return;
@@ -176,6 +180,11 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
}
+ private void process(final CreateFetchRequestsEvent event) {
+ CompletableFuture<Void> future =
requestManagers.fetchRequestManager.createFetchRequests();
+ future.whenComplete(complete(event.future()));
+ }
+
private void process(final AsyncCommitEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
return;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java
new file mode 100644
index 00000000000..056cd4811ab
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
+
+/**
+ * {@code CreateFetchRequestsEvent} signals that the {@link Consumer} wants to
issue fetch requests to the nodes
+ * for the partitions to which the consumer is currently subscribed. The event
is completed when the
+ * {@link FetchRequestManager} has finished <em>creating</em> (i.e. not
enqueuing, sending, or receiving)
+ * fetch requests (if any) to send to the broker nodes.
+ */
+public class CreateFetchRequestsEvent extends
CompletableApplicationEvent<Void> {
+
+ public CreateFetchRequestsEvent(final long deadlineMs) {
+ super(Type.CREATE_FETCH_REQUESTS, deadlineMs);
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9139fa16ab0..c260fa48c01 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2566,11 +2566,6 @@ public class KafkaConsumerTest {
consumer.assign(singleton(tp0));
consumer.seek(tp0, 50L);
- // For AsyncKafkaConsumer, FetchRequestManager sends FetchRequest in
background thread.
- // Wait for the first fetch request to avoid ListOffsetResponse
mismatch.
- TestUtils.waitForCondition(() -> groupProtocol ==
GroupProtocol.CLASSIC || requestGenerated(client, ApiKeys.FETCH),
- "No fetch request sent");
-
client.prepareResponse(request -> request instanceof
ListOffsetsRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
assertEquals(singletonMap(tp0, 90L),
consumer.endOffsets(Collections.singleton(tp0)));
// correct lag result should be returned as well
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index f03cde308f6..7f78f6e8bba 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -40,6 +40,7 @@ import
org.apache.kafka.clients.consumer.internals.events.CompletableApplication
import
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@@ -1711,6 +1712,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(singletonList("topic1"));
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
+
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
}
private Properties requiredConsumerConfigAndGroupId(final String groupId) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index c79077f11a3..3e3f70a7443 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
@@ -120,6 +121,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -134,8 +136,10 @@ import static java.util.Collections.singletonMap;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.apache.kafka.test.TestUtils.assertOptional;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -237,8 +241,12 @@ public class FetchRequestManagerTest {
}
private int sendFetches() {
+ return sendFetches(true);
+ }
+
+ private int sendFetches(boolean requestFetch) {
offsetFetcher.validatePositionsOnMetadataChange();
- return fetcher.sendFetches();
+ return fetcher.sendFetches(requestFetch);
}
@Test
@@ -3386,6 +3394,71 @@ public class FetchRequestManagerTest {
assertTrue(subscriptions.isFetchable(tp1));
}
+ @Test
+ public void testPollWithoutCreateFetchRequests() {
+ buildFetcher();
+
+ assignFromUser(singleton(tp0));
+ subscriptions.seek(tp0, 0);
+
+ assertEquals(0, sendFetches(false));
+ }
+
+ @Test
+ public void testPollWithCreateFetchRequests() {
+ buildFetcher();
+
+ assignFromUser(singleton(tp0));
+ subscriptions.seek(tp0, 0);
+
+ CompletableFuture<Void> future = fetcher.createFetchRequests();
+ assertNotNull(future);
+ assertFalse(future.isDone());
+
+ assertEquals(1, sendFetches(false));
+ assertTrue(future.isDone());
+
+ assertEquals(0, sendFetches(false));
+ }
+
+ @Test
+ public void testPollWithCreateFetchRequestsError() {
+ buildFetcher();
+
+ assignFromUser(singleton(tp0));
+ subscriptions.seek(tp0, 0);
+
+ fetcher.setAuthenticationException(new
AuthenticationException("Intentional error"));
+ CompletableFuture<Void> future = fetcher.createFetchRequests();
+ assertNotNull(future);
+ assertFalse(future.isDone());
+
+ assertDoesNotThrow(() -> sendFetches(false));
+ assertFutureThrows(future, AuthenticationException.class);
+ }
+
+ @Test
+ public void testPollWithRedundantCreateFetchRequests() {
+ buildFetcher();
+
+ assignFromUser(singleton(tp0));
+ subscriptions.seek(tp0, 0);
+
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ CompletableFuture<Void> future = fetcher.createFetchRequests();
+ assertNotNull(future);
+ futures.add(future);
+ }
+
+ assertEquals(0,
futures.stream().filter(CompletableFuture::isDone).count());
+
+ assertEquals(1, sendFetches(false));
+ assertEquals(futures.size(),
futures.stream().filter(CompletableFuture::isDone).count());
+
+ }
+
private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
TopicPartition topicPartition,
Errors error,
@@ -3639,6 +3712,7 @@ public class FetchRequestManagerTest {
private class TestableFetchRequestManager<K, V> extends
FetchRequestManager {
private final FetchCollector<K, V> fetchCollector;
+ private AuthenticationException authenticationException;
public TestableFetchRequestManager(LogContext logContext,
Time time,
@@ -3654,11 +3728,37 @@ public class FetchRequestManagerTest {
this.fetchCollector = fetchCollector;
}
+ public void setAuthenticationException(AuthenticationException
authenticationException) {
+ this.authenticationException = authenticationException;
+ }
+
+ @Override
+ protected boolean isUnavailable(Node node) {
+ if (authenticationException != null)
+ return true;
+
+ return super.isUnavailable(node);
+ }
+
+ @Override
+ protected void maybeThrowAuthFailure(Node node) {
+ if (authenticationException != null) {
+ AuthenticationException e = authenticationException;
+ authenticationException = null;
+ throw e;
+ }
+
+ super.maybeThrowAuthFailure(node);
+ }
+
private Fetch<K, V> collectFetch() {
return fetchCollector.collectFetch(fetchBuffer);
}
- private int sendFetches() {
+ private int sendFetches(boolean requestFetch) {
+ if (requestFetch)
+ createFetchRequests();
+
NetworkClientDelegate.PollResult pollResult =
poll(time.milliseconds());
networkClientDelegate.addAll(pollResult.unsentRequests);
return pollResult.unsentRequests.size();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index e60a0a70059..27f3ae46002 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -128,6 +128,8 @@ public class ApplicationEventProcessorTest {
private static Stream<Arguments> applicationEvents() {
return Stream.of(
+ Arguments.of(new PollEvent(100)),
+ Arguments.of(new
CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
Arguments.of(new AsyncCommitEvent(new HashMap<>())),
Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
Arguments.of(new CheckAndUpdatePositionsEvent(500)),