This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d3707fc815c KAFKA-19214: Clean up use of Optionals in RequestManagers.entries() (#19609)
d3707fc815c is described below

commit d3707fc815cc89b065c255ef87269d9c75d4b4cd
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Wed May 7 09:18:12 2025 -0700

    KAFKA-19214: Clean up use of Optionals in RequestManagers.entries() (#19609)
    
    Change:
    
    `public List<Optional<? extends RequestManager>> entries();`
    
    to:
    
    `public List<RequestManager> entries();`
    
    and clean up the callers.
    
    Reviewers: TengYao Chi <kiting...@gmail.com>, Andrew Schofield
     <aschofi...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../consumer/internals/ConsumerNetworkThread.java  | 21 ++++++-------
 .../consumer/internals/RequestManagers.java        | 36 ++++++++++------------
 .../internals/ConsumerNetworkThreadTest.java       | 19 +++---------
 .../internals/FetchRequestManagerTest.java         |  2 +-
 4 files changed, 32 insertions(+), 46 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index a48289919b0..de146f29e82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -39,7 +39,6 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -154,18 +153,18 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
         lastPollTimeMs = currentTimeMs;
 
         final long pollWaitTimeMs = requestManagers.entries().stream()
-                .filter(Optional::isPresent)
-                .map(Optional::get)
                 .map(rm -> rm.poll(currentTimeMs))
-                .map(networkClientDelegate::addAll)
-                .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
+                .mapToLong(networkClientDelegate::addAll)
+                .filter(ms -> ms <= MAX_POLL_TIMEOUT_MS)
+                .min()
+                .orElse(MAX_POLL_TIMEOUT_MS);
+
         networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
 
         cachedMaximumTimeToWait = requestManagers.entries().stream()
-                .filter(Optional::isPresent)
-                .map(Optional::get)
-                .map(rm -> rm.maximumTimeToWait(currentTimeMs))
-                .reduce(Long.MAX_VALUE, Math::min);
+                .mapToLong(rm -> rm.maximumTimeToWait(currentTimeMs))
+                .min()
+                .orElse(Long.MAX_VALUE);
 
         reapExpiredApplicationEvents(currentTimeMs);
         List<CompletableEvent<?>> uncompletedEvents = applicationEventReaper.uncompletedEvents();
@@ -233,13 +232,11 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
      * </ol>
      */
     // Visible for testing
-    static void runAtClose(final Collection<Optional<? extends RequestManager>> requestManagers,
+    static void runAtClose(final Collection<RequestManager> requestManagers,
                            final NetworkClientDelegate networkClientDelegate,
                            final long currentTimeMs) {
         // These are the optional outgoing requests at the
         requestManagers.stream()
-                .filter(Optional::isPresent)
-                .map(Optional::get)
                 .map(rm -> rm.pollOnClose(currentTimeMs))
                 .forEach(networkClientDelegate::addAll);
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index cab7d804cad..f341dc35a4a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -60,7 +60,7 @@ public class RequestManagers implements Closeable {
     public final FetchRequestManager fetchRequestManager;
     public final Optional<ShareConsumeRequestManager> shareConsumeRequestManager;
     public final Optional<StreamsGroupHeartbeatRequestManager> streamsGroupHeartbeatRequestManager;
-    private final List<Optional<? extends RequestManager>> entries;
+    private final List<RequestManager> entries;
     private final IdempotentCloser closer = new IdempotentCloser();
 
     public RequestManagers(LogContext logContext,
@@ -87,16 +87,16 @@ public class RequestManagers implements Closeable {
         this.streamsMembershipManager = streamsMembershipManager;
         this.shareMembershipManager = Optional.empty();
 
-        List<Optional<? extends RequestManager>> list = new ArrayList<>();
-        list.add(coordinatorRequestManager);
-        list.add(commitRequestManager);
-        list.add(heartbeatRequestManager);
-        list.add(membershipManager);
-        list.add(streamsGroupHeartbeatRequestManager);
-        list.add(streamsMembershipManager);
-        list.add(Optional.of(offsetsRequestManager));
-        list.add(Optional.of(topicMetadataRequestManager));
-        list.add(Optional.of(fetchRequestManager));
+        List<RequestManager> list = new ArrayList<>();
+        coordinatorRequestManager.ifPresent(list::add);
+        commitRequestManager.ifPresent(list::add);
+        heartbeatRequestManager.ifPresent(list::add);
+        membershipManager.ifPresent(list::add);
+        streamsGroupHeartbeatRequestManager.ifPresent(list::add);
+        streamsMembershipManager.ifPresent(list::add);
+        list.add(offsetsRequestManager);
+        list.add(topicMetadataRequestManager);
+        list.add(fetchRequestManager);
         entries = Collections.unmodifiableList(list);
     }
 
@@ -119,15 +119,15 @@ public class RequestManagers implements Closeable {
         this.topicMetadataRequestManager = null;
         this.fetchRequestManager = null;
 
-        List<Optional<? extends RequestManager>> list = new ArrayList<>();
-        list.add(coordinatorRequestManager);
-        list.add(shareHeartbeatRequestManager);
-        list.add(shareMembershipManager);
-        list.add(Optional.of(shareConsumeRequestManager));
+        List<RequestManager> list = new ArrayList<>();
+        coordinatorRequestManager.ifPresent(list::add);
+        shareHeartbeatRequestManager.ifPresent(list::add);
+        shareMembershipManager.ifPresent(list::add);
+        list.add(shareConsumeRequestManager);
         entries = Collections.unmodifiableList(list);
     }
 
-    public List<Optional<? extends RequestManager>> entries() {
+    public List<RequestManager> entries() {
         return entries;
     }
 
@@ -138,8 +138,6 @@ public class RequestManagers implements Closeable {
                     log.debug("Closing RequestManagers");
 
                     entries.stream()
-                            .filter(Optional::isPresent)
-                            .map(Optional::get)
                             .filter(rm -> rm instanceof Closeable)
                             .map(rm -> (Closeable) rm)
                             .forEach(c -> closeQuietly(c, c.getClass().getSimpleName()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 520279fc8d4..de0653b616f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -35,11 +35,8 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -117,10 +114,7 @@ public class ConsumerNetworkThreadTest {
     @ParameterizedTest
     @ValueSource(longs = {ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS - 1, ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS + 1})
     public void testConsumerNetworkThreadPollTimeComputations(long exampleTime) {
-        List<Optional<? extends RequestManager>> list = new ArrayList<>();
-        list.add(Optional.of(coordinatorRequestManager));
-        list.add(Optional.of(heartbeatRequestManager));
-
+        List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager);
         when(requestManagers.entries()).thenReturn(list);
 
         NetworkClientDelegate.PollResult pollResult = new NetworkClientDelegate.PollResult(exampleTime);
@@ -158,16 +152,13 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testRequestsTransferFromManagersToClientOnThreadRun() {
-        List<Optional<? extends RequestManager>> list = new ArrayList<>();
-        list.add(Optional.of(coordinatorRequestManager));
-        list.add(Optional.of(heartbeatRequestManager));
-        list.add(Optional.of(offsetsRequestManager));
+        List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager, offsetsRequestManager);
 
         when(requestManagers.entries()).thenReturn(list);
         when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
         consumerNetworkThread.runOnce();
-        requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong())));
-        requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong())));
+        requestManagers.entries().forEach(rm -> verify(rm).poll(anyLong()));
+        requestManagers.entries().forEach(rm -> verify(rm).maximumTimeToWait(anyLong()));
         verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
         verify(networkClientDelegate).poll(anyLong(), anyLong());
     }
@@ -178,7 +169,7 @@ public class ConsumerNetworkThreadTest {
         // Initial value before runOnce has been called
         assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait());
 
-        when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+        when(requestManagers.entries()).thenReturn(List.of(heartbeatRequestManager));
         when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) defaultHeartbeatIntervalMs);
 
         consumerNetworkThread.runOnce();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index e25bccc1892..7c8547ddd88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -382,7 +382,7 @@ public class FetchRequestManagerTest {
         // NOTE: by design the FetchRequestManager doesn't perform network I/O internally. That means that calling
         // the close() method with a Timer will NOT send out the close session requests on close. The network
         // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here.
-        ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, time.milliseconds());
+        ConsumerNetworkThread.runAtClose(List.of(fetcher), networkClientDelegate, time.milliseconds());
         // the network is polled during the last state of clean up.
         networkClientDelegate.poll(time.timer(1));
         // validate that closing the fetcher has sent a request with final epoch. 2 requests are sent, one for the

Reply via email to