junrao commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1333393941


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> 
partitions) {
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) 
{
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners 
configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        });
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some 
positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we 
MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > 
retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+        Timer pollTimer = time.timer(pollTimeout);
+
+        // Attempt to fetch any data. It's OK if we time out here; it's a best 
case effort. The
+        // data may not be immediately available, but the calling method 
(poll) will correctly
+        // handle the overall timeout.
+        try {
+            Queue<CompletedFetch> completedFetches = 
eventHandler.addAndGet(new FetchEvent(), pollTimer);
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        } catch (TimeoutException e) {
+            log.trace("Timeout during fetch", e);
+        } finally {
+            timer.update(pollTimer.currentTimeMs());
+        }
 
-        clusterResourceListeners.maybeAdd(keyDeserializer);
-        clusterResourceListeners.maybeAdd(valueDeserializer);
-        return clusterResourceListeners;
+        return fetchCollector.collectFetch(fetchBuffer);
+    }
+
+    /**
+     * Set the fetch position to the committed position (if there is one)
+     * or reset it using the offset reset policy the user has configured.
+     *
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
+     *             defined
+     * @return true iff the operation completed without timing out
+     */
+    private boolean updateFetchPositions(final Timer timer) {
+        // If any partitions have been truncated due to a leader change, we 
need to validate the offsets
+        eventHandler.add(new ValidatePositionsApplicationEvent());

Review Comment:
   Just to understand this. If the consumer gets a FENCED_LEADER_EPOCH, a metadata
update is triggered. Once the new metadata is received, the
ValidatePositionsApplicationEvent will trigger the OffsetForLeaderEpoch request
to validate the fetch position. Until the new metadata is received, will the
consumer just keep fetching from the old leader and receiving the same
FENCED_LEADER_EPOCH error?
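
   For reference, a rough sketch of the sequence being asked about (illustrative
only; the method and field names below are hypothetical, not the PR's code):

```java
// Hypothetical per-partition error handling, only to restate the question above.
void onFetchPartitionError(TopicPartition tp, Errors error) {
    if (error == Errors.FENCED_LEADER_EPOCH) {
        // 1. A metadata refresh is requested so the consumer can learn the new leader epoch.
        metadata.requestUpdate(); // hypothetical call
        // 2. Until the refreshed metadata arrives, the partition is still fetchable,
        //    so fetches keep going to the stale leader and keep failing the same way.
        // 3. Once new metadata is received, ValidatePositionsApplicationEvent drives
        //    the OffsetForLeaderEpoch request that validates the fetch position.
    }
}
```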



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> 
partitions) {
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) 
{
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners 
configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        });
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some 
positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we 
MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > 
retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+        Timer pollTimer = time.timer(pollTimeout);
+
+        // Attempt to fetch any data. It's OK if we time out here; it's a best 
case effort. The
+        // data may not be immediately available, but the calling method 
(poll) will correctly
+        // handle the overall timeout.
+        try {
+            Queue<CompletedFetch> completedFetches = 
eventHandler.addAndGet(new FetchEvent(), pollTimer);

Review Comment:
   I guess the purpose of this code is that when there are no pending records, we
block until some new records are fetched. However, I am wondering if it
achieves that purpose. When processing a FetchEvent, ApplicationProcessor just
calls requestManagers.fetchRequestManager.drain(). If there is nothing to
drain, an empty Queue is returned immediately. Won't that complete the
FetchEvent and return immediately, without waiting for the poll time?
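
   A tiny, self-contained illustration of that concern (standard library only,
not the PR's code): once the processor completes the event's future with an
empty queue, a caller waiting on that future returns immediately instead of
blocking for the poll timeout.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class EmptyDrainDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for FetchEvent's future.
        CompletableFuture<Queue<String>> future = new CompletableFuture<>();

        // The processor finds nothing to drain and completes with an empty queue.
        future.complete(new ArrayDeque<>());

        // The caller meant to block for up to the poll timeout, but the
        // already-completed future returns right away with an empty result.
        Queue<String> completedFetches = future.get();
        System.out.println("returned immediately, size = " + completedFetches.size());
    }
}
```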



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -596,12 +786,38 @@ public Set<String> subscription() {
 
     @Override
     public void subscribe(Collection<String> topics) {
-        throw new KafkaException("method not implemented");
+        subscribe(topics, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener 
callback) {
-        throw new KafkaException("method not implemented");
+        maybeThrowInvalidGroupIdException();
+        if (topics == null)
+            throw new IllegalArgumentException("Topic collection to subscribe 
to cannot be null");
+        if (topics.isEmpty()) {
+            // treat subscribing to empty topic list as the same as 
unsubscribing
+            this.unsubscribe();
+        } else {
+            for (String topic : topics) {
+                if (isBlank(topic))
+                    throw new IllegalArgumentException("Topic collection to 
subscribe to cannot contain null or empty topic");
+            }
+
+            throwIfNoAssignorsConfigured();
+
+            // Clear the buffered data which are not a part of newly assigned 
topics
+            final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+            for (TopicPartition tp : subscriptions.assignedPartitions()) {

Review Comment:
   We haven't implemented the group subscription logic in 
PrototypeAsyncConsumer, right? Ditto for the pattern subscribe below.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -249,7 +250,9 @@ private <K, V> Record nextFetchedRecord(FetchConfig<K, V> 
fetchConfig) {
      * @param maxRecords The number of records to return; the number returned 
may be {@code 0 <= maxRecords}
      * @return {@link ConsumerRecord Consumer records}
      */
-    <K, V> List<ConsumerRecord<K, V>> fetchRecords(FetchConfig<K, V> 
fetchConfig, int maxRecords) {
+    <K, V> List<ConsumerRecord<K, V>> fetchRecords(FetchConfig fetchConfig,
+                                                   Deserializers<K, V> 
deserializers,

Review Comment:
   Could we update the javadoc?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> 
partitions) {
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) 
{
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners 
configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        });
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception

Review Comment:
   Is the rebalance callback called here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * {@code FetchRequestManager} is responsible for generating {@link 
FetchRequest} that represent the
+ * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the 
user's topic subscription/partition
+ * assignment.
+ */
+public class FetchRequestManager extends AbstractFetch implements 
RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorEventHandler;
+    private final NetworkClientDelegate networkClientDelegate;
+
+    FetchRequestManager(final LogContext logContext,
+                        final Time time,
+                        final ErrorEventHandler errorEventHandler,
+                        final ConsumerMetadata metadata,
+                        final SubscriptionState subscriptions,
+                        final FetchConfig fetchConfig,
+                        final FetchMetricsManager metricsManager,
+                        final NetworkClientDelegate networkClientDelegate) {
+        super(logContext, metadata, subscriptions, fetchConfig, 
metricsManager, time);
+        this.log = logContext.logger(FetchRequestManager.class);
+        this.errorEventHandler = errorEventHandler;
+        this.networkClientDelegate = networkClientDelegate;
+    }
+
+    @Override
+    protected boolean isUnavailable(Node node) {
+        return networkClientDelegate.isUnavailable(node);
+    }
+
+    @Override
+    protected void maybeThrowAuthFailure(Node node) {
+        networkClientDelegate.maybeThrowAuthFailure(node);
+    }
+
+    @Override
+    public PollResult poll(long currentTimeMs) {
+        List<UnsentRequest> requests;
+
+        if (!idempotentCloser.isClosed()) {
+            // If the fetcher is open (i.e. not closed), we will issue the 
normal fetch requests
+            requests = prepareFetchRequests().entrySet().stream().map(entry -> 
{
+                final Node fetchTarget = entry.getKey();
+                final FetchSessionHandler.FetchRequestData data = 
entry.getValue();
+                final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+                final BiConsumer<ClientResponse, Throwable> responseHandler = 
(clientResponse, t) -> {
+                    if (t != null) {
+                        handleFetchResponse(fetchTarget, t);
+                        log.warn("Attempt to fetch data from node {} failed 
due to fatal exception", fetchTarget, t);
+                        errorEventHandler.handle(t);
+                    } else {
+                        handleFetchResponse(fetchTarget, data, clientResponse);
+                    }
+                };
+
+                return new UnsentRequest(request, fetchTarget, 
responseHandler);
+            }).collect(Collectors.toList());
+        } else {
+            requests = 
prepareCloseFetchSessionRequests().entrySet().stream().map(entry -> {
+                final Node fetchTarget = entry.getKey();
+                final FetchSessionHandler.FetchRequestData data = 
entry.getValue();
+                final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+                final BiConsumer<ClientResponse, Throwable> responseHandler = 
(clientResponse, t) -> {
+                    if (t != null) {
+                        handleCloseFetchSessionResponse(fetchTarget, data, t);
+                        log.warn("Attempt to close fetch session on node {} 
failed due to fatal exception", fetchTarget, t);
+                        errorEventHandler.handle(t);
+                    } else {
+                        handleCloseFetchSessionResponse(fetchTarget, data);
+                    }
+                };
+
+                return new UnsentRequest(request, fetchTarget, 
responseHandler);

Review Comment:
   Is this request guaranteed to be sent when the consumer is closed? Do we 
need this guarantee?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+public class BackgroundEventProcessor {

Review Comment:
   Could we describe what this class does? It's a bit weird that
`BackgroundEventProcessor.process` is called from `PrototypeAsyncConsumer` in
the foreground.
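
   To make the concern concrete, a minimal hand-off sketch (made-up names,
standard library only): the background thread produces events into a shared
queue, and the application thread drains and handles them when it calls
`process()`, which is why `process()` ends up running in the foreground.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundEventHandoffDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for the queue shared between the background and application threads.
        BlockingQueue<String> backgroundEventQueue = new LinkedBlockingQueue<>();

        // Background thread: enqueues events (errors, callback requests, ...).
        Thread background = new Thread(() -> backgroundEventQueue.add("some-background-event"));
        background.start();
        background.join();

        // Application thread ("process()"): drains and handles whatever was queued.
        String event;
        while ((event = backgroundEventQueue.poll()) != null) {
            System.out.println("handled " + event + " on " + Thread.currentThread().getName());
        }
    }
}
```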



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * {@code FetchRequestManager} is responsible for generating {@link 
FetchRequest} that represent the
+ * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the 
user's topic subscription/partition
+ * assignment.
+ */
+public class FetchRequestManager extends AbstractFetch implements 
RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorEventHandler;
+    private final NetworkClientDelegate networkClientDelegate;
+
+    FetchRequestManager(final LogContext logContext,
+                        final Time time,
+                        final ErrorEventHandler errorEventHandler,
+                        final ConsumerMetadata metadata,
+                        final SubscriptionState subscriptions,
+                        final FetchConfig fetchConfig,
+                        final FetchMetricsManager metricsManager,
+                        final NetworkClientDelegate networkClientDelegate) {
+        super(logContext, metadata, subscriptions, fetchConfig, 
metricsManager, time);
+        this.log = logContext.logger(FetchRequestManager.class);
+        this.errorEventHandler = errorEventHandler;
+        this.networkClientDelegate = networkClientDelegate;
+    }
+
+    @Override
+    protected boolean isUnavailable(Node node) {
+        return networkClientDelegate.isUnavailable(node);
+    }
+
+    @Override
+    protected void maybeThrowAuthFailure(Node node) {
+        networkClientDelegate.maybeThrowAuthFailure(node);
+    }
+
+    @Override
+    public PollResult poll(long currentTimeMs) {
+        List<UnsentRequest> requests;
+
+        if (!idempotentCloser.isClosed()) {
+            // If the fetcher is open (i.e. not closed), we will issue the 
normal fetch requests
+            requests = prepareFetchRequests().entrySet().stream().map(entry -> 
{
+                final Node fetchTarget = entry.getKey();
+                final FetchSessionHandler.FetchRequestData data = 
entry.getValue();
+                final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+                final BiConsumer<ClientResponse, Throwable> responseHandler = 
(clientResponse, t) -> {
+                    if (t != null) {
+                        handleFetchResponse(fetchTarget, t);
+                        log.warn("Attempt to fetch data from node {} failed 
due to fatal exception", fetchTarget, t);
+                        errorEventHandler.handle(t);

Review Comment:
   This causes an exception to be thrown in the application thread. It seems
that we should avoid doing that, at least for retriable exceptions? Ditto
below.
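
   One possible shape of that change, sketched against the handler quoted above
(assuming `org.apache.kafka.common.errors.RetriableException` is the right
marker type to check; a sketch, not a definitive fix):

```java
if (t != null) {
    handleFetchResponse(fetchTarget, t);

    if (t instanceof RetriableException) {
        // Retriable: log it and let a later attempt retry, instead of
        // surfacing the error to the application thread.
        log.debug("Retriable error fetching data from node {}", fetchTarget, t);
    } else {
        log.warn("Attempt to fetch data from node {} failed due to fatal exception", fetchTarget, t);
        errorEventHandler.handle(t);
    }
} else {
    handleFetchResponse(fetchTarget, data, clientResponse);
}
```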



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> 
partitions) {
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) 
{
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners 
configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);

Review Comment:
   This logic is a bit weird. ApplicationEventProcessor handles FetchEvent by
draining all completedFetches from fetchBuffer, but here we are just adding
them back to fetchBuffer again.
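
   Spelled out as a sketch (illustrative only; `drainCompletedFetches()` is a
hypothetical stand-in for however the event processor drains them):

```java
// Background thread, handling the FetchEvent:
Queue<CompletedFetch> drained = drainCompletedFetches();  // hypothetical stand-in
event.future().complete(drained);

// Application thread, in the whenComplete callback quoted above:
fetchBuffer.addAll(drained);  // the same completed fetches go straight back in
```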



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> 
partitions) {
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) 
{
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners 
configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        });
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some 
positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we 
MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > 
retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+        Timer pollTimer = time.timer(pollTimeout);
+
+        // Attempt to fetch any data. It's OK if we time out here; it's a best 
case effort. The
+        // data may not be immediately available, but the calling method 
(poll) will correctly
+        // handle the overall timeout.
+        try {
+            Queue<CompletedFetch> completedFetches = 
eventHandler.addAndGet(new FetchEvent(), pollTimer);
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        } catch (TimeoutException e) {
+            log.trace("Timeout during fetch", e);
+        } finally {
+            timer.update(pollTimer.currentTimeMs());
+        }
 
-        clusterResourceListeners.maybeAdd(keyDeserializer);
-        clusterResourceListeners.maybeAdd(valueDeserializer);
-        return clusterResourceListeners;
+        return fetchCollector.collectFetch(fetchBuffer);
+    }
+
+    /**
+     * Set the fetch position to the committed position (if there is one)
+     * or reset it using the offset reset policy the user has configured.
+     *
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
+     *             defined
+     * @return true iff the operation completed without timing out
+     */
+    private boolean updateFetchPositions(final Timer timer) {
+        // If any partitions have been truncated due to a leader change, we 
need to validate the offsets
+        eventHandler.add(new ValidatePositionsApplicationEvent());
+
+        cachedSubscriptionHasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
+        if (cachedSubscriptionHasAllFetchPositions) return true;
+
+        // If there are any partitions which do not have a valid position and 
are not
+        // awaiting reset, then we need to fetch committed offsets. We will 
only do a
+        // coordinator lookup if there are partitions which have missing 
positions, so
+        // a consumer with manually assigned partitions can avoid a 
coordinator dependence
+        // by always ensuring that assigned partitions have an initial 
position.
+        if (isCommittedOffsetsManagementEnabled() && 
!refreshCommittedOffsetsIfNeeded(timer))
+            return false;
+
+        // If there are partitions still needing a position and a reset policy 
is defined,
+        // request reset using the default policy. If no reset strategy is 
defined and there
+        // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
+        subscriptions.resetInitializingPositions();
+
+        // Finally send an asynchronous request to look up and update the 
positions of any
+        // partitions which are awaiting reset.
+        eventHandler.add(new ResetPositionsApplicationEvent());

Review Comment:
   Should we wait on the timer here, like what we do in
`refreshCommittedOffsetsIfNeeded`?
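
   If waiting is desirable, one possible shape (sketch only) would mirror the
blocking call already used in `pollForFetches()`:

```java
// Sketch: block on the reset-positions event with the caller's timer,
// analogous to eventHandler.addAndGet(new FetchEvent(), pollTimer) above.
eventHandler.addAndGet(new ResetPositionsApplicationEvent(), timer);
```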



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -192,71 +321,38 @@ public PrototypeAsyncConsumer(final ConsumerConfig config,
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
         Timer timer = time.timer(timeout);
+
         try {
-            do {
-                if (!eventHandler.isEmpty()) {
-                    final Optional<BackgroundEvent> backgroundEvent = 
eventHandler.poll();
-                    // processEvent() may process 3 types of event:
-                    // 1. Errors
-                    // 2. Callback Invocation
-                    // 3. Fetch responses
-                    // Errors will be handled or rethrown.
-                    // Callback invocation will trigger callback function 
execution, which is blocking until completion.
-                    // Successful fetch responses will be added to the 
completedFetches in the fetcher, which will then
-                    // be processed in the collectFetches().
-                    backgroundEvent.ifPresent(event -> processEvent(event, 
timeout));
-                }
+            backgroundEventProcessor.process();
 
-                updateFetchPositionsIfNeeded(timer);
+            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
 
-                // The idea here is to have the background thread sending 
fetches autonomously, and the fetcher
-                // uses the poll loop to retrieve successful fetchResponse and 
process them on the polling thread.
-                final Fetch<K, V> fetch = collectFetches();
-                if (!fetch.isEmpty()) {
-                    return processFetchResults(fetch);
-                }
-                // We will wait for retryBackoffMs
-            } while (time.timer(timeout).notExpired());
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-        // TODO: Once we implement poll(), clear wakeupTrigger in a finally 
block: wakeupTrigger.clearActiveTask();
+            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
+                throw new IllegalStateException("Consumer is not subscribed to 
any topics or assigned any partitions");
+            }
 
-        return ConsumerRecords.empty();
-    }
+            do {
+                updateAssignmentMetadataIfNeeded(timer);
+                final Fetch<K, V> fetch = pollForFetches(timer);
 
-    /**
-     * Set the fetch position to the committed position (if there is one) or 
reset it using the
-     * offset reset policy the user has configured (if partitions require 
reset)
-     *
-     * @return true if the operation completed without timing out
-     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException                          If no 
offset is stored for a given partition and no offset reset policy is
-     *                                                                defined
-     */
-    private boolean updateFetchPositionsIfNeeded(final Timer timer) {
-        // If any partitions have been truncated due to a leader change, we 
need to validate the offsets
-        ValidatePositionsApplicationEvent validatePositionsEvent = new 
ValidatePositionsApplicationEvent();
-        eventHandler.add(validatePositionsEvent);
+                if (!fetch.isEmpty()) {
+                    sendFetches();
 
-        // If there are any partitions which do not have a valid position and 
are not
-        // awaiting reset, then we need to fetch committed offsets. We will 
only do a
-        // coordinator lookup if there are partitions which have missing 
positions, so
-        // a consumer with manually assigned partitions can avoid a 
coordinator dependence
-        // by always ensuring that assigned partitions have an initial 
position.
-        if (isCommittedOffsetsManagementEnabled() && 
!refreshCommittedOffsetsIfNeeded(timer))
-            return false;
+                    if (fetch.records().isEmpty()) {
+                        log.trace("Returning empty records from `poll()` "
+                                + "since the consumer's position has advanced 
for at least one topic partition");
+                    }
 
-        // If there are partitions still needing a position and a reset policy 
is defined,
-        // request reset using the default policy. If no reset strategy is 
defined and there
-        // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
-        subscriptions.resetInitializingPositions();
+                    return this.interceptors.onConsume(new 
ConsumerRecords<>(fetch.records()));
+                }
+                // We will wait for retryBackoffMs
+            } while (timer.notExpired());
 
-        // Finally send an asynchronous request to look up and update the 
positions of any
-        // partitions which are awaiting reset.
-        ResetPositionsApplicationEvent resetPositionsEvent = new 
ResetPositionsApplicationEvent();
-        eventHandler.add(resetPositionsEvent);
-        return true;
+            return ConsumerRecords.empty();
+        } finally {
+            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+        }
+        // TODO: Once we implement poll(), clear wakeupTrigger in a finally 
block: wakeupTrigger.clearActiveTask();

Review Comment:
   Is that TODO still needed, now that we have implemented `poll`?
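
   If it is still needed, the TODO presumably amounts to something like this in
the existing finally block (sketch only, using the call named in the TODO
itself):

```java
} finally {
    this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    wakeupTrigger.clearActiveTask();
}
```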



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -526,13 +677,57 @@ public void close() {
         close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior 
to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
     @Override
     public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be 
negative.");
+
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close 
function
+                // itself may trigger rebalance callback that needs the 
consumer to be open still
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;
+        }
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        if (fetchBuffer != null) {
+            // the timeout for the session close is at-most the 
requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - 
closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {
+                remainingDurationInTimeout = Math.min(requestTimeoutMs, 
remainingDurationInTimeout);

Review Comment:
   Do we need this since closeTimer is already bounded by requestTimeoutMs?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -192,71 +321,38 @@ public PrototypeAsyncConsumer(final ConsumerConfig config,
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
         Timer timer = time.timer(timeout);
+
         try {
-            do {
-                if (!eventHandler.isEmpty()) {
-                    final Optional<BackgroundEvent> backgroundEvent = 
eventHandler.poll();
-                    // processEvent() may process 3 types of event:
-                    // 1. Errors
-                    // 2. Callback Invocation
-                    // 3. Fetch responses
-                    // Errors will be handled or rethrown.
-                    // Callback invocation will trigger callback function 
execution, which is blocking until completion.
-                    // Successful fetch responses will be added to the 
completedFetches in the fetcher, which will then
-                    // be processed in the collectFetches().
-                    backgroundEvent.ifPresent(event -> processEvent(event, 
timeout));
-                }
+            backgroundEventProcessor.process();
 
-                updateFetchPositionsIfNeeded(timer);
+            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
 
-                // The idea here is to have the background thread sending 
fetches autonomously, and the fetcher
-                // uses the poll loop to retrieve successful fetchResponse and 
process them on the polling thread.
-                final Fetch<K, V> fetch = collectFetches();
-                if (!fetch.isEmpty()) {
-                    return processFetchResults(fetch);
-                }
-                // We will wait for retryBackoffMs
-            } while (time.timer(timeout).notExpired());
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-        // TODO: Once we implement poll(), clear wakeupTrigger in a finally 
block: wakeupTrigger.clearActiveTask();
+            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
+                throw new IllegalStateException("Consumer is not subscribed to 
any topics or assigned any partitions");
+            }
 
-        return ConsumerRecords.empty();
-    }
+            do {
+                updateAssignmentMetadataIfNeeded(timer);
+                final Fetch<K, V> fetch = pollForFetches(timer);
 
-    /**
-     * Set the fetch position to the committed position (if there is one) or 
reset it using the
-     * offset reset policy the user has configured (if partitions require 
reset)
-     *
-     * @return true if the operation completed without timing out
-     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException                          If no 
offset is stored for a given partition and no offset reset policy is
-     *                                                                defined
-     */
-    private boolean updateFetchPositionsIfNeeded(final Timer timer) {
-        // If any partitions have been truncated due to a leader change, we 
need to validate the offsets
-        ValidatePositionsApplicationEvent validatePositionsEvent = new 
ValidatePositionsApplicationEvent();
-        eventHandler.add(validatePositionsEvent);
+                if (!fetch.isEmpty()) {
+                    sendFetches();
 
-        // If there are any partitions which do not have a valid position and 
are not
-        // awaiting reset, then we need to fetch committed offsets. We will 
only do a
-        // coordinator lookup if there are partitions which have missing 
positions, so
-        // a consumer with manually assigned partitions can avoid a 
coordinator dependence
-        // by always ensuring that assigned partitions have an initial 
position.
-        if (isCommittedOffsetsManagementEnabled() && 
!refreshCommittedOffsetsIfNeeded(timer))
-            return false;
+                    if (fetch.records().isEmpty()) {
+                        log.trace("Returning empty records from `poll()` "
+                                + "since the consumer's position has advanced 
for at least one topic partition");
+                    }
 
-        // If there are partitions still needing a position and a reset policy 
is defined,
-        // request reset using the default policy. If no reset strategy is 
defined and there
-        // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
-        subscriptions.resetInitializingPositions();
+                    return this.interceptors.onConsume(new 
ConsumerRecords<>(fetch.records()));
+                }
+                // We will wait for retryBackoffMs
+            } while (timer.notExpired());
 
-        // Finally send an asynchronous request to look up and update the 
positions of any
-        // partitions which are awaiting reset.
-        ResetPositionsApplicationEvent resetPositionsEvent = new 
ResetPositionsApplicationEvent();
-        eventHandler.add(resetPositionsEvent);
-        return true;
+            return ConsumerRecords.empty();
+        } finally {
+            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());

Review Comment:
   It seems that timer.currentTimeMs() doesn't change between poll start and
poll end because of the way that Timer is constructed?
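
   A small standalone check of that behavior (assuming
`org.apache.kafka.common.utils.Timer` caches the time it was created with until
`update()` is called):

```java
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class TimerCachingDemo {
    public static void main(String[] args) throws Exception {
        Timer timer = Time.SYSTEM.timer(1_000);
        long atStart = timer.currentTimeMs();

        Thread.sleep(50);

        // Still the cached value from construction time...
        System.out.println(timer.currentTimeMs() == atStart);  // expected: true

        // ...until the timer is explicitly updated.
        timer.update(Time.SYSTEM.milliseconds());
        System.out.println(timer.currentTimeMs() > atStart);   // expected: true
    }
}
```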



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -526,13 +677,57 @@ public void close() {
         close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior 
to setting the this.time field

Review Comment:
   This is an existing issue, but I don't quite understand this comment. In 
other places, we just use `time` directly assuming it's never null. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -596,12 +786,38 @@ public Set<String> subscription() {
 
     @Override
     public void subscribe(Collection<String> topics) {
-        throw new KafkaException("method not implemented");
+        subscribe(topics, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener 
callback) {
-        throw new KafkaException("method not implemented");
+        maybeThrowInvalidGroupIdException();
+        if (topics == null)
+            throw new IllegalArgumentException("Topic collection to subscribe 
to cannot be null");
+        if (topics.isEmpty()) {
+            // treat subscribing to empty topic list as the same as 
unsubscribing
+            this.unsubscribe();

Review Comment:
   Could we remove `this` for better consistency?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -622,12 +837,18 @@ public void assign(Collection<TopicPartition> partitions) 
{
                 throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
         }
 
-        // TODO: implementation of refactored Fetcher will be included in 
forthcoming commits.
-        // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+        // Clear the buffered data which are not a part of newly assigned 
topics
+        final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            if (partitions.contains(tp))
+                currentTopicPartitions.add(tp);
+        }
+
+        fetchBuffer.retainAll(currentTopicPartitions);
 
-        // assignment change event will trigger autocommit if it is configured 
and the group id is specified. This is
-        // to make sure offsets of topic partitions the consumer is 
unsubscribing from are committed since there will
-        // be no following rebalance
+        // make sure the offsets of topic partitions the consumer is 
unsubscribing from
+        // are committed since there will be no following rebalance

Review Comment:
   The comment is a bit confusing. The code doesn't seem to do anything related 
to offsets and rebalance.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -526,13 +677,57 @@ public void close() {
         close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior 
to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
     @Override
     public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be 
negative.");
+
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close 
function
+                // itself may trigger rebalance callback that needs the 
consumer to be open still
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;
+        }
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        if (fetchBuffer != null) {
+            // the timeout for the session close is at-most the 
requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - 
closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {
+                remainingDurationInTimeout = Math.min(requestTimeoutMs, 
remainingDurationInTimeout);
+            }
+
+            closeTimer.reset(remainingDurationInTimeout);
+
+            // This is a blocking call bound by the time remaining in 
closeTimer

Review Comment:
   We aren't actually passing `closeTimer` into the blocking call below.
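   Purely as a sketch (the timed `close` overload on the fetch buffer is hypothetical), the reset timer would need to be threaded into whatever blocks below, e.g.:

```java
closeTimer.reset(remainingDurationInTimeout);

// hypothetical: hand the timer to the blocking call so the bound is actually enforced
fetchBuffer.close(closeTimer);
```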



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -135,7 +135,8 @@ void recordAggregatedMetrics(int bytes, int records) {
     /**
      * Draining a {@link CompletedFetch} will signal that the data has been 
consumed and the underlying resources
      * are closed. This is somewhat analogous to {@link Closeable#close() 
closing}, though no error will result if a
-     * caller invokes {@link #fetchRecords(FetchConfig, int)}; an empty {@link 
List list} will be returned instead.
+     * caller invokes {@link #fetchRecords(FetchConfig, Deserializers, int)}; 
an empty {@link List list} will be

Review Comment:
   This is an existing issue, but I am not sure why the comment mentions 
`fetchRecords`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> 
partitions) {
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) 
{
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners 
configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        });
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some 
positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we 
MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > 
retryBackoffMs) {

Review Comment:
   `fetchPosition` is updated in the background thread, right? So, it could change anytime during the `poll` call in the consumer. Do we need the info on `fetchPosition` to be accurately reflected here?
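   To make the concern concrete (the body of the `if` is cut off in the hunk, so the cap shown here is only assumed from the legacy consumer's behavior):

```java
// cachedSubscriptionHasAllFetchPositions is a snapshot taken in updateAssignmentMetadataIfNeeded();
// the background thread may update the real fetch positions at any point during poll(),
// so the flag can already be stale by the time this check runs.
if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
    pollTimeout = retryBackoffMs; // assumed: same capping behavior as the legacy consumer
}
```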



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -684,7 +1011,7 @@ private boolean isCommittedOffsetsManagementEnabled() {
     }
 
     /**
-     * Refresh the committed offsets for partitions that require 
initialization.
+     * Refresh the committed offsets for provided partitions.

Review Comment:
   No partitions are provided to this method.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java:
##########
@@ -18,11 +18,18 @@
 
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 
+import java.io.Closeable;
+
 /**
  * {@code PollResult} consist of {@code UnsentRequest} if there are requests 
to send; otherwise, return the time till
  * the next poll event.
  */
-public interface RequestManager {
+public interface RequestManager extends Closeable {

Review Comment:
   It seems that none of the request managers implements `close`. Does this 
need to be `Closeable`?
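   If the interface does stay `Closeable`, one option (just a sketch) would be a no-op `default` implementation so that only the managers that actually hold resources have to override it:

```java
public interface RequestManager extends Closeable {
    // ... existing poll() contract unchanged ...

    @Override
    default void close() {
        // no-op by default; override only in managers that own closeable resources
    }
}
```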



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java:
##########
@@ -54,11 +49,8 @@
  *
  * Note: the {@link Deserializer deserializers} used for the key and value are 
not closed by this class. They should be
  * closed by the creator of the {@link FetchConfig}.
- *

Review Comment:
   There is still a mention of `deserializers` above.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java:
##########
@@ -16,41 +16,149 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchConfig;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel;
 
 /**
  * {@code RequestManagers} provides a means to pass around the set of {@link 
RequestManager} instances in the system.
  * This allows callers to both use the specific {@link RequestManager} 
instance, or to iterate over the list via
  * the {@link #entries()} method.
  */
-public class RequestManagers {
+public class RequestManagers implements Closeable {
 
+    private final Logger log;
     public final Optional<CoordinatorRequestManager> coordinatorRequestManager;
     public final Optional<CommitRequestManager> commitRequestManager;
     public final OffsetsRequestManager offsetsRequestManager;
+    public final FetchRequestManager fetchRequestManager;
+
     private final List<Optional<? extends RequestManager>> entries;
+    private final IdempotentCloser closer = new IdempotentCloser();
 
-    public RequestManagers(OffsetsRequestManager offsetsRequestManager,
+    public RequestManagers(LogContext logContext,
+                           OffsetsRequestManager offsetsRequestManager,
+                           FetchRequestManager fetchRequestManager,
                            Optional<CoordinatorRequestManager> 
coordinatorRequestManager,
                            Optional<CommitRequestManager> 
commitRequestManager) {
-        this.offsetsRequestManager = requireNonNull(offsetsRequestManager,
-                "OffsetsRequestManager cannot be null");
+        this.log = logContext.logger(RequestManagers.class);
+        this.offsetsRequestManager = requireNonNull(offsetsRequestManager, 
"OffsetsRequestManager cannot be null");
         this.coordinatorRequestManager = coordinatorRequestManager;
         this.commitRequestManager = commitRequestManager;
+        this.fetchRequestManager = fetchRequestManager;
 
         List<Optional<? extends RequestManager>> list = new ArrayList<>();
         list.add(coordinatorRequestManager);
         list.add(commitRequestManager);
         list.add(Optional.of(offsetsRequestManager));
+        list.add(Optional.of(fetchRequestManager));
         entries = Collections.unmodifiableList(list);
     }
 
     public List<Optional<? extends RequestManager>> entries() {
         return entries;
     }
+
+    @Override
+    public void close() {
+        closer.close(
+                () -> {
+                    log.debug("Closing RequestManagers");
+
+                    entries.forEach(rm -> {
+                        rm.ifPresent(requestManager -> {
+                            try {
+                                requestManager.close();
+                            } catch (Throwable t) {
+                                log.debug("Error closing request manager {}", 
requestManager.getClass().getSimpleName(), t);
+                            }
+                        });
+                    });
+                    log.debug("RequestManagers has been closed");
+                },
+                () -> log.debug("RequestManagers was already closed"));
+
+    }
+
+    /**
+     * Creates a {@link Supplier} for deferred creation during invocation by
+     * {@link 
org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread}.
+     */
+    public static <K, V> Supplier<RequestManagers> supplier(final Time time,

Review Comment:
   `RequestManagers` doesn't take <K, V>.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -79,6 +100,7 @@ public void 
clearBufferedDataForUnassignedPartitions(Collection<TopicPartition>
     public synchronized int sendFetches() {
         Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = 
prepareFetchRequests();
 
+

Review Comment:
   Extra blank line.


