This is an automated email from the ASF dual-hosted git repository. lianetm pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new adcf10ca8b6 KAFKA-19297: Refactor AsyncKafkaConsumer's use of Java Streams APIs in critical sections (#19917) adcf10ca8b6 is described below commit adcf10ca8b6d6caef0dbe73308594039948aea83 Author: Kirk True <k...@kirktrue.pro> AuthorDate: Wed Jun 18 07:00:45 2025 -0700 KAFKA-19297: Refactor AsyncKafkaConsumer's use of Java Streams APIs in critical sections (#19917) Profiling has shown that using the Collections Streams API approach adds unnecessary overhead compared to a traditional for loop. Minor revisions to the code have been made to use simpler constructs to improve performance. Reviewers: Lianet Magrans <lmagr...@confluent.io>, Andrew Schofield <aschofi...@confluent.io> --- .../clients/consumer/internals/AbstractFetch.java | 33 ++++-- .../internals/AbstractMembershipManager.java | 3 +- .../internals/ConsumerMembershipManager.java | 3 +- .../consumer/internals/ConsumerNetworkThread.java | 50 +++++---- .../ConsumerRebalanceListenerInvoker.java | 35 +++--- .../internals/TopicMetadataRequestManager.java | 32 ++++-- .../events/ApplicationEventProcessor.java | 2 +- .../internals/events/CompletableEventReaper.java | 119 ++++++++++++++------- 8 files changed, 180 insertions(+), 97 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java index 533cbbbaa98..9d96712a473 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java @@ -45,6 +45,7 @@ import org.slf4j.helpers.MessageFormatter; import java.io.Closeable; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -54,7 +55,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; -import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.internals.FetchUtils.requestMetadataUpdate; @@ -223,10 +223,15 @@ public abstract class AbstractFetch implements Closeable { } if (!partitionsWithUpdatedLeaderInfo.isEmpty()) { - List<Node> leaderNodes = response.data().nodeEndpoints().stream() - .map(e -> new Node(e.nodeId(), e.host(), e.port(), e.rack())) - .filter(e -> !e.equals(Node.noNode())) - .collect(Collectors.toList()); + List<Node> leaderNodes = new ArrayList<>(); + + for (FetchResponseData.NodeEndpoint e : response.data().nodeEndpoints()) { + Node node = new Node(e.nodeId(), e.host(), e.port(), e.rack()); + + if (!node.equals(Node.noNode())) + leaderNodes.add(node); + } + Set<TopicPartition> updatedPartitions = metadata.updatePartitionLeadership(partitionsWithUpdatedLeaderInfo, leaderNodes); updatedPartitions.forEach( tp -> { @@ -397,7 +402,7 @@ public abstract class AbstractFetch implements Closeable { fetchable.put(fetchTarget, sessionHandler.newBuilder()); }); - return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())); + return convert(fetchable); } /** @@ -470,7 +475,21 @@ public abstract class AbstractFetch implements Closeable { } } - return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())); + return convert(fetchable); + } + + /** + * This method converts {@link FetchSessionHandler.Builder} instances to + * {@link FetchSessionHandler.FetchRequestData} instances. It intentionally forgoes use of the Java Collections + * Streams API to reduce overhead in the critical network path. + */ + private Map<Node, FetchSessionHandler.FetchRequestData> convert(Map<Node, FetchSessionHandler.Builder> fetchable) { + Map<Node, FetchSessionHandler.FetchRequestData> map = new HashMap<>(fetchable.size()); + + for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) + map.put(entry.getKey(), entry.getValue().build()); + + return map; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java index 524a0a4a8dd..74ecf7f9bb8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java @@ -43,7 +43,6 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import static java.util.Collections.unmodifiableList; @@ -1135,7 +1134,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl // Ensure the set of partitions to revoke are still assigned Set<TopicPartition> revokedPartitions = new HashSet<>(partitionsToRevoke); revokedPartitions.retainAll(subscriptions.assignedPartitions()); - log.info("Revoking previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + log.info("Revoking previously assigned partitions {}", revokedPartitions); signalPartitionsBeingRevoked(revokedPartitions); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java index b6159770754..e07424a6393 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java @@ -44,7 +44,6 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT; import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP; @@ -415,7 +414,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions(); revokePausedPartitions.retainAll(partitionsToRevoke); if (!revokePausedPartitions.isEmpty()) { - log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + log.info("The pause flag in partitions {} will be removed due to revocation.", revokePausedPartitions); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 550f7c8258b..d2d178a88c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -35,13 +35,13 @@ import org.slf4j.Logger; import java.io.Closeable; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.function.Supplier; -import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -144,6 +144,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { * </ol> */ void runOnce() { + // The following code avoids use of the Java Collections Streams API to reduce overhead in this loop. processApplicationEvents(); final long currentTimeMs = time.milliseconds(); @@ -152,19 +153,24 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { } lastPollTimeMs = currentTimeMs; - final long pollWaitTimeMs = requestManagers.entries().stream() - .map(rm -> rm.poll(currentTimeMs)) - .mapToLong(networkClientDelegate::addAll) - .filter(ms -> ms <= MAX_POLL_TIMEOUT_MS) - .min() - .orElse(MAX_POLL_TIMEOUT_MS); + long pollWaitTimeMs = MAX_POLL_TIMEOUT_MS; + + for (RequestManager rm : requestManagers.entries()) { + NetworkClientDelegate.PollResult pollResult = rm.poll(currentTimeMs); + long timeoutMs = networkClientDelegate.addAll(pollResult); + pollWaitTimeMs = Math.min(pollWaitTimeMs, timeoutMs); + } networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs); - cachedMaximumTimeToWait = requestManagers.entries().stream() - .mapToLong(rm -> rm.maximumTimeToWait(currentTimeMs)) - .min() - .orElse(Long.MAX_VALUE); + long maxTimeToWaitMs = Long.MAX_VALUE; + + for (RequestManager rm : requestManagers.entries()) { + long waitMs = rm.maximumTimeToWait(currentTimeMs); + maxTimeToWaitMs = Math.min(maxTimeToWaitMs, waitMs); + } + + cachedMaximumTimeToWait = maxTimeToWaitMs; reapExpiredApplicationEvents(currentTimeMs); List<CompletableEvent<?>> uncompletedEvents = applicationEventReaper.uncompletedEvents(); @@ -235,10 +241,11 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { static void runAtClose(final Collection<RequestManager> requestManagers, final NetworkClientDelegate networkClientDelegate, final long currentTimeMs) { - // These are the optional outgoing requests at the - requestManagers.stream() - .map(rm -> rm.pollOnClose(currentTimeMs)) - .forEach(networkClientDelegate::addAll); + // These are the optional outgoing requests at the time of closing the consumer + for (RequestManager rm : requestManagers) { + NetworkClientDelegate.PollResult pollResult = rm.pollOnClose(currentTimeMs); + networkClientDelegate.addAll(pollResult); + } } public boolean isRunning() { @@ -362,12 +369,13 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { * If there is a metadata error, complete all uncompleted events that require subscription metadata. */ private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) { - List<? extends CompletableApplicationEvent<?>> subscriptionMetadataEvent = events.stream() - .filter(e -> e instanceof CompletableApplicationEvent<?>) - .map(e -> (CompletableApplicationEvent<?>) e) - .filter(CompletableApplicationEvent::requireSubscriptionMetadata) - .collect(Collectors.toList()); - + List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new ArrayList<>(); + + for (CompletableEvent<?> ce : events) { + if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata()) + subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce); + } + if (subscriptionMetadataEvent.isEmpty()) return; networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError -> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java index b42cf85a860..3f66b6ce3c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java @@ -30,7 +30,6 @@ import org.slf4j.Logger; import java.util.Optional; import java.util.Set; import java.util.SortedSet; -import java.util.stream.Collectors; /** * This class encapsulates the invocation of the callback methods defined in the {@link ConsumerRebalanceListener} @@ -55,7 +54,7 @@ public class ConsumerRebalanceListenerInvoker { } public Exception invokePartitionsAssigned(final SortedSet<TopicPartition> assignedPartitions) { - log.info("Adding newly assigned partitions: {}", assignedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + log.info("Adding newly assigned partitions: {}", assignedPartitions); Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener(); @@ -67,8 +66,12 @@ public class ConsumerRebalanceListenerInvoker { } catch (WakeupException | InterruptException e) { throw e; } catch (Exception e) { - log.error("User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}", - listener.get().getClass().getName(), assignedPartitions, e); + log.error( + "User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}", + listener.get().getClass().getName(), + assignedPartitions, + e + ); return e; } } @@ -77,11 +80,11 @@ public class ConsumerRebalanceListenerInvoker { } public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revokedPartitions) { - log.info("Revoke previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + log.info("Revoke previously assigned partitions {}", revokedPartitions); Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions(); revokePausedPartitions.retainAll(revokedPartitions); if (!revokePausedPartitions.isEmpty()) - log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + log.info("The pause flag in partitions {} will be removed due to revocation.", revokePausedPartitions); Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener(); @@ -93,8 +96,12 @@ public class ConsumerRebalanceListenerInvoker { } catch (WakeupException | InterruptException e) { throw e; } catch (Exception e) { - log.error("User provided listener {} failed on invocation of onPartitionsRevoked for partitions {}", - listener.get().getClass().getName(), revokedPartitions, e); + log.error( + "User provided listener {} failed on invocation of onPartitionsRevoked for partitions {}", + listener.get().getClass().getName(), + revokedPartitions, + e + ); return e; } } @@ -103,11 +110,11 @@ public class ConsumerRebalanceListenerInvoker { } public Exception invokePartitionsLost(final SortedSet<TopicPartition> lostPartitions) { - log.info("Lost previously assigned partitions {}", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + log.info("Lost previously assigned partitions {}", lostPartitions); Set<TopicPartition> lostPausedPartitions = subscriptions.pausedPartitions(); lostPausedPartitions.retainAll(lostPartitions); if (!lostPausedPartitions.isEmpty()) - log.info("The pause flag in partitions [{}] will be removed due to partition lost.", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + log.info("The pause flag in partitions {} will be removed due to partition lost.", lostPartitions); Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener(); @@ -119,8 +126,12 @@ public class ConsumerRebalanceListenerInvoker { } catch (WakeupException | InterruptException e) { throw e; } catch (Exception e) { - log.error("User provided listener {} failed on invocation of onPartitionsLost for partitions {}", - listener.get().getClass().getName(), lostPartitions, e); + log.error( + "User provided listener {} failed on invocation of onPartitionsLost for partitions {}", + listener.get().getClass().getName(), + lostPartitions, + e + ); return e; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java index 2d9cab0dd96..fcef3ce2647 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java @@ -33,15 +33,16 @@ import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY; @@ -84,16 +85,23 @@ public class TopicMetadataRequestManager implements RequestManager { @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { // Prune any requests which have timed out - List<TopicMetadataRequestState> expiredRequests = inflightRequests.stream() - .filter(TimedRequestState::isExpired) - .collect(Collectors.toList()); - expiredRequests.forEach(TopicMetadataRequestState::expire); + Iterator<TopicMetadataRequestState> requestStateIterator = inflightRequests.iterator(); - List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.stream() - .map(req -> req.send(currentTimeMs)) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList()); + while (requestStateIterator.hasNext()) { + TopicMetadataRequestState requestState = requestStateIterator.next(); + + if (requestState.isExpired()) { + requestState.expire(); + requestStateIterator.remove(); + } + } + + List<NetworkClientDelegate.UnsentRequest> requests = new ArrayList<>(); + + for (TopicMetadataRequestState request : inflightRequests) { + Optional<NetworkClientDelegate.UnsentRequest> unsentRequest = request.send(currentTimeMs); + unsentRequest.ifPresent(requests::add); + } return requests.isEmpty() ? EMPTY : new NetworkClientDelegate.PollResult(0, requests); } @@ -181,7 +189,9 @@ public class TopicMetadataRequestManager implements RequestManager { } private void expire() { - completeFutureAndRemoveRequest( + // The request state is removed from inflightRequests via an iterator by the caller of this method, + // so don't remove it from inflightRequests here. + future.completeExceptionally( new TimeoutException("Timeout expired while fetching topic metadata")); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 8d6564083ff..853c5484df5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -310,7 +310,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven manager.updateTimerAndMaybeCommit(event.currentTimeMs()); } - log.info("Assigned to partition(s): {}", event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + log.info("Assigned to partition(s): {}", event.partitions()); try { if (subscriptions.assignFromUser(new HashSet<>(event.partitions()))) metadata.requestUpdateForNewTopics(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java index 5a0358df896..b4440de0626 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java @@ -25,11 +25,10 @@ import org.slf4j.Logger; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; -import java.util.stream.Collectors; /** * {@code CompletableEventReaper} is responsible for tracking {@link CompletableEvent time-bound events} and removing @@ -85,26 +84,39 @@ public class CompletableEventReaper { * @return The number of events that were expired */ public long reap(long currentTimeMs) { - Consumer<CompletableEvent<?>> expireEvent = event -> { - long pastDueMs = currentTimeMs - event.deadlineMs(); - TimeoutException error = new TimeoutException(String.format("%s was %s ms past its expiration of %s", event.getClass().getSimpleName(), pastDueMs, event.deadlineMs())); + int count = 0; + Iterator<CompletableEvent<?>> iterator = tracked.iterator(); + + while (iterator.hasNext()) { + CompletableEvent<?> event = iterator.next(); + + if (event.future().isDone()) { + // Remove any events that are already complete. + iterator.remove(); + continue; + } + + long deadlineMs = event.deadlineMs(); + long pastDueMs = currentTimeMs - deadlineMs; + + if (pastDueMs < 0) + continue; + + TimeoutException error = new TimeoutException(String.format("%s was %s ms past its expiration of %s", event.getClass().getSimpleName(), pastDueMs, deadlineMs)); + + // Complete (exceptionally) any events that have passed their deadline AND aren't already complete. if (event.future().completeExceptionally(error)) { - log.debug("Event {} completed exceptionally since its expiration of {} passed {} ms ago", event, event.deadlineMs(), pastDueMs); + log.debug("Event {} completed exceptionally since its expiration of {} passed {} ms ago", event, deadlineMs, pastDueMs); } else { log.trace("Event {} not completed exceptionally since it was previously completed", event); } - }; - - // First, complete (exceptionally) any events that have passed their deadline AND aren't already complete. - long count = tracked.stream() - .filter(e -> !e.future().isDone()) - .filter(e -> currentTimeMs >= e.deadlineMs()) - .peek(expireEvent) - .count(); - // Second, remove any events that are already complete, just to make sure we don't hold references. This will - // include any events that finished successfully as well as any events we just completed exceptionally above. - tracked.removeIf(e -> e.future().isDone()); + + count++; + + // Remove the events so that we don't hold a reference to it. + iterator.remove(); + } return count; } @@ -131,29 +143,12 @@ public class CompletableEventReaper { public long reap(Collection<?> events) { Objects.requireNonNull(events, "Event queue to reap must be non-null"); - Consumer<CompletableEvent<?>> expireEvent = event -> { - TimeoutException error = new TimeoutException(String.format("%s could not be completed before the consumer closed", event.getClass().getSimpleName())); - - if (event.future().completeExceptionally(error)) { - log.debug("Event {} completed exceptionally since the consumer is closing", event); - } else { - log.trace("Event {} not completed exceptionally since it was completed prior to the consumer closing", event); - } - }; - - long trackedExpiredCount = tracked.stream() - .filter(e -> !e.future().isDone()) - .peek(expireEvent) - .count(); + long trackedExpiredCount = completeEventsExceptionallyOnClose(tracked); tracked.clear(); - long eventExpiredCount = events.stream() - .filter(e -> e instanceof CompletableEvent<?>) - .map(e -> (CompletableEvent<?>) e) - .filter(e -> !e.future().isDone()) - .peek(expireEvent) - .count(); + long eventExpiredCount = completeEventsExceptionallyOnClose(events); events.clear(); + return trackedExpiredCount + eventExpiredCount; } @@ -166,9 +161,51 @@ public class CompletableEventReaper { } public List<CompletableEvent<?>> uncompletedEvents() { - return tracked.stream() - .filter(e -> !e.future().isDone()) - .collect(Collectors.toList()); + // The following code does not use the Java Collections Streams API to reduce overhead in the critical + // path of the ConsumerNetworkThread loop. + List<CompletableEvent<?>> events = new ArrayList<>(); + + for (CompletableEvent<?> event : tracked) { + if (!event.future().isDone()) + events.add(event); + } + + return events; + } + + /** + * For all the {@link CompletableEvent}s in the collection, if they're not already complete, invoke + * {@link CompletableFuture#completeExceptionally(Throwable)}. + * + * @param events Collection of objects, assumed to be subclasses of {@link ApplicationEvent} or + * {@link BackgroundEvent}, but will only perform completion for any + * unfinished {@link CompletableEvent}s + * + * @return Number of events closed + */ + private long completeEventsExceptionallyOnClose(Collection<?> events) { + long count = 0; + + for (Object o : events) { + if (!(o instanceof CompletableEvent)) + continue; + + CompletableEvent<?> event = (CompletableEvent<?>) o; + + if (event.future().isDone()) + continue; + + count++; + + TimeoutException error = new TimeoutException(String.format("%s could not be completed before the consumer closed", event.getClass().getSimpleName())); + + if (event.future().completeExceptionally(error)) { + log.debug("Event {} completed exceptionally since the consumer is closing", event); + } else { + log.trace("Event {} not completed exceptionally since it was completed prior to the consumer closing", event); + } + } + + return count; } - }