junrao commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1333393941
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) { } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().equals("")) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... 
candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + private void sendFetches() { + FetchEvent event = new FetchEvent(); + eventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + }); + } + + /** + * @throws KafkaException if the rebalance callback throws exception + */ + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); + + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + log.trace("Polling for fetches with timeout {}", pollTimeout); + + Timer pollTimer = time.timer(pollTimeout); + + // Attempt to fetch any data. It's OK if we time out here; it's a best case effort. The + // data may not be immediately available, but the calling method (poll) will correctly + // handle the overall timeout. + try { + Queue<CompletedFetch> completedFetches = eventHandler.addAndGet(new FetchEvent(), pollTimer); + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + } catch (TimeoutException e) { + log.trace("Timeout during fetch", e); + } finally { + timer.update(pollTimer.currentTimeMs()); + } - clusterResourceListeners.maybeAdd(keyDeserializer); - clusterResourceListeners.maybeAdd(valueDeserializer); - return clusterResourceListeners; + return fetchCollector.collectFetch(fetchBuffer); + } + + /** + * Set the fetch position to the committed position (if there is one) + * or reset it using the offset reset policy the user has configured. + * + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined + * @return true iff the operation completed without timing out + */ + private boolean updateFetchPositions(final Timer timer) { + // If any partitions have been truncated due to a leader change, we need to validate the offsets + eventHandler.add(new ValidatePositionsApplicationEvent()); Review Comment: Just to understand this. If the consumer gets a FENCED_LEADER_EPOCH, a metadata update is triggered. Once the new metadata is received, the ValidatePositionsApplicationEvent will trigger the OffsetForLeaderEpoch request to validate the fetch position. Until the new metadata is received, the consumer will just continuously fetch from the old leader and keep receiving the same FENCED_LEADER_EPOCH error?
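For context on the question above: the guard that normally prevents fetching from a stale leader is that a partition whose position is awaiting validation is excluded from the next fetch until the OffsetsForLeaderEpoch check completes; whether the prototype consumer preserves that guard is what the comment is probing. Below is a minimal, self-contained sketch of that idea with hypothetical names and plain JDK types, not the actual `SubscriptionState` code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified illustration: a partition whose position is awaiting validation is not fetchable,
// so the consumer stops fetching from the (possibly stale) leader until validation completes.
class PositionGuardSketch {

    enum FetchState { FETCHING, AWAIT_VALIDATION }

    static final Map<String, FetchState> STATES = new HashMap<>();

    // Hypothetical hook: a fetch response returned FENCED_LEADER_EPOCH for the partition.
    static void onFencedLeaderEpoch(String partition) {
        STATES.put(partition, FetchState.AWAIT_VALIDATION); // park the partition
        // ...and request a metadata update so the new leader epoch can be discovered
    }

    // Hypothetical hook: OffsetsForLeaderEpoch confirmed the position against the new leader.
    static void onPositionValidated(String partition) {
        STATES.put(partition, FetchState.FETCHING);
    }

    // Only partitions in FETCHING state would be included in the next fetch request.
    static Set<String> fetchablePartitions() {
        return STATES.entrySet().stream()
                .filter(e -> e.getValue() == FetchState.FETCHING)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        STATES.put("topic-0", FetchState.FETCHING);
        onFencedLeaderEpoch("topic-0");
        System.out.println(fetchablePartitions()); // [] -> no further fetch to the old leader
        onPositionValidated("topic-0");
        System.out.println(fetchablePartitions()); // [topic-0]
    }
}
```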
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) { } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().equals("")) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... 
candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + private void sendFetches() { + FetchEvent event = new FetchEvent(); + eventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + }); + } + + /** + * @throws KafkaException if the rebalance callback throws exception + */ + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); + + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + log.trace("Polling for fetches with timeout {}", pollTimeout); + + Timer pollTimer = time.timer(pollTimeout); + + // Attempt to fetch any data. It's OK if we time out here; it's a best case effort. The + // data may not be immediately available, but the calling method (poll) will correctly + // handle the overall timeout. + try { + Queue<CompletedFetch> completedFetches = eventHandler.addAndGet(new FetchEvent(), pollTimer); Review Comment: I guess the purpose of this code is when there is no pending records, we block until some new records are fetched. However, I am wondering if it achieves the purpose. When processing a FetchEvent, ApplicationProcessor just calls requestManagers.fetchRequestManager.drain(). If there is nothing to drain, an empty Queue is returned immediately. This will unblock FetchEvent to return immediately without waiting for the poll time? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -596,12 +786,38 @@ public Set<String> subscription() { @Override public void subscribe(Collection<String> topics) { - throw new KafkaException("method not implemented"); + subscribe(topics, new NoOpConsumerRebalanceListener()); } @Override public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + maybeThrowInvalidGroupIdException(); + if (topics == null) + throw new IllegalArgumentException("Topic collection to subscribe to cannot be null"); + if (topics.isEmpty()) { + // treat subscribing to empty topic list as the same as unsubscribing + this.unsubscribe(); + } else { + for (String topic : topics) { + if (isBlank(topic)) + throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic"); + } + + throwIfNoAssignorsConfigured(); + + // Clear the buffered data which are not a part of newly assigned topics + final Set<TopicPartition> currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { Review Comment: We haven't implemented the group subscription logic in PrototypeAsyncConsumer, right? Ditto for the pattern subscribe below. 
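Returning to the `eventHandler.addAndGet(new FetchEvent(), pollTimer)` question above: if the background thread completes the event with whatever `drain()` returns at that instant, a timed wait on the resulting future returns immediately even when nothing has been fetched yet. A stand-alone illustration of that behaviour using plain JDK types (hypothetical names; the real `EventHandler` plumbing is more involved):

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class FetchEventBlockingSketch {

    public static void main(String[] args) throws Exception {
        // Background thread's view: nothing is buffered yet, so drain() yields an empty queue.
        Queue<String> drained = new LinkedList<>();

        // The event's future is completed right away with that empty result...
        CompletableFuture<Queue<String>> eventFuture = new CompletableFuture<>();
        eventFuture.complete(drained);

        long start = System.nanoTime();
        // ...so the application thread's "wait up to 500 ms" returns almost instantly,
        // instead of blocking until records actually arrive.
        Queue<String> result = eventFuture.get(500, TimeUnit.MILLISECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        System.out.printf("returned %d records after ~%d ms%n", result.size(), elapsedMs);
    }
}
```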
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ########## @@ -249,7 +250,9 @@ private <K, V> Record nextFetchedRecord(FetchConfig<K, V> fetchConfig) { * @param maxRecords The number of records to return; the number returned may be {@code 0 <= maxRecords} * @return {@link ConsumerRecord Consumer records} */ - <K, V> List<ConsumerRecord<K, V>> fetchRecords(FetchConfig<K, V> fetchConfig, int maxRecords) { + <K, V> List<ConsumerRecord<K, V>> fetchRecords(FetchConfig fetchConfig, + Deserializers<K, V> deserializers, Review Comment: Could we update the javadoc? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) { } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().equals("")) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... 
candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + private void sendFetches() { + FetchEvent event = new FetchEvent(); + eventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + }); + } + + /** + * @throws KafkaException if the rebalance callback throws exception Review Comment: Is the rebalance callback called here? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +/** + * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the + * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition + * assignment. 
+ */ +public class FetchRequestManager extends AbstractFetch implements RequestManager { + + private final Logger log; + private final ErrorEventHandler errorEventHandler; + private final NetworkClientDelegate networkClientDelegate; + + FetchRequestManager(final LogContext logContext, + final Time time, + final ErrorEventHandler errorEventHandler, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig fetchConfig, + final FetchMetricsManager metricsManager, + final NetworkClientDelegate networkClientDelegate) { + super(logContext, metadata, subscriptions, fetchConfig, metricsManager, time); + this.log = logContext.logger(FetchRequestManager.class); + this.errorEventHandler = errorEventHandler; + this.networkClientDelegate = networkClientDelegate; + } + + @Override + protected boolean isUnavailable(Node node) { + return networkClientDelegate.isUnavailable(node); + } + + @Override + protected void maybeThrowAuthFailure(Node node) { + networkClientDelegate.maybeThrowAuthFailure(node); + } + + @Override + public PollResult poll(long currentTimeMs) { + List<UnsentRequest> requests; + + if (!idempotentCloser.isClosed()) { + // If the fetcher is open (i.e. not closed), we will issue the normal fetch requests + requests = prepareFetchRequests().entrySet().stream().map(entry -> { + final Node fetchTarget = entry.getKey(); + final FetchSessionHandler.FetchRequestData data = entry.getValue(); + final FetchRequest.Builder request = createFetchRequest(fetchTarget, data); + final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, t) -> { + if (t != null) { + handleFetchResponse(fetchTarget, t); + log.warn("Attempt to fetch data from node {} failed due to fatal exception", fetchTarget, t); + errorEventHandler.handle(t); + } else { + handleFetchResponse(fetchTarget, data, clientResponse); + } + }; + + return new UnsentRequest(request, fetchTarget, responseHandler); + }).collect(Collectors.toList()); + } else { + requests = prepareCloseFetchSessionRequests().entrySet().stream().map(entry -> { + final Node fetchTarget = entry.getKey(); + final FetchSessionHandler.FetchRequestData data = entry.getValue(); + final FetchRequest.Builder request = createFetchRequest(fetchTarget, data); + final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, t) -> { + if (t != null) { + handleCloseFetchSessionResponse(fetchTarget, data, t); + log.warn("Attempt to close fetch session on node {} failed due to fatal exception", fetchTarget, t); + errorEventHandler.handle(t); + } else { + handleCloseFetchSessionResponse(fetchTarget, data); + } + }; + + return new UnsentRequest(request, fetchTarget, responseHandler); Review Comment: Is this request guaranteed to be sent when the consumer is closed? Do we need this guarantee? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.LinkedList; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; + +public class BackgroundEventProcessor { Review Comment: Could we describe what this class does? It's a bit weird the `BackgroundEventProcessor.process` is called from `PrototypeAsyncConsumer` in the foreground. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +/** + * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the + * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition + * assignment. 
+ */ +public class FetchRequestManager extends AbstractFetch implements RequestManager { + + private final Logger log; + private final ErrorEventHandler errorEventHandler; + private final NetworkClientDelegate networkClientDelegate; + + FetchRequestManager(final LogContext logContext, + final Time time, + final ErrorEventHandler errorEventHandler, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig fetchConfig, + final FetchMetricsManager metricsManager, + final NetworkClientDelegate networkClientDelegate) { + super(logContext, metadata, subscriptions, fetchConfig, metricsManager, time); + this.log = logContext.logger(FetchRequestManager.class); + this.errorEventHandler = errorEventHandler; + this.networkClientDelegate = networkClientDelegate; + } + + @Override + protected boolean isUnavailable(Node node) { + return networkClientDelegate.isUnavailable(node); + } + + @Override + protected void maybeThrowAuthFailure(Node node) { + networkClientDelegate.maybeThrowAuthFailure(node); + } + + @Override + public PollResult poll(long currentTimeMs) { + List<UnsentRequest> requests; + + if (!idempotentCloser.isClosed()) { + // If the fetcher is open (i.e. not closed), we will issue the normal fetch requests + requests = prepareFetchRequests().entrySet().stream().map(entry -> { + final Node fetchTarget = entry.getKey(); + final FetchSessionHandler.FetchRequestData data = entry.getValue(); + final FetchRequest.Builder request = createFetchRequest(fetchTarget, data); + final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, t) -> { + if (t != null) { + handleFetchResponse(fetchTarget, t); + log.warn("Attempt to fetch data from node {} failed due to fatal exception", fetchTarget, t); + errorEventHandler.handle(t); Review Comment: This causes an exception to be thrown in the application thread. It seems that we should avoid doing that for at least the retriable exceptions? Ditto below. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) { } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().equals("")) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? 
+ * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + private void sendFetches() { + FetchEvent event = new FetchEvent(); + eventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); Review Comment: This logic is a bit weird. ApplicationEventProcessor handles FetchEvent by draining all completeFetches from fetchBuffer, but here we are just adding them back to fetchBuffer again. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) { } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().equals("")) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? 
+ * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + private void sendFetches() { + FetchEvent event = new FetchEvent(); + eventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + }); + } + + /** + * @throws KafkaException if the rebalance callback throws exception + */ + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); + + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + log.trace("Polling for fetches with timeout {}", pollTimeout); + + Timer pollTimer = time.timer(pollTimeout); + + // Attempt to fetch any data. It's OK if we time out here; it's a best case effort. The + // data may not be immediately available, but the calling method (poll) will correctly + // handle the overall timeout. 
+ try { + Queue<CompletedFetch> completedFetches = eventHandler.addAndGet(new FetchEvent(), pollTimer); + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + } catch (TimeoutException e) { + log.trace("Timeout during fetch", e); + } finally { + timer.update(pollTimer.currentTimeMs()); + } - clusterResourceListeners.maybeAdd(keyDeserializer); - clusterResourceListeners.maybeAdd(valueDeserializer); - return clusterResourceListeners; + return fetchCollector.collectFetch(fetchBuffer); + } + + /** + * Set the fetch position to the committed position (if there is one) + * or reset it using the offset reset policy the user has configured. + * + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined + * @return true iff the operation completed without timing out + */ + private boolean updateFetchPositions(final Timer timer) { + // If any partitions have been truncated due to a leader change, we need to validate the offsets + eventHandler.add(new ValidatePositionsApplicationEvent()); + + cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); + if (cachedSubscriptionHasAllFetchPositions) return true; + + // If there are any partitions which do not have a valid position and are not + // awaiting reset, then we need to fetch committed offsets. We will only do a + // coordinator lookup if there are partitions which have missing positions, so + // a consumer with manually assigned partitions can avoid a coordinator dependence + // by always ensuring that assigned partitions have an initial position. + if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer)) + return false; + + // If there are partitions still needing a position and a reset policy is defined, + // request reset using the default policy. If no reset strategy is defined and there + // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. + subscriptions.resetInitializingPositions(); + + // Finally send an asynchronous request to look up and update the positions of any + // partitions which are awaiting reset. + eventHandler.add(new ResetPositionsApplicationEvent()); Review Comment: Should we wait for the time here like what we did in `refreshCommittedOffsetsIfNeeded`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -192,71 +321,38 @@ public PrototypeAsyncConsumer(final ConsumerConfig config, @Override public ConsumerRecords<K, V> poll(final Duration timeout) { Timer timer = time.timer(timeout); + try { - do { - if (!eventHandler.isEmpty()) { - final Optional<BackgroundEvent> backgroundEvent = eventHandler.poll(); - // processEvent() may process 3 types of event: - // 1. Errors - // 2. Callback Invocation - // 3. Fetch responses - // Errors will be handled or rethrown. - // Callback invocation will trigger callback function execution, which is blocking until completion. - // Successful fetch responses will be added to the completedFetches in the fetcher, which will then - // be processed in the collectFetches(). 
- backgroundEvent.ifPresent(event -> processEvent(event, timeout)); - } + backgroundEventProcessor.process(); - updateFetchPositionsIfNeeded(timer); + this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs()); - // The idea here is to have the background thread sending fetches autonomously, and the fetcher - // uses the poll loop to retrieve successful fetchResponse and process them on the polling thread. - final Fetch<K, V> fetch = collectFetches(); - if (!fetch.isEmpty()) { - return processFetchResults(fetch); - } - // We will wait for retryBackoffMs - } while (time.timer(timeout).notExpired()); - } catch (final Exception e) { - throw new RuntimeException(e); - } - // TODO: Once we implement poll(), clear wakeupTrigger in a finally block: wakeupTrigger.clearActiveTask(); + if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { + throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); + } - return ConsumerRecords.empty(); - } + do { + updateAssignmentMetadataIfNeeded(timer); + final Fetch<K, V> fetch = pollForFetches(timer); - /** - * Set the fetch position to the committed position (if there is one) or reset it using the - * offset reset policy the user has configured (if partitions require reset) - * - * @return true if the operation completed without timing out - * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details - * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is - * defined - */ - private boolean updateFetchPositionsIfNeeded(final Timer timer) { - // If any partitions have been truncated due to a leader change, we need to validate the offsets - ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent(); - eventHandler.add(validatePositionsEvent); + if (!fetch.isEmpty()) { + sendFetches(); - // If there are any partitions which do not have a valid position and are not - // awaiting reset, then we need to fetch committed offsets. We will only do a - // coordinator lookup if there are partitions which have missing positions, so - // a consumer with manually assigned partitions can avoid a coordinator dependence - // by always ensuring that assigned partitions have an initial position. - if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer)) - return false; + if (fetch.records().isEmpty()) { + log.trace("Returning empty records from `poll()` " + + "since the consumer's position has advanced for at least one topic partition"); + } - // If there are partitions still needing a position and a reset policy is defined, - // request reset using the default policy. If no reset strategy is defined and there - // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. - subscriptions.resetInitializingPositions(); + return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records())); + } + // We will wait for retryBackoffMs + } while (timer.notExpired()); - // Finally send an asynchronous request to look up and update the positions of any - // partitions which are awaiting reset. 
- ResetPositionsApplicationEvent resetPositionsEvent = new ResetPositionsApplicationEvent(); - eventHandler.add(resetPositionsEvent); - return true; + return ConsumerRecords.empty(); + } finally { + this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs()); + } + // TODO: Once we implement poll(), clear wakeupTrigger in a finally block: wakeupTrigger.clearActiveTask(); Review Comment: Is that TODO still needed since we implemented `poll` now? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -526,13 +677,57 @@ public void close() { close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); } + private Timer createTimerForRequest(final Duration timeout) { + // this.time could be null if an exception occurs in constructor prior to setting the this.time field + final Time localTime = (time == null) ? Time.SYSTEM : time; + return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs)); + } + @Override public void close(Duration timeout) { + if (timeout.toMillis() < 0) + throw new IllegalArgumentException("The timeout cannot be negative."); + + try { + if (!closed) { + // need to close before setting the flag since the close function + // itself may trigger rebalance callback that needs the consumer to be open still + close(timeout, false); + } + } finally { + closed = true; + } + } + + private void close(Duration timeout, boolean swallowException) { + log.trace("Closing the Kafka consumer"); AtomicReference<Throwable> firstException = new AtomicReference<>(); + + final Timer closeTimer = createTimerForRequest(timeout); + if (fetchBuffer != null) { + // the timeout for the session close is at-most the requestTimeoutMs + long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs()); + if (remainingDurationInTimeout > 0) { + remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout); Review Comment: Do we need this since closeTimer is already bounded by requestTimeoutMs? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -192,71 +321,38 @@ public PrototypeAsyncConsumer(final ConsumerConfig config, @Override public ConsumerRecords<K, V> poll(final Duration timeout) { Timer timer = time.timer(timeout); + try { - do { - if (!eventHandler.isEmpty()) { - final Optional<BackgroundEvent> backgroundEvent = eventHandler.poll(); - // processEvent() may process 3 types of event: - // 1. Errors - // 2. Callback Invocation - // 3. Fetch responses - // Errors will be handled or rethrown. - // Callback invocation will trigger callback function execution, which is blocking until completion. - // Successful fetch responses will be added to the completedFetches in the fetcher, which will then - // be processed in the collectFetches(). - backgroundEvent.ifPresent(event -> processEvent(event, timeout)); - } + backgroundEventProcessor.process(); - updateFetchPositionsIfNeeded(timer); + this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs()); - // The idea here is to have the background thread sending fetches autonomously, and the fetcher - // uses the poll loop to retrieve successful fetchResponse and process them on the polling thread. 
- final Fetch<K, V> fetch = collectFetches(); - if (!fetch.isEmpty()) { - return processFetchResults(fetch); - } - // We will wait for retryBackoffMs - } while (time.timer(timeout).notExpired()); - } catch (final Exception e) { - throw new RuntimeException(e); - } - // TODO: Once we implement poll(), clear wakeupTrigger in a finally block: wakeupTrigger.clearActiveTask(); + if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { + throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); + } - return ConsumerRecords.empty(); - } + do { + updateAssignmentMetadataIfNeeded(timer); + final Fetch<K, V> fetch = pollForFetches(timer); - /** - * Set the fetch position to the committed position (if there is one) or reset it using the - * offset reset policy the user has configured (if partitions require reset) - * - * @return true if the operation completed without timing out - * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details - * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is - * defined - */ - private boolean updateFetchPositionsIfNeeded(final Timer timer) { - // If any partitions have been truncated due to a leader change, we need to validate the offsets - ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent(); - eventHandler.add(validatePositionsEvent); + if (!fetch.isEmpty()) { + sendFetches(); - // If there are any partitions which do not have a valid position and are not - // awaiting reset, then we need to fetch committed offsets. We will only do a - // coordinator lookup if there are partitions which have missing positions, so - // a consumer with manually assigned partitions can avoid a coordinator dependence - // by always ensuring that assigned partitions have an initial position. - if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer)) - return false; + if (fetch.records().isEmpty()) { + log.trace("Returning empty records from `poll()` " + + "since the consumer's position has advanced for at least one topic partition"); + } - // If there are partitions still needing a position and a reset policy is defined, - // request reset using the default policy. If no reset strategy is defined and there - // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. - subscriptions.resetInitializingPositions(); + return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records())); + } + // We will wait for retryBackoffMs + } while (timer.notExpired()); - // Finally send an asynchronous request to look up and update the positions of any - // partitions which are awaiting reset. - ResetPositionsApplicationEvent resetPositionsEvent = new ResetPositionsApplicationEvent(); - eventHandler.add(resetPositionsEvent); - return true; + return ConsumerRecords.empty(); + } finally { + this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs()); Review Comment: It seems that timer.currentTimeMs() doesn't change btw poll start and poll end because of the way that Timer is constructed? 
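On the `timer.currentTimeMs()` point above: `org.apache.kafka.common.utils.Timer` caches the clock value when it is created and only advances it on `update()`, so reading it at poll start and poll end without an intervening `update()` yields the same timestamp. A quick stand-alone illustration (a sketch assuming `kafka-clients` is on the classpath):

```java
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class TimerSnapshotSketch {

    public static void main(String[] args) throws InterruptedException {
        Timer timer = Time.SYSTEM.timer(1_000L);

        long startMs = timer.currentTimeMs();   // snapshot taken when the timer was created
        Thread.sleep(200);

        // Without an update(), the cached value is unchanged, so "poll end" would be
        // recorded with the same timestamp as "poll start".
        System.out.println(timer.currentTimeMs() - startMs); // 0

        timer.update();                          // refresh the cached time from the clock
        System.out.println(timer.currentTimeMs() - startMs); // ~200
    }
}
```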
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -526,13 +677,57 @@ public void close() { close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); } + private Timer createTimerForRequest(final Duration timeout) { + // this.time could be null if an exception occurs in constructor prior to setting the this.time field Review Comment: This is an existing issue, but I don't quite understand this comment. In other places, we just use `time` directly assuming it's never null. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -596,12 +786,38 @@ public Set<String> subscription() { @Override public void subscribe(Collection<String> topics) { - throw new KafkaException("method not implemented"); + subscribe(topics, new NoOpConsumerRebalanceListener()); } @Override public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + maybeThrowInvalidGroupIdException(); + if (topics == null) + throw new IllegalArgumentException("Topic collection to subscribe to cannot be null"); + if (topics.isEmpty()) { + // treat subscribing to empty topic list as the same as unsubscribing + this.unsubscribe(); Review Comment: Could we remove `this` for better consistency? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -622,12 +837,18 @@ public void assign(Collection<TopicPartition> partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } - // TODO: implementation of refactored Fetcher will be included in forthcoming commits. - // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + // Clear the buffered data which are not a part of newly assigned topics + final Set<TopicPartition> currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + if (partitions.contains(tp)) + currentTopicPartitions.add(tp); + } + + fetchBuffer.retainAll(currentTopicPartitions); - // assignment change event will trigger autocommit if it is configured and the group id is specified. This is - // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will - // be no following rebalance + // make sure the offsets of topic partitions the consumer is unsubscribing from + // are committed since there will be no following rebalance Review Comment: The comment is a bit confusing. The code doesn't seem to do anything related to offsets and rebalance. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -526,13 +677,57 @@ public void close() { close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); } + private Timer createTimerForRequest(final Duration timeout) { + // this.time could be null if an exception occurs in constructor prior to setting the this.time field + final Time localTime = (time == null) ? 
Time.SYSTEM : time; + return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs)); + } + @Override public void close(Duration timeout) { + if (timeout.toMillis() < 0) + throw new IllegalArgumentException("The timeout cannot be negative."); + + try { + if (!closed) { + // need to close before setting the flag since the close function + // itself may trigger rebalance callback that needs the consumer to be open still + close(timeout, false); + } + } finally { + closed = true; + } + } + + private void close(Duration timeout, boolean swallowException) { + log.trace("Closing the Kafka consumer"); AtomicReference<Throwable> firstException = new AtomicReference<>(); + + final Timer closeTimer = createTimerForRequest(timeout); + if (fetchBuffer != null) { + // the timeout for the session close is at-most the requestTimeoutMs + long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs()); + if (remainingDurationInTimeout > 0) { + remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout); + } + + closeTimer.reset(remainingDurationInTimeout); + + // This is a blocking call bound by the time remaining in closeTimer Review Comment: We are not really passing in closeTimer below. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ########## @@ -135,7 +135,8 @@ void recordAggregatedMetrics(int bytes, int records) { /** * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources * are closed. This is somewhat analogous to {@link Closeable#close() closing}, though no error will result if a - * caller invokes {@link #fetchRecords(FetchConfig, int)}; an empty {@link List list} will be returned instead. + * caller invokes {@link #fetchRecords(FetchConfig, Deserializers, int)}; an empty {@link List list} will be Review Comment: This is an existing issue, but I am not sure why the comment mentions `fetchRecords`. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) { } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().equals("")) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? 
+ * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + private void sendFetches() { + FetchEvent event = new FetchEvent(); + eventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + }); + } + + /** + * @throws KafkaException if the rebalance callback throws exception + */ + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); + + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { Review Comment: `fetchPosition` is updated in the background thread, right? So, it could change anytime during the `poll` call in the consumer. Do we need the info on `fetchPosition` to be accurately reflected here? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -684,7 +1011,7 @@ private boolean isCommittedOffsetsManagementEnabled() { } /** - * Refresh the committed offsets for partitions that require initialization. + * Refresh the committed offsets for provided partitions. Review Comment: No partitions are provided to this method. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java: ########## @@ -18,11 +18,18 @@ import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import java.io.Closeable; + /** * {@code PollResult} consist of {@code UnsentRequest} if there are requests to send; otherwise, return the time till * the next poll event. 
*/ -public interface RequestManager { +public interface RequestManager extends Closeable { Review Comment: It seems that none of the request managers implements `close`. Does this need to be `Closeable`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java: ########## @@ -54,11 +49,8 @@ * * Note: the {@link Deserializer deserializers} used for the key and value are not closed by this class. They should be * closed by the creator of the {@link FetchConfig}. - * Review Comment: There is still a mention of `deserializers` above. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java: ########## @@ -16,41 +16,149 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.internals.IdempotentCloser; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.function.Supplier; import static java.util.Objects.requireNonNull; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchConfig; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel; /** * {@code RequestManagers} provides a means to pass around the set of {@link RequestManager} instances in the system. * This allows callers to both use the specific {@link RequestManager} instance, or to iterate over the list via * the {@link #entries()} method. */ -public class RequestManagers { +public class RequestManagers implements Closeable { + private final Logger log; public final Optional<CoordinatorRequestManager> coordinatorRequestManager; public final Optional<CommitRequestManager> commitRequestManager; public final OffsetsRequestManager offsetsRequestManager; + public final FetchRequestManager fetchRequestManager; + private final List<Optional<? extends RequestManager>> entries; + private final IdempotentCloser closer = new IdempotentCloser(); - public RequestManagers(OffsetsRequestManager offsetsRequestManager, + public RequestManagers(LogContext logContext, + OffsetsRequestManager offsetsRequestManager, + FetchRequestManager fetchRequestManager, Optional<CoordinatorRequestManager> coordinatorRequestManager, Optional<CommitRequestManager> commitRequestManager) { - this.offsetsRequestManager = requireNonNull(offsetsRequestManager, - "OffsetsRequestManager cannot be null"); + this.log = logContext.logger(RequestManagers.class); + this.offsetsRequestManager = requireNonNull(offsetsRequestManager, "OffsetsRequestManager cannot be null"); this.coordinatorRequestManager = coordinatorRequestManager; this.commitRequestManager = commitRequestManager; + this.fetchRequestManager = fetchRequestManager; List<Optional<? extends RequestManager>> list = new ArrayList<>(); list.add(coordinatorRequestManager); list.add(commitRequestManager); list.add(Optional.of(offsetsRequestManager)); + list.add(Optional.of(fetchRequestManager)); entries = Collections.unmodifiableList(list); } public List<Optional<? 
extends RequestManager>> entries() { return entries; } + + @Override + public void close() { + closer.close( + () -> { + log.debug("Closing RequestManagers"); + + entries.forEach(rm -> { + rm.ifPresent(requestManager -> { + try { + requestManager.close(); + } catch (Throwable t) { + log.debug("Error closing request manager {}", requestManager.getClass().getSimpleName(), t); + } + }); + }); + log.debug("RequestManagers has been closed"); + }, + () -> log.debug("RequestManagers was already closed")); + + } + + /** + * Creates a {@link Supplier} for deferred creation during invocation by + * {@link org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread}. + */ + public static <K, V> Supplier<RequestManagers> supplier(final Time time, Review Comment: `RequestManagers` doesn't take <K, V>. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ########## @@ -79,6 +100,7 @@ public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> public synchronized int sendFetches() { Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); + Review Comment: extra new line