This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d3707fc815c KAFKA-19214: Clean up use of Optionals in RequestManagers.entries() (#19609) d3707fc815c is described below commit d3707fc815cc89b065c255ef87269d9c75d4b4cd Author: Kirk True <k...@kirktrue.pro> AuthorDate: Wed May 7 09:18:12 2025 -0700 KAFKA-19214: Clean up use of Optionals in RequestManagers.entries() (#19609) Change: `public List<Optional<? extends RequestManager>> entries();` to: `public List<RequestManager> entries();` and clean up the callers. Reviewers: TengYao Chi <kiting...@gmail.com>, Andrew Schofield <aschofi...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com> --- .../consumer/internals/ConsumerNetworkThread.java | 21 ++++++------- .../consumer/internals/RequestManagers.java | 36 ++++++++++------------ .../internals/ConsumerNetworkThreadTest.java | 19 +++--------- .../internals/FetchRequestManagerTest.java | 2 +- 4 files changed, 32 insertions(+), 46 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index a48289919b0..de146f29e82 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -39,7 +39,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -154,18 +153,18 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { lastPollTimeMs = currentTimeMs; final long pollWaitTimeMs = requestManagers.entries().stream() - .filter(Optional::isPresent) - .map(Optional::get) .map(rm -> rm.poll(currentTimeMs)) - .map(networkClientDelegate::addAll) - .reduce(MAX_POLL_TIMEOUT_MS, Math::min); + .mapToLong(networkClientDelegate::addAll) + .filter(ms -> ms <= MAX_POLL_TIMEOUT_MS) + .min() + .orElse(MAX_POLL_TIMEOUT_MS); + networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs); cachedMaximumTimeToWait = requestManagers.entries().stream() - .filter(Optional::isPresent) - .map(Optional::get) - .map(rm -> rm.maximumTimeToWait(currentTimeMs)) - .reduce(Long.MAX_VALUE, Math::min); + .mapToLong(rm -> rm.maximumTimeToWait(currentTimeMs)) + .min() + .orElse(Long.MAX_VALUE); reapExpiredApplicationEvents(currentTimeMs); List<CompletableEvent<?>> uncompletedEvents = applicationEventReaper.uncompletedEvents(); @@ -233,13 +232,11 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { * </ol> */ // Visible for testing - static void runAtClose(final Collection<Optional<? extends RequestManager>> requestManagers, + static void runAtClose(final Collection<RequestManager> requestManagers, final NetworkClientDelegate networkClientDelegate, final long currentTimeMs) { // These are the optional outgoing requests at the requestManagers.stream() - .filter(Optional::isPresent) - .map(Optional::get) .map(rm -> rm.pollOnClose(currentTimeMs)) .forEach(networkClientDelegate::addAll); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index cab7d804cad..f341dc35a4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -60,7 +60,7 @@ public class RequestManagers implements Closeable { public final FetchRequestManager fetchRequestManager; public final Optional<ShareConsumeRequestManager> shareConsumeRequestManager; public final Optional<StreamsGroupHeartbeatRequestManager> streamsGroupHeartbeatRequestManager; - private final List<Optional<? extends RequestManager>> entries; + private final List<RequestManager> entries; private final IdempotentCloser closer = new IdempotentCloser(); public RequestManagers(LogContext logContext, @@ -87,16 +87,16 @@ public class RequestManagers implements Closeable { this.streamsMembershipManager = streamsMembershipManager; this.shareMembershipManager = Optional.empty(); - List<Optional<? extends RequestManager>> list = new ArrayList<>(); - list.add(coordinatorRequestManager); - list.add(commitRequestManager); - list.add(heartbeatRequestManager); - list.add(membershipManager); - list.add(streamsGroupHeartbeatRequestManager); - list.add(streamsMembershipManager); - list.add(Optional.of(offsetsRequestManager)); - list.add(Optional.of(topicMetadataRequestManager)); - list.add(Optional.of(fetchRequestManager)); + List<RequestManager> list = new ArrayList<>(); + coordinatorRequestManager.ifPresent(list::add); + commitRequestManager.ifPresent(list::add); + heartbeatRequestManager.ifPresent(list::add); + membershipManager.ifPresent(list::add); + streamsGroupHeartbeatRequestManager.ifPresent(list::add); + streamsMembershipManager.ifPresent(list::add); + list.add(offsetsRequestManager); + list.add(topicMetadataRequestManager); + list.add(fetchRequestManager); entries = Collections.unmodifiableList(list); } @@ -119,15 +119,15 @@ public class RequestManagers implements Closeable { this.topicMetadataRequestManager = null; this.fetchRequestManager = null; - List<Optional<? extends RequestManager>> list = new ArrayList<>(); - list.add(coordinatorRequestManager); - list.add(shareHeartbeatRequestManager); - list.add(shareMembershipManager); - list.add(Optional.of(shareConsumeRequestManager)); + List<RequestManager> list = new ArrayList<>(); + coordinatorRequestManager.ifPresent(list::add); + shareHeartbeatRequestManager.ifPresent(list::add); + shareMembershipManager.ifPresent(list::add); + list.add(shareConsumeRequestManager); entries = Collections.unmodifiableList(list); } - public List<Optional<? extends RequestManager>> entries() { + public List<RequestManager> entries() { return entries; } @@ -138,8 +138,6 @@ public class RequestManagers implements Closeable { log.debug("Closing RequestManagers"); entries.stream() - .filter(Optional::isPresent) - .map(Optional::get) .filter(rm -> rm instanceof Closeable) .map(rm -> (Closeable) rm) .forEach(c -> closeQuietly(c, c.getClass().getSimpleName())); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 520279fc8d4..de0653b616f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -35,11 +35,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -117,10 +114,7 @@ public class ConsumerNetworkThreadTest { @ParameterizedTest @ValueSource(longs = {ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS - 1, ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS + 1}) public void testConsumerNetworkThreadPollTimeComputations(long exampleTime) { - List<Optional<? extends RequestManager>> list = new ArrayList<>(); - list.add(Optional.of(coordinatorRequestManager)); - list.add(Optional.of(heartbeatRequestManager)); - + List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager); when(requestManagers.entries()).thenReturn(list); NetworkClientDelegate.PollResult pollResult = new NetworkClientDelegate.PollResult(exampleTime); @@ -158,16 +152,13 @@ public class ConsumerNetworkThreadTest { @Test public void testRequestsTransferFromManagersToClientOnThreadRun() { - List<Optional<? extends RequestManager>> list = new ArrayList<>(); - list.add(Optional.of(coordinatorRequestManager)); - list.add(Optional.of(heartbeatRequestManager)); - list.add(Optional.of(offsetsRequestManager)); + List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager, offsetsRequestManager); when(requestManagers.entries()).thenReturn(list); when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); - requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong()))); - requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong()))); + requestManagers.entries().forEach(rm -> verify(rm).poll(anyLong())); + requestManagers.entries().forEach(rm -> verify(rm).maximumTimeToWait(anyLong())); verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); verify(networkClientDelegate).poll(anyLong(), anyLong()); } @@ -178,7 +169,7 @@ public class ConsumerNetworkThreadTest { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); - when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(requestManagers.entries()).thenReturn(List.of(heartbeatRequestManager)); when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) defaultHeartbeatIntervalMs); consumerNetworkThread.runOnce(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index e25bccc1892..7c8547ddd88 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -382,7 +382,7 @@ public class FetchRequestManagerTest { // NOTE: by design the FetchRequestManager doesn't perform network I/O internally. That means that calling // the close() method with a Timer will NOT send out the close session requests on close. The network // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here. - ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, time.milliseconds()); + ConsumerNetworkThread.runAtClose(List.of(fetcher), networkClientDelegate, time.milliseconds()); // the network is polled during the last state of clean up. networkClientDelegate.poll(time.timer(1)); // validate that closing the fetcher has sent a request with final epoch. 2 requests are sent, one for the