junrao commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1350697908
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -621,56 +825,163 @@ public void assign(Collection<TopicPartition> partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } - // TODO: implementation of refactored Fetcher will be included in forthcoming commits. - // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + // Clear the buffered data which are not a part of newly assigned topics + final Set<TopicPartition> currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + if (partitions.contains(tp)) + currentTopicPartitions.add(tp); + } + + fetchBuffer.retainAll(currentTopicPartitions); // assignment change event will trigger autocommit if it is configured and the group id is specified. This is // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will - // be no following rebalance - eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); + // be no following rebalance. + // + // See the ApplicationEventProcessor.process() method that handles this event for more detail. 
+ applicationEventHandler.add(new AssignmentChangeApplicationEvent(subscriptions.allConsumed(), time.milliseconds())); log.info("Assigned to partition(s): {}", join(partitions, ", ")); - if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) - eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); + if (subscriptions.assignFromUser(new HashSet<>(partitions))) + applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().isEmpty()) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + subscriptions.subscribe(pattern, listener); + updatePatternSubscription(metadata.fetch()); + metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? 
+ * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + /** + * Send the requests for fetch data to the {@link ConsumerNetworkThread network thread} and set up to + * collect the results in {@link #fetchBuffer}. + */ + private void sendFetches() { + FetchEvent event = new FetchEvent(); + applicationEventHandler.add(event); Review Comment: The only useful part of this call is to wake up the ConsumerNetworkThread so that it could prefetch the next data chunk. Perhaps we could make that an explicit call like `applicationEventHandler.wakeup`? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -17,146 +17,195 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; -public class ApplicationEventProcessor { - - private final BlockingQueue<BackgroundEvent> backgroundEventQueue; +/** + * An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread} + * which processes {@link ApplicationEvent application events} generated by the application thread. 
+ */ +public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> { private final ConsumerMetadata metadata; - private final RequestManagers requestManagers; - public ApplicationEventProcessor(final BlockingQueue<BackgroundEvent> backgroundEventQueue, + public ApplicationEventProcessor(final LogContext logContext, + final BlockingQueue<ApplicationEvent> applicationEventQueue, final RequestManagers requestManagers, final ConsumerMetadata metadata) { - this.backgroundEventQueue = backgroundEventQueue; + super(logContext, applicationEventQueue); this.requestManagers = requestManagers; this.metadata = metadata; } - public boolean process(final ApplicationEvent event) { - Objects.requireNonNull(event); + /** + * Process the events—if any—that were produced by the application thread. It is possible that when processing + * an event generates an error. In such cases, the processor will immediately throw an exception, and not + * process the remaining events. + */ + @Override + public void process() { + process(error -> { + throw error; + }); + } + + @Override + public void process(ApplicationEvent event) { switch (event.type()) { - case NOOP: - return process((NoopApplicationEvent) event); case COMMIT: - return process((CommitApplicationEvent) event); + process((CommitApplicationEvent) event); + return; + case POLL: - return process((PollApplicationEvent) event); + process((PollApplicationEvent) event); + return; + case FETCH_COMMITTED_OFFSET: - return process((OffsetFetchApplicationEvent) event); + process((OffsetFetchApplicationEvent) event); + return; + case METADATA_UPDATE: - return process((NewTopicsMetadataUpdateRequestEvent) event); + process((NewTopicsMetadataUpdateRequestEvent) event); + return; + case ASSIGNMENT_CHANGE: - return process((AssignmentChangeApplicationEvent) event); + process((AssignmentChangeApplicationEvent) event); + return; + case TOPIC_METADATA: - return process((TopicMetadataApplicationEvent) event); + 
process((TopicMetadataApplicationEvent) event); + return; + case LIST_OFFSETS: - return process((ListOffsetsApplicationEvent) event); + process((ListOffsetsApplicationEvent) event); + return; + + case FETCH: + process((FetchEvent) event); + return; + case RESET_POSITIONS: - return processResetPositionsEvent(); + processResetPositionsEvent(); + return; + case VALIDATE_POSITIONS: - return processValidatePositionsEvent(); + processValidatePositionsEvent(); + return; + + default: + throw new IllegalArgumentException("Application event type " + event.type() + " was not expected"); } - return false; } - /** - * Processes {@link NoopApplicationEvent} and enqueue a - * {@link NoopBackgroundEvent}. This is intentionally left here for - * demonstration purpose. - * - * @param event a {@link NoopApplicationEvent} - */ - private boolean process(final NoopApplicationEvent event) { - return backgroundEventQueue.add(new NoopBackgroundEvent(event.message())); + @Override + protected Class<ApplicationEvent> getEventClass() { + return ApplicationEvent.class; } - private boolean process(final PollApplicationEvent event) { + private void process(final PollApplicationEvent event) { if (!requestManagers.commitRequestManager.isPresent()) { - return true; + return; } CommitRequestManager manager = requestManagers.commitRequestManager.get(); manager.updateAutoCommitTimer(event.pollTimeMs()); - return true; } - private boolean process(final CommitApplicationEvent event) { + private void process(final CommitApplicationEvent event) { if (!requestManagers.commitRequestManager.isPresent()) { // Leaving this error handling here, but it is a bit strange as the commit API should enforce the group.id - // upfront so we should never get to this block. + // upfront, so we should never get to this block. Exception exception = new KafkaException("Unable to commit offset. 
Most likely because the group.id wasn't set"); event.future().completeExceptionally(exception); - return false; + return; } CommitRequestManager manager = requestManagers.commitRequestManager.get(); event.chain(manager.addOffsetCommitRequest(event.offsets())); - return true; } - private boolean process(final OffsetFetchApplicationEvent event) { + private void process(final OffsetFetchApplicationEvent event) { if (!requestManagers.commitRequestManager.isPresent()) { event.future().completeExceptionally(new KafkaException("Unable to fetch committed " + "offset because the CommittedRequestManager is not available. Check if group.id was set correctly")); - return false; + return; } CommitRequestManager manager = requestManagers.commitRequestManager.get(); event.chain(manager.addOffsetFetchRequest(event.partitions())); - return true; } - private boolean process(final NewTopicsMetadataUpdateRequestEvent event) { + private void process(final NewTopicsMetadataUpdateRequestEvent ignored) { metadata.requestUpdateForNewTopics(); - return true; } - private boolean process(final AssignmentChangeApplicationEvent event) { + private void process(final AssignmentChangeApplicationEvent event) { if (!requestManagers.commitRequestManager.isPresent()) { - return false; + return; } CommitRequestManager manager = requestManagers.commitRequestManager.get(); manager.updateAutoCommitTimer(event.currentTimeMs()); manager.maybeAutoCommit(event.offsets()); - return true; } - private boolean process(final ListOffsetsApplicationEvent event) { + private void process(final ListOffsetsApplicationEvent event) { final CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> future = requestManagers.offsetsRequestManager.fetchOffsets(event.timestampsToSearch(), event.requireTimestamps()); event.chain(future); - return true; } - private boolean processResetPositionsEvent() { + private void processResetPositionsEvent() { requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); - return true; } 
- private boolean processValidatePositionsEvent() { + private void processValidatePositionsEvent() { requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); - return true; } - private boolean process(final TopicMetadataApplicationEvent event) { + private void process(final TopicMetadataApplicationEvent event) { final CompletableFuture<Map<String, List<PartitionInfo>>> future = - this.requestManagers.topicMetadataRequestManager.requestTopicMetadata(Optional.of(event.topic())); + this.requestManagers.topicMetadataRequestManager.requestTopicMetadata(Optional.of(event.topic())); event.chain(future); - return true; + } + + private void process(final FetchEvent event) { Review Comment: Should we just remove this method? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java: ########## @@ -162,7 +165,9 @@ private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch) { throw new IllegalStateException("Missing position for fetchable partition " + tp); if (nextInLineFetch.nextFetchOffset() == position.offset) { - List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig, fetchConfig.maxPollRecords); + List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig, Review Comment: 1. This is an existing issue. But the way we handle paused partitions in `collectFetch` seems problematic. The application thread first calls `fetchBuffer.setNextInLineFetch(null)` and then calls `fetchBuffer.addAll(pausedCompletedFetches)`. This could leave a brief window where the paused partition is not included in either `nextInLineFetch` or `completedFetches`. If the background thread kicks in in that window, it could have fetched another chunk for that partition and added the response back to FetchBuffer. 
This would violate the assumption that there is no more than one pending `CompletedFetch` per partition in FetchBuffer and could cause records to be returned out of offset order or duplicates to be returned. 2. The second existing issue is on the `fetchBuffer.setNextInLineFetch` call in `collectFetch`. The issue is that, after all records are drained from `nextInLineFetch`, we only call `setNextInLineFetch` when there is a new `completedFetch`. However, until the drained `completedFetch` is removed from `nextInLineFetch`, the background thread can't fetch the next chunk. So, it seems that we will just be stuck here. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ########## @@ -66,31 +82,107 @@ boolean isEmpty() { * @return {@code true} if there are completed fetches that match the {@link Predicate}, {@code false} otherwise */ boolean hasCompletedFetches(Predicate<CompletedFetch> predicate) { - return completedFetches.stream().anyMatch(predicate); + try { + lock.lock(); + return completedFetches.stream().anyMatch(predicate); + } finally { + lock.unlock(); + } } void add(CompletedFetch completedFetch) { - completedFetches.add(completedFetch); + try { + lock.lock(); + completedFetches.add(completedFetch); + notEmptyCondition.signalAll(); + } finally { + lock.unlock(); + } } void addAll(Collection<CompletedFetch> completedFetches) { - this.completedFetches.addAll(completedFetches); + if (completedFetches == null || completedFetches.isEmpty()) + return; + + try { + lock.lock(); + this.completedFetches.addAll(completedFetches); + notEmptyCondition.signalAll(); + } finally { + lock.unlock(); + } } CompletedFetch nextInLineFetch() { - return nextInLineFetch; + try { + lock.lock(); + return nextInLineFetch; + } finally { + lock.unlock(); + } } - void setNextInLineFetch(CompletedFetch completedFetch) { - this.nextInLineFetch = completedFetch; + void setNextInLineFetch(CompletedFetch nextInLineFetch) { + try { + lock.lock(); +
this.nextInLineFetch = nextInLineFetch; + } finally { + lock.unlock(); + } } CompletedFetch peek() { - return completedFetches.peek(); + try { + lock.lock(); + return completedFetches.peek(); + } finally { + lock.unlock(); + } } CompletedFetch poll() { - return completedFetches.poll(); + try { + lock.lock(); + return completedFetches.poll(); + } finally { + lock.unlock(); + } + } + + /** + * Allows the caller to await presence of data in the buffer. The method will block, returning only + * under one of the following conditions: + * + * <ol> + * <li>The buffer was already non-empty on entry</li> + * <li>The buffer was populated during the wait</li> + * <li>The remaining time on the {@link Timer timer} elapsed</li> + * <li>The thread was interrupted</li> + * </ol> + * + * @param timer Timer that provides time to wait + */ + void awaitNotEmpty(Timer timer) { + try { + lock.lock(); + + while (isEmpty()) { + // Update the timer before we head into the loop in case it took a while to get the lock. + timer.update(); + + if (timer.isExpired()) + break; + + if (notEmptyCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) { Review Comment: `notEmptyCondition.await` returns false if the waiting time detectably elapsed before return from the method. That seems to be the case in which we should break out of the while loop. So, it seems that the `if` test should be reversed. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -621,56 +825,163 @@ public void assign(Collection<TopicPartition> partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } - // TODO: implementation of refactored Fetcher will be included in forthcoming commits. 
- // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + // Clear the buffered data which are not a part of newly assigned topics + final Set<TopicPartition> currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + if (partitions.contains(tp)) + currentTopicPartitions.add(tp); + } + + fetchBuffer.retainAll(currentTopicPartitions); // assignment change event will trigger autocommit if it is configured and the group id is specified. This is // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will - // be no following rebalance - eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); + // be no following rebalance. + // + // See the ApplicationEventProcessor.process() method that handles this event for more detail. + applicationEventHandler.add(new AssignmentChangeApplicationEvent(subscriptions.allConsumed(), time.milliseconds())); log.info("Assigned to partition(s): {}", join(partitions, ", ")); - if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) - eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); + if (subscriptions.assignFromUser(new HashSet<>(partitions))) + applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().isEmpty()) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? 
+ "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + subscriptions.subscribe(pattern, listener); + updatePatternSubscription(metadata.fetch()); + metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... 
candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + /** + * Send the requests for fetch data to the {@link ConsumerNetworkThread network thread} and set up to + * collect the results in {@link #fetchBuffer}. + */ + private void sendFetches() { + FetchEvent event = new FetchEvent(); + applicationEventHandler.add(event); + } + + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); - clusterResourceListeners.maybeAdd(keyDeserializer); - clusterResourceListeners.maybeAdd(valueDeserializer); - return clusterResourceListeners; + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + log.trace("Polling for fetches with timeout {}", pollTimeout); + + Timer pollTimer = time.timer(pollTimeout); + + // Attempt to fetch any data. It's OK if we don't have any waiting data here; it's a 'best case' effort. The Review Comment: The comment seems obsolete since we are not fetching data below. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ########## @@ -35,19 +41,24 @@ * * <p/> * - * <em>Note</em>: this class is not thread-safe and is intended to only be used from a single thread. 
+ * <em>Note</em>: this class is thread-safe with the intention that {@link CompletedFetch the data} will be Review Comment: Currently, `fetchBuffer.setNextInLineFetch` and `fetchBuffer.poll` are separate operations and we expect the caller to call them in the right order to avoid a partition missing in FetchBuffer in the transition phase. It still leaves us with the situation that a partition could be in both completedFetches and nextInLineFetch at a particular time. It's not a problem for now, but it may be in the future. Could we make them an atomic operation? If not, could we add a comment to document the correct usage of the api and the impact on partition being duplicated in completedFetches and nextInLineFetch? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -621,56 +825,163 @@ public void assign(Collection<TopicPartition> partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } - // TODO: implementation of refactored Fetcher will be included in forthcoming commits. - // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + // Clear the buffered data which are not a part of newly assigned topics + final Set<TopicPartition> currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + if (partitions.contains(tp)) + currentTopicPartitions.add(tp); + } + + fetchBuffer.retainAll(currentTopicPartitions); // assignment change event will trigger autocommit if it is configured and the group id is specified. This is // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will - // be no following rebalance - eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); + // be no following rebalance. 
+ // + // See the ApplicationEventProcessor.process() method that handles this event for more detail. + applicationEventHandler.add(new AssignmentChangeApplicationEvent(subscriptions.allConsumed(), time.milliseconds())); log.info("Assigned to partition(s): {}", join(partitions, ", ")); - if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) - eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); + if (subscriptions.assignFromUser(new HashSet<>(partitions))) + applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().isEmpty()) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + subscriptions.subscribe(pattern, listener); + updatePatternSubscription(metadata.fetch()); + metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? 
+ * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + /** + * Send the requests for fetch data to the {@link ConsumerNetworkThread network thread} and set up to + * collect the results in {@link #fetchBuffer}. 
+ */ + private void sendFetches() { + FetchEvent event = new FetchEvent(); + applicationEventHandler.add(event); + } + + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); - clusterResourceListeners.maybeAdd(keyDeserializer); - clusterResourceListeners.maybeAdd(valueDeserializer); - return clusterResourceListeners; + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); Review Comment: This call seems unnecessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org