junrao commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1334821243


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -0,0 +1,3565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.DelayedReceive;
+import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.TestUtils.assertOptional;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class FetchRequestManagerTest {
+
+    private static final double EPSILON = 0.0001;
+
+    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
+    private String topicName = "test";
+    private String groupId = "test-group";
+    private Uuid topicId = Uuid.randomUuid();
+    private Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+        {
+            put(topicName, topicId);
+        }
+    };
+    private Map<Uuid, String> topicNames = singletonMap(topicId, topicName);
+    private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
+    private TopicPartition tp0 = new TopicPartition(topicName, 0);
+    private TopicPartition tp1 = new TopicPartition(topicName, 1);
+    private TopicPartition tp2 = new TopicPartition(topicName, 2);
+    private TopicPartition tp3 = new TopicPartition(topicName, 3);
+    private TopicIdPartition tidp0 = new TopicIdPartition(topicId, tp0);
+    private TopicIdPartition tidp1 = new TopicIdPartition(topicId, tp1);
+    private TopicIdPartition tidp2 = new TopicIdPartition(topicId, tp2);
+    private TopicIdPartition tidp3 = new TopicIdPartition(topicId, tp3);
+    private int validLeaderEpoch = 0;
+    private MetadataResponse initialUpdateResponse =
+            RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName, 4), topicIds);
+
+    private int minBytes = 1;
+    private int maxBytes = Integer.MAX_VALUE;
+    private int maxWaitMs = 0;
+    private int fetchSize = 1000;
+    private long retryBackoffMs = 100;
+    private long requestTimeoutMs = 30000;
+    private MockTime time = new MockTime(1);
+    private SubscriptionState subscriptions;
+    private ConsumerMetadata metadata;
+    private FetchMetricsRegistry metricsRegistry;
+    private FetchMetricsManager metricsManager;
+    private MockClient client;
+    private Metrics metrics;
+    private ApiVersions apiVersions = new ApiVersions();
+    private ConsumerNetworkClient oldConsumerClient;
+    private TestableFetchRequestManager<?, ?> fetcher;
+    private TestableNetworkClientDelegate consumerClient;
+    private OffsetFetcher offsetFetcher;
+
+    private MemoryRecords records;
+    private MemoryRecords nextRecords;
+    private MemoryRecords emptyRecords;
+    private MemoryRecords partialRecords;
+
+    @BeforeEach
+    public void setup() {
+        records = buildRecords(1L, 3, 1);
+        nextRecords = buildRecords(4L, 2, 4);
+        emptyRecords = buildRecords(0L, 0, 0);
+        partialRecords = buildRecords(4L, 1, 0);
+        partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000);
+    }
+
+    private void assignFromUser(Set<TopicPartition> partitions) {
+        subscriptions.assignFromUser(partitions);
+        client.updateMetadata(initialUpdateResponse);
+
+        // A dummy metadata update to ensure valid leader epoch.
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1,
+                Collections.emptyMap(), singletonMap(topicName, 4),
+                tp -> validLeaderEpoch, topicIds), false, 0L);
+    }
+
+    private void assignFromUserNoId(Set<TopicPartition> partitions) {

Review Comment:
   The NoId part is a bit confusing. assignFromUserSingleTopic?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the
+ * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition
+ * assignment.
+ */
+public class FetchRequestManager extends AbstractFetch implements RequestManager {
+
+    private final Logger log;
+    private final BackgroundEventHandler backgroundEventHandler;
+    private final NetworkClientDelegate networkClientDelegate;
+    private final List<CompletableFuture<Queue<CompletedFetch>>> futures;
+
+    FetchRequestManager(final LogContext logContext,
+                        final Time time,
+                        final BackgroundEventHandler backgroundEventHandler,
+                        final ConsumerMetadata metadata,
+                        final SubscriptionState subscriptions,
+                        final FetchConfig fetchConfig,
+                        final FetchMetricsManager metricsManager,
+                        final NetworkClientDelegate networkClientDelegate) {
+        super(logContext, metadata, subscriptions, fetchConfig, metricsManager, time);
+        this.log = logContext.logger(FetchRequestManager.class);
+        this.backgroundEventHandler = backgroundEventHandler;
+        this.networkClientDelegate = networkClientDelegate;
+        this.futures = new ArrayList<>();
+    }
+
+    @Override
+    protected boolean isUnavailable(Node node) {
+        return networkClientDelegate.isUnavailable(node);
+    }
+
+    @Override
+    protected void maybeThrowAuthFailure(Node node) {
+        networkClientDelegate.maybeThrowAuthFailure(node);
+    }
+
+    /**
+     * Adds a new {@link Future future} to the list of futures awaiting results. Per the comments on
+     * {@link #forwardResults()}, there is no guarantee that this particular future will be provided with
+     * a non-empty result, but it is guaranteed to be completed with a result, assuming that it does not time out.
+     *
+     * @param future Future that will be {@link CompletableFuture#complete(Object) completed} if not timed out
+     */
+    public void requestFetch(CompletableFuture<Queue<CompletedFetch>> future) {
+        futures.add(future);
+    }
+
+    @Override
+    public PollResult poll(long currentTimeMs) {

Review Comment:
   Could we describe what `poll` does?
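   For example, a Javadoc sketch along these lines might work (the behavior described here is inferred from the class-level Javadoc and should be checked against the implementation):

   ```java
   /**
    * Creates a {@link FetchRequest} for each node that hosts fetchable
    * partitions, wraps each one as an unsent request whose completion handler
    * feeds the response back into the fetch session logic, and returns them in
    * a {@link PollResult} for the network thread to send.
    *
    * @param currentTimeMs Current time in milliseconds
    * @return {@link PollResult} containing zero or more unsent fetch requests
    */
   ```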



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -0,0 +1,3565 @@
+    private void assignFromUserNoId(Set<TopicPartition> partitions) {
+        subscriptions.assignFromUser(partitions);
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singletonMap("noId", 1), Collections.emptyMap()));
+
+        // A dummy metadata update to ensure valid leader epoch.
+        metadata.update(9, RequestTestUtils.metadataUpdateWithIds("dummy", 1,
+                Collections.emptyMap(), singletonMap("noId", 1),
+                tp -> validLeaderEpoch, topicIds), false, 0L);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        if (metrics != null)
+            this.metrics.close();
+        if (fetcher != null)
+            this.fetcher.close();
+    }
+
+    private int sendFetches() {
+        offsetFetcher.validatePositionsOnMetadataChange();
+        return fetcher.sendFetches();
+    }
+
+    @Test
+    public void testFetchNormal() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));

Review Comment:
   Could we be consistent with the use of `this`?
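   For example, since the neighboring calls already drop the qualifier, this could read:

   ```java
   client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
   ```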



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -0,0 +1,3565 @@
+    private void assignFromUser(Set<TopicPartition> partitions) {
+        subscriptions.assignFromUser(partitions);
+        client.updateMetadata(initialUpdateResponse);
+
+        // A dummy metadata update to ensure valid leader epoch.
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1,

Review Comment:
   `client.updateMetadata` eventually calls `metadata.updateWithCurrentRequestVersion`, so I'm not sure why we are updating the cluster metadata twice with different values.
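   If the first update really is subsumed by the second, the helper could presumably collapse to a single epoch-bearing update, e.g. (a sketch only; the tests may rely on `initialUpdateResponse` in ways not shown in this hunk):

   ```java
   private void assignFromUser(Set<TopicPartition> partitions) {
       subscriptions.assignFromUser(partitions);

       // A single metadata update that both seeds the cluster view and carries
       // a valid leader epoch for every partition.
       client.updateMetadata(RequestTestUtils.metadataUpdateWithIds("dummy", 1,
               Collections.emptyMap(), singletonMap(topicName, 4),
               tp -> validLeaderEpoch, topicIds));
   }
   ```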



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -693,15 +1015,14 @@ private boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
 
         log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
         try {
-            final Map<TopicPartition, OffsetAndMetadata> offsets = eventHandler.addAndGet(new OffsetFetchApplicationEvent(initializingPartitions), timer);
+            final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(new OffsetFetchApplicationEvent(initializingPartitions), timer);

Review Comment:
   If `initializingPartitions` is empty, do we still send the OffsetFetch request? I didn't see the logic for short-circuiting.
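   A guard before building the event would provide that, e.g. (a hypothetical short-circuit, assuming the method's contract is to return `true` when there is nothing left to refresh):

   ```java
   // Nothing to look up, so skip the OffsetFetch round trip entirely.
   if (initializingPartitions.isEmpty())
       return true;
   ```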



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##########
@@ -37,7 +36,7 @@
  *
  * <em>Note</em>: this class is not thread-safe and is intended to only be used from a single thread.
  */
-public class FetchBuffer implements Closeable {
+public class FetchBuffer implements AutoCloseable {

Review Comment:
   The above comment that this class is only used from a single thread seems incorrect. The background thread adds fetched data into `completedFetches` and `PrototypeAsyncConsumer` drains `completedFetches` through `fetchCollector.collectFetch(fetchBuffer)`. There is actually very subtle synchronization between the two threads. Could we document the correct way of using this class and how we achieve synchronization so that future developers don't break it?
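   For instance, the class-level note could be replaced with something along these lines (a sketch of the contract described above, not final wording):

   ```java
   /**
    * <em>Note</em>: this buffer is shared by two threads: the background thread
    * adds {@link CompletedFetch completed fetches} as fetch responses arrive,
    * while the application thread drains them via the fetch collector. All
    * access to the underlying collection must therefore go through the methods
    * of this class, which provide the synchronization between the two threads;
    * callers must not assume single-threaded use.
    */
   ```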



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -395,12 +523,12 @@ private void maybeThrowInvalidGroupIdException() {
 
     @Override
     public Map<MetricName, ? extends Metric> metrics() {
-        throw new KafkaException("method not implemented");
+        return Collections.unmodifiableMap(this.metrics.metrics());

Review Comment:
   Is `this` needed?
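   i.e., presumably this could just be:

   ```java
   return Collections.unmodifiableMap(metrics.metrics());
   ```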



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -0,0 +1,3565 @@
+    private ConsumerNetworkClient oldConsumerClient;
+    private TestableFetchRequestManager<?, ?> fetcher;
+    private TestableNetworkClientDelegate consumerClient;

Review Comment:
   consumerClient => networkClientDelegate?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -621,56 +825,174 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
         }
 
-        // TODO: implementation of refactored Fetcher will be included in forthcoming commits.
-        // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+        // Clear the buffered data which are not a part of newly assigned topics
+        final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            if (partitions.contains(tp))
+                currentTopicPartitions.add(tp);
+        }
+
+        fetchBuffer.retainAll(currentTopicPartitions);
 
-        // assignment change event will trigger autocommit if it is configured and the group id is specified. This is
-        // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
-        // be no following rebalance
-        eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));
+        // make sure the offsets of topic partitions the consumer is unsubscribing from
+        // are committed since there will be no following rebalance
+        applicationEventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));
 
         log.info("Assigned to partition(s): {}", join(partitions, ", "));
         if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
-            eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+            applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().isEmpty())
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    /**
+     * Send the requests for fetch data to the background thread and set up to collect the results in
+     * {@link #fetchResults}.
+     */
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        applicationEventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (error != null)
+                log.warn("An error occurred during poll: {}", error.getMessage(), error);
+            else
+                fetchResults.addAll(completedFetches);
+        });
+    }
+
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+        Timer pollTimer = time.timer(pollTimeout);
+
+        // Attempt to fetch any data. It's OK if we don't have any waiting data here; it's a 'best case' effort. The
+        // data may not be immediately available, but the calling method (poll) will correctly
+        // handle the overall timeout.
+        try {
+            while (pollTimer.notExpired()) {
+                CompletedFetch completedFetch = fetchResults.poll(pollTimer.remainingMs(), TimeUnit.MILLISECONDS);
+
+                if (completedFetch != null)
+                    fetchBuffer.add(completedFetch);
+
+                pollTimer.update();
+            }
+        } catch (InterruptedException e) {
+            log.trace("Timeout during fetch", e);
+        } finally {
+            timer.update(pollTimer.currentTimeMs());
+        }
+
+        return fetchCollector.collectFetch(fetchBuffer);
+    }
+
+    /**
+     * Set the fetch position to the committed position (if there is one)
+     * or reset it using the offset reset policy the user has configured.
+     *
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *             defined
+     * @return true iff the operation completed without timing out
+     */
+    private boolean updateFetchPositions(final Timer timer) {
+        // If any partitions have been truncated due to a leader change, we 
need to validate the offsets
+        applicationEventHandler.add(new ValidatePositionsApplicationEvent());
+
+        cachedSubscriptionHasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
+        if (cachedSubscriptionHasAllFetchPositions) return true;
 
-        clusterResourceListeners.maybeAdd(keyDeserializer);
-        clusterResourceListeners.maybeAdd(valueDeserializer);
-        return clusterResourceListeners;
+        // If there are any partitions which do not have a valid position and 
are not
+        // awaiting reset, then we need to fetch committed offsets. We will 
only do a
+        // coordinator lookup if there are partitions which have missing 
positions, so
+        // a consumer with manually assigned partitions can avoid a 
coordinator dependence
+        // by always ensuring that assigned partitions have an initial 
position.
+        if (isCommittedOffsetsManagementEnabled() && 
!refreshCommittedOffsetsIfNeeded(timer))
+            return false;
+
+        // If there are partitions still needing a position and a reset policy 
is defined,
+        // request reset using the default policy. If no reset strategy is 
defined and there

Review Comment:
   Earlier, we have "do not have a valid position and are not awaiting reset". The reset there and the reset here mean different things. The former refers to resetting the offset based on OffsetForLeaderEpoch and the latter refers to resetting the offset to either the earliest or latest. It would be useful to make this clear.
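   For illustration, a sketch of how the two meanings could be spelled out in the comments (the event names are from this PR; the comment wording is only a suggestion):

   ```java
   // "awaiting validation": a position exists, but it must first be checked against
   // the leader epoch via an OffsetsForLeaderEpoch request (truncation detection).
   applicationEventHandler.add(new ValidatePositionsApplicationEvent());

   // "awaiting reset": no valid position; it will be re-initialized to the earliest
   // or latest offset per the configured auto.offset.reset policy.
   applicationEventHandler.add(new ResetPositionsApplicationEvent());
   ```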



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -621,56 +825,174 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
         }
 
-        // TODO: implementation of refactored Fetcher will be included in forthcoming commits.
-        // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+        // Clear the buffered data which are not a part of newly assigned topics
+        final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            if (partitions.contains(tp))
+                currentTopicPartitions.add(tp);
+        }
+
+        fetchBuffer.retainAll(currentTopicPartitions);
 
-        // assignment change event will trigger autocommit if it is configured and the group id is specified. This is
-        // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
-        // be no following rebalance
-        eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));
+        // make sure the offsets of topic partitions the consumer is unsubscribing from
+        // are committed since there will be no following rebalance
+        applicationEventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));
 
         log.info("Assigned to partition(s): {}", join(partitions, ", "));
         if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
-            eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+            applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().isEmpty())
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    /**
+     * Send the requests for fetch data to the background thread and set up to collect the results in
+     * {@link #fetchResults}.
+     */
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        applicationEventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (error != null)
+                log.warn("An error occurred during poll: {}", error.getMessage(), error);
+            else
+                fetchResults.addAll(completedFetches);
+        });
+    }
+
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+        Timer pollTimer = time.timer(pollTimeout);
+
+        // Attempt to fetch any data. It's OK if we don't have any waiting data here; it's a 'best case' effort. The
+        // data may not be immediately available, but the calling method (poll) will correctly
+        // handle the overall timeout.
+        try {
+            while (pollTimer.notExpired()) {
+                CompletedFetch completedFetch = fetchResults.poll(pollTimer.remainingMs(), TimeUnit.MILLISECONDS);
+
+                if (completedFetch != null)
+                    fetchBuffer.add(completedFetch);

Review Comment:
   Hmm, with this logic, there is a short window between `completedFetch` being removed from `fetchResults` and `completedFetch` being added back to `fetchBuffer`. During this window, `BackgroundThread` could make a `poll` call, see no buffered data in `fetchBuffer`, and fetch the same offset again? Then the consumer could see duplicated data because of this.
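   One possible interleaving, sketched below (the exact step ordering is an assumption for illustration):

   ```java
   // T0: background thread completes a fetch for tp-0 at offset 100 and enqueues
   //     it into fetchResults
   // T1: application thread: completedFetch = fetchResults.poll(...)
   //     -> fetchResults and fetchBuffer are now both empty for tp-0
   // T2: background thread polls, sees no buffered or pending data for tp-0, and
   //     builds a new FetchRequest starting at offset 100 again
   // T3: application thread: fetchBuffer.add(completedFetch)
   // -> records at offset 100 can eventually be returned to the user twice
   ```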



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -489,20 +623,44 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
         if (partitions.isEmpty()) {
             return Collections.emptyMap();
         }
-        Map<TopicPartition, Long> timestampToSearch =
-                partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> timestamp));
+        Map<TopicPartition, Long> timestampToSearch = partitions
+                .stream()
+                .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
         final ListOffsetsApplicationEvent listOffsetsEvent = new ListOffsetsApplicationEvent(
                 timestampToSearch,
-                false);
-        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
-                eventHandler.addAndGet(listOffsetsEvent, time.timer(timeout));
-        return offsetAndTimestampMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(),
-                e -> e.getValue().offset()));
+                false
+        );

Review Comment:
   To be consistent with other places, it seems that we want to combine this with the previous line. Ditto below.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -621,56 +825,174 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
         }
 
-        // TODO: implementation of refactored Fetcher will be included in forthcoming commits.
-        // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+        // Clear the buffered data which are not a part of newly assigned topics
+        final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            if (partitions.contains(tp))
+                currentTopicPartitions.add(tp);
+        }
+
+        fetchBuffer.retainAll(currentTopicPartitions);
 
-        // assignment change event will trigger autocommit if it is configured and the group id is specified. This is
-        // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
-        // be no following rebalance
-        eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));
+        // make sure the offsets of topic partitions the consumer is unsubscribing from
+        // are committed since there will be no following rebalance
+        applicationEventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));
 
         log.info("Assigned to partition(s): {}", join(partitions, ", "));
         if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
-            eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+            applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().isEmpty())
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    /**
+     * Send the requests for fetch data to the background thread and set up to collect the results in
+     * {@link #fetchResults}.
+     */
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        applicationEventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (error != null)
+                log.warn("An error occurred during poll: {}", error.getMessage(), error);
+            else
+                fetchResults.addAll(completedFetches);
+        });
+    }
+
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+        Timer pollTimer = time.timer(pollTimeout);
+
+        // Attempt to fetch any data. It's OK if we don't have any waiting data here; it's a 'best case' effort. The
+        // data may not be immediately available, but the calling method (poll) will correctly
+        // handle the overall timeout.
+        try {
+            while (pollTimer.notExpired()) {
+                CompletedFetch completedFetch = fetchResults.poll(pollTimer.remainingMs(), TimeUnit.MILLISECONDS);
+
+                if (completedFetch != null)
+                    fetchBuffer.add(completedFetch);
+
+                pollTimer.update();
+            }
+        } catch (InterruptedException e) {
+            log.trace("Timeout during fetch", e);
+        } finally {
+            timer.update(pollTimer.currentTimeMs());
+        }
+
+        return fetchCollector.collectFetch(fetchBuffer);
+    }
+
+    /**
+     * Set the fetch position to the committed position (if there is one)
+     * or reset it using the offset reset policy the user has configured.
+     *
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *             defined
+     * @return true iff the operation completed without timing out
+     */
+    private boolean updateFetchPositions(final Timer timer) {
+        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        applicationEventHandler.add(new ValidatePositionsApplicationEvent());
+
+        cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
+        if (cachedSubscriptionHasAllFetchPositions) return true;
 
-        clusterResourceListeners.maybeAdd(keyDeserializer);
-        clusterResourceListeners.maybeAdd(valueDeserializer);
-        return clusterResourceListeners;
+        // If there are any partitions which do not have a valid position and are not
+        // awaiting reset, then we need to fetch committed offsets. We will only do a
+        // coordinator lookup if there are partitions which have missing positions, so
+        // a consumer with manually assigned partitions can avoid a coordinator dependence
+        // by always ensuring that assigned partitions have an initial position.
+        if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer))

Review Comment:
   It still seems weird that we only use the timer for `refreshCommittedOffsetsIfNeeded`, but not for other cases where we don't have valid fetch positions. For example, if all partitions are in AWAIT_VALIDATION state, it seems that PrototypeAsyncConsumer.poll() will just go into a busy loop, which is not efficient.
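   One possible shape for bounding the wait, as a rough sketch (whether `Timer#sleep` is the right mechanism here is an open question):

   ```java
   // a sketch only: if positions are still missing after the refresh, wait for
   // the retry backoff instead of letting poll() spin while validation runs
   if (!subscriptions.hasAllFetchPositions())
       timer.sleep(Math.min(timer.remainingMs(), retryBackoffMs));
   ```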
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -222,58 +99,158 @@ public void run() {
         } catch (final Throwable t) {
             log.error("The background thread failed due to unexpected error", t);
             throw new KafkaException(t);
-        } finally {
-            close();
-            log.debug("{} closed", getClass());
         }
     }
 
+    void initializeResources() {
+        applicationEventProcessor = applicationEventProcessorSupplier.get();
+        networkClientDelegate = networkClientDelegateSupplier.get();
+        requestManagers = requestManagersSupplier.get();
+    }
+
     /**
-     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
-     * 1. Drains and try to process all the requests in the queue.
-     * 2. Iterate through the registry, poll, and get the next poll time for the network poll
-     * 3. Poll the networkClient to send and retrieve the response.
+     * Poll and process the {@link ApplicationEvent application events}. It performs the following tasks:
+     *
+     * <ol>
+     *     <li>
+     *         Drains and processes all the events from the application thread's application event queue via
+     *         {@link ApplicationEventProcessor}
+     *     </li>
+     *     <li>
+     *         Iterate through the {@link RequestManager} list and invoke {@link RequestManager#poll(long)} to get
+     *         the {@link NetworkClientDelegate.UnsentRequest} list and the poll time for the network poll
+     *     </li>
+     *     <li>
+     *         Stage each {@link AbstractRequest.Builder request} to be sent via
+     *         {@link NetworkClientDelegate#addAll(List)}
+     *     </li>
+     *     <li>
+     *         Poll the client via {@link KafkaClient#poll(long, long)} to send the requests, as well as
+     *         retrieve any available responses
+     *     </li>
+     * </ol>
      */
     void runOnce() {
-        if (!applicationEventQueue.isEmpty()) {
-            LinkedList<ApplicationEvent> res = new LinkedList<>();
-            this.applicationEventQueue.drainTo(res);
-
-            for (ApplicationEvent event : res) {
-                log.debug("Consuming application event: {}", event);
-                Objects.requireNonNull(event);
-                applicationEventProcessor.process(event);
-            }
-        }
+        // If there are errors processing any events, the error will be thrown immediately. This will have
+        // the effect of closing the background thread.
+        applicationEventProcessor.process();
 
         final long currentTimeMs = time.milliseconds();
         final long pollWaitTimeMs = requestManagers.entries().stream()
                 .filter(Optional::isPresent)
-                .map(m -> m.get().poll(currentTimeMs))
-                .map(this::handlePollResult)
+                .map(Optional::get)
+                .map(rm -> rm.poll(currentTimeMs))
+                .map(networkClientDelegate::addAll)
                 .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
         networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
     }
 
-    long handlePollResult(NetworkClientDelegate.PollResult res) {
-        if (!res.unsentRequests.isEmpty()) {
-            networkClientDelegate.addAll(res.unsentRequests);
+    /**
+     * Performs any network I/O that is needed at the time of close for the consumer:
+     *
+     * <ol>
+     *     <li>
+     *         Iterate through the {@link RequestManager} list and invoke {@link RequestManager#pollOnClose()}
+     *         to get the {@link NetworkClientDelegate.UnsentRequest} list and the poll time for the network poll
+     *     </li>
+     *     <li>
+     *         Stage each {@link AbstractRequest.Builder request} to be sent via
+     *         {@link NetworkClientDelegate#addAll(List)}
+     *     </li>
+     *     <li>
+     *         {@link KafkaClient#poll(long, long) Poll the client} to send the requests, as well as
+     *         retrieve any available responses
+     *     </li>
+     *     <li>
+     *         Continuously {@link KafkaClient#poll(long, long) poll the client} as long as the
+     *         {@link Timer#notExpired() timer hasn't expired} to retrieve the responses
+     *     </li>
+     * </ol>
+     */
+    // Visible for testing
+    static void runAtClose(final Time time,
+                           final Collection<Optional<? extends RequestManager>> requestManagers,
+                           final NetworkClientDelegate networkClientDelegate,
+                           final Timer timer) {
+        long currentTimeMs = time.milliseconds();
+
+        // These are the optional outgoing requests at the
+        List<NetworkClientDelegate.PollResult> pollResults = requestManagers.stream()
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .map(RequestManager::pollOnClose)
+                .collect(Collectors.toList());
+        long pollWaitTimeMs = pollResults.stream()
+                .map(networkClientDelegate::addAll)
+                .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
+        pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs());
+        networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
+        timer.update();
+
+        List<Future<?>> requestFutures = pollResults.stream()
+                .flatMap(fads -> fads.unsentRequests.stream())
+                .map(NetworkClientDelegate.UnsentRequest::future)
+                .collect(Collectors.toList());
+
+        // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
+        // all requests have received a response.
+        while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) {
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);

Review Comment:
   Instead of using `currentTimeMs`, we need to use `timer.currentTimeMs` to pick up the latest time, right?
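   Something like this, as a sketch of the suggested change:

   ```java
   while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) {
       // use the timer's view of "now" so each iteration sees the latest time,
       // rather than the currentTimeMs value captured before the loop
       networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
       timer.update();
   }
   ```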



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.LogContext;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An {@link EventProcessor} that is created and executes in the application thread for the purpose of processing
+ * {@link BackgroundEvent background events} generated by  {@link DefaultBackgroundThread the background thread}.
+ * Those events are generally of two types:
+ *
+ * <ul>
+ *     <li>Errors that occur in the background thread that need to be propagated to the application thread</li>
+ *     <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
+ * </ul>
+ */
+public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
+
+    public BackgroundEventProcessor(final LogContext logContext,
+                                    final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
+        super(logContext, backgroundEventQueue);
+    }
+
+    /**
+     * Process the events—if any—that were produced by the {@link DefaultBackgroundThread background thread}.
+     * It is possible that when processing the events that a given event will
+     * {@link ErrorBackgroundEvent represent an error directly}, or it could be that processing an event generates
+     * an error. In such cases, the processor will continue to process the remaining events. In this case, we
+     * provide the caller to provide a callback handler that "collects" the errors. We grab the first error that

Review Comment:
   Where is the callback handler?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -222,58 +99,158 @@ public void run() {
         } catch (final Throwable t) {
             log.error("The background thread failed due to unexpected error", t);
             throw new KafkaException(t);
-        } finally {
-            close();
-            log.debug("{} closed", getClass());
         }
     }
 
+    void initializeResources() {
+        applicationEventProcessor = applicationEventProcessorSupplier.get();
+        networkClientDelegate = networkClientDelegateSupplier.get();
+        requestManagers = requestManagersSupplier.get();
+    }
+
     /**
-     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
-     * 1. Drains and try to process all the requests in the queue.
-     * 2. Iterate through the registry, poll, and get the next poll time for the network poll
-     * 3. Poll the networkClient to send and retrieve the response.
+     * Poll and process the {@link ApplicationEvent application events}. It performs the following tasks:
+     *
+     * <ol>
+     *     <li>
+     *         Drains and processes all the events from the application thread's application event queue via
+     *         {@link ApplicationEventProcessor}
+     *     </li>
+     *     <li>
+     *         Iterate through the {@link RequestManager} list and invoke {@link RequestManager#poll(long)} to get
+     *         the {@link NetworkClientDelegate.UnsentRequest} list and the poll time for the network poll
+     *     </li>
+     *     <li>
+     *         Stage each {@link AbstractRequest.Builder request} to be sent via
+     *         {@link NetworkClientDelegate#addAll(List)}
+     *     </li>
+     *     <li>
+     *         Poll the client via {@link KafkaClient#poll(long, long)} to send the requests, as well as
+     *         retrieve any available responses
+     *     </li>
+     * </ol>
      */
     void runOnce() {
-        if (!applicationEventQueue.isEmpty()) {
-            LinkedList<ApplicationEvent> res = new LinkedList<>();
-            this.applicationEventQueue.drainTo(res);
-
-            for (ApplicationEvent event : res) {
-                log.debug("Consuming application event: {}", event);
-                Objects.requireNonNull(event);
-                applicationEventProcessor.process(event);
-            }
-        }
+        // If there are errors processing any events, the error will be thrown immediately. This will have
+        // the effect of closing the background thread.

Review Comment:
   If the background thread dies, should we throw an exception to the user thread on the next `poll` call?
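   As a sketch of one option (assuming the background thread has, or is given, a `BackgroundEventHandler`, as `FetchRequestManager` does elsewhere in this PR):

   ```java
   } catch (final Throwable t) {
       log.error("The background thread failed due to unexpected error", t);
       // surface the failure so the application thread rethrows it on its next poll()
       backgroundEventHandler.add(new ErrorBackgroundEvent(t));
       throw new KafkaException(t);
   }
   ```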



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the
+ * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition
+ * assignment.
+ */
+public class FetchRequestManager extends AbstractFetch implements RequestManager {
+
+    private final Logger log;
+    private final BackgroundEventHandler backgroundEventHandler;
+    private final NetworkClientDelegate networkClientDelegate;
+    private final List<CompletableFuture<Queue<CompletedFetch>>> futures;
+
+    FetchRequestManager(final LogContext logContext,
+                        final Time time,
+                        final BackgroundEventHandler backgroundEventHandler,
+                        final ConsumerMetadata metadata,
+                        final SubscriptionState subscriptions,
+                        final FetchConfig fetchConfig,
+                        final FetchMetricsManager metricsManager,
+                        final NetworkClientDelegate networkClientDelegate) {
+        super(logContext, metadata, subscriptions, fetchConfig, metricsManager, time);
+        this.log = logContext.logger(FetchRequestManager.class);
+        this.backgroundEventHandler = backgroundEventHandler;
+        this.networkClientDelegate = networkClientDelegate;
+        this.futures = new ArrayList<>();
+    }
+
+    @Override
+    protected boolean isUnavailable(Node node) {
+        return networkClientDelegate.isUnavailable(node);
+    }
+
+    @Override
+    protected void maybeThrowAuthFailure(Node node) {
+        networkClientDelegate.maybeThrowAuthFailure(node);
+    }
+
+    /**
+     * Adds a new {@link Future future} to the list of futures awaiting results. Per the comments on
+     * {@link #forwardResults()}, there is no guarantee that this particular future will be provided with
+     * a non-empty result, but it is guaranteed to be completed with a result, assuming that it does not time out.
+     *
+     * @param future Future that will be {@link CompletableFuture#complete(Object) completed} if not timed out
+     */
+    public void requestFetch(CompletableFuture<Queue<CompletedFetch>> future) {
+        futures.add(future);
+    }
+
+    @Override
+    public PollResult poll(long currentTimeMs) {
+        List<UnsentRequest> requests = prepareFetchRequests().entrySet().stream().map(entry -> {
+            final Node fetchTarget = entry.getKey();
+            final FetchSessionHandler.FetchRequestData data = entry.getValue();
+            final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
+            final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, t) -> {
+                if (t != null) {
+                    handleFetchResponse(fetchTarget, t);
+                    log.debug("Attempt to fetch data from node {} failed due to fatal exception", fetchTarget, t);
+                    backgroundEventHandler.add(new ErrorBackgroundEvent(t));

Review Comment:
   At this stage, we are just propagating connection-level errors like `DisconnectException`, which is retriable. So, it seems that we shouldn't throw this error back to the application. Ditto in `pollOnClose`.
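   For example, a sketch that only surfaces non-retriable failures (assuming `RetriableException` is the right discriminator):

   ```java
   if (t != null) {
       handleFetchResponse(fetchTarget, t);
       log.debug("Attempt to fetch data from node {} failed due to fatal exception", fetchTarget, t);

       // connection-level errors such as DisconnectException are retried by the
       // fetch path itself, so only propagate non-retriable errors to the user
       if (!(t instanceof RetriableException))
           backgroundEventHandler.add(new ErrorBackgroundEvent(t));
   }
   ```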



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) {
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        });
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+        Timer pollTimer = time.timer(pollTimeout);
+
+        // Attempt to fetch any data. It's OK if we time out here; it's a best case effort. The
+        // data may not be immediately available, but the calling method (poll) will correctly
+        // handle the overall timeout.
+        try {
+            Queue<CompletedFetch> completedFetches = eventHandler.addAndGet(new FetchEvent(), pollTimer);
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        } catch (TimeoutException e) {
+            log.trace("Timeout during fetch", e);
+        } finally {
+            timer.update(pollTimer.currentTimeMs());
+        }
 
-        clusterResourceListeners.maybeAdd(keyDeserializer);
-        clusterResourceListeners.maybeAdd(valueDeserializer);
-        return clusterResourceListeners;
+        return fetchCollector.collectFetch(fetchBuffer);
+    }
+
+    /**
+     * Set the fetch position to the committed position (if there is one)
+     * or reset it using the offset reset policy the user has configured.
+     *
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *             defined
+     * @return true iff the operation completed without timing out
+     */
+    private boolean updateFetchPositions(final Timer timer) {
+        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        eventHandler.add(new ValidatePositionsApplicationEvent());

Review Comment:
   @lianetm : Thanks for the reply. That's my understanding too. My question was what happens before the new metadata is received. Will the consumer continuously fetch from the older leader?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -622,12 +837,18 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
         }
 
-        // TODO: implementation of refactored Fetcher will be included in forthcoming commits.
-        // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+        // Clear the buffered data which are not a part of newly assigned topics
+        final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            if (partitions.contains(tp))
+                currentTopicPartitions.add(tp);
+        }
+
+        fetchBuffer.retainAll(currentTopicPartitions);
 
-        // assignment change event will trigger autocommit if it is configured and the group id is specified. This is
-        // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
-        // be no following rebalance
+        // make sure the offsets of topic partitions the consumer is unsubscribing from
+        // are committed since there will be no following rebalance

Review Comment:
   Was this comment addressed?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) {
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     * <p>
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+                .filter(subscriptions::matchesSubscribedPattern)
+                .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }
 
     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }
 
     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }
 
     @Override
     @Deprecated
-    public ConsumerRecords<K, V> poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
    }
 
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }
 
-    private static <K, V> ClusterResourceListeners configureClusterResourceListeners(
-            final Deserializer<K> keyDeserializer,
-            final Deserializer<V> valueDeserializer,
-            final List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        });
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {

Review Comment:
   fetchPosition is updated in the user thread.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -192,71 +321,38 @@ public PrototypeAsyncConsumer(final ConsumerConfig config,
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
         Timer timer = time.timer(timeout);
+
         try {
-            do {
-                if (!eventHandler.isEmpty()) {
-                    final Optional<BackgroundEvent> backgroundEvent = eventHandler.poll();
-                    // processEvent() may process 3 types of event:
-                    // 1. Errors
-                    // 2. Callback Invocation
-                    // 3. Fetch responses
-                    // Errors will be handled or rethrown.
-                    // Callback invocation will trigger callback function execution, which is blocking until completion.
-                    // Successful fetch responses will be added to the completedFetches in the fetcher, which will then
-                    // be processed in the collectFetches().
-                    backgroundEvent.ifPresent(event -> processEvent(event, timeout));
-                }
+            backgroundEventProcessor.process();
 
-                updateFetchPositionsIfNeeded(timer);
+            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
 
-                // The idea here is to have the background thread sending fetches autonomously, and the fetcher
-                // uses the poll loop to retrieve successful fetchResponse and process them on the polling thread.
-                final Fetch<K, V> fetch = collectFetches();
-                if (!fetch.isEmpty()) {
-                    return processFetchResults(fetch);
-                }
-                // We will wait for retryBackoffMs
-            } while (time.timer(timeout).notExpired());
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-        // TODO: Once we implement poll(), clear wakeupTrigger in a finally block: wakeupTrigger.clearActiveTask();
+            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
+                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
+            }
 
-        return ConsumerRecords.empty();
-    }
+            do {
+                updateAssignmentMetadataIfNeeded(timer);
+                final Fetch<K, V> fetch = pollForFetches(timer);
 
-    /**
-     * Set the fetch position to the committed position (if there is one) or reset it using the
-     * offset reset policy the user has configured (if partitions require reset)
-     *
-     * @return true if the operation completed without timing out
-     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException                          If no offset is stored for a given partition and no offset reset policy is
-     *                                                                defined
-     */
-    private boolean updateFetchPositionsIfNeeded(final Timer timer) {
-        // If any partitions have been truncated due to a leader change, we need to validate the offsets
-        ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
-        eventHandler.add(validatePositionsEvent);
+                if (!fetch.isEmpty()) {
+                    sendFetches();
 
-        // If there are any partitions which do not have a valid position and are not
-        // awaiting reset, then we need to fetch committed offsets. We will only do a
-        // coordinator lookup if there are partitions which have missing positions, so
-        // a consumer with manually assigned partitions can avoid a coordinator dependence
-        // by always ensuring that assigned partitions have an initial position.
-        if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer))
-            return false;
+                    if (fetch.records().isEmpty()) {
+                        log.trace("Returning empty records from `poll()` "
+                                + "since the consumer's position has advanced for at least one topic partition");
+                    }
 
-        // If there are partitions still needing a position and a reset policy is defined,
-        // request reset using the default policy. If no reset strategy is defined and there
-        // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-        subscriptions.resetInitializingPositions();
+                    return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
+                }
+                // We will wait for retryBackoffMs
+            } while (timer.notExpired());
 
-        // Finally send an asynchronous request to look up and update the positions of any
-        // partitions which are awaiting reset.
-        ResetPositionsApplicationEvent resetPositionsEvent = new ResetPositionsApplicationEvent();
-        eventHandler.add(resetPositionsEvent);
-        return true;
+            return ConsumerRecords.empty();
+        } finally {
+            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+        }
+        // TODO: Once we implement poll(), clear wakeupTrigger in a finally block: wakeupTrigger.clearActiveTask();

Review Comment:
   Is this comment addressed?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -0,0 +1,3565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.DelayedReceive;
+import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.TestUtils.assertOptional;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class FetchRequestManagerTest {
+
+    private static final double EPSILON = 0.0001;
+
+    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
+    private String topicName = "test";
+    private String groupId = "test-group";
+    private Uuid topicId = Uuid.randomUuid();
+    private Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+        {
+            put(topicName, topicId);
+        }
+    };
+    private Map<Uuid, String> topicNames = singletonMap(topicId, topicName);
+    private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
+    private TopicPartition tp0 = new TopicPartition(topicName, 0);
+    private TopicPartition tp1 = new TopicPartition(topicName, 1);
+    private TopicPartition tp2 = new TopicPartition(topicName, 2);
+    private TopicPartition tp3 = new TopicPartition(topicName, 3);
+    private TopicIdPartition tidp0 = new TopicIdPartition(topicId, tp0);
+    private TopicIdPartition tidp1 = new TopicIdPartition(topicId, tp1);
+    private TopicIdPartition tidp2 = new TopicIdPartition(topicId, tp2);
+    private TopicIdPartition tidp3 = new TopicIdPartition(topicId, tp3);
+    private int validLeaderEpoch = 0;
+    private MetadataResponse initialUpdateResponse =
+            RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName, 4), topicIds);
+
+    private int minBytes = 1;
+    private int maxBytes = Integer.MAX_VALUE;
+    private int maxWaitMs = 0;
+    private int fetchSize = 1000;
+    private long retryBackoffMs = 100;
+    private long requestTimeoutMs = 30000;
+    private MockTime time = new MockTime(1);
+    private SubscriptionState subscriptions;
+    private ConsumerMetadata metadata;
+    private FetchMetricsRegistry metricsRegistry;
+    private FetchMetricsManager metricsManager;
+    private MockClient client;
+    private Metrics metrics;
+    private ApiVersions apiVersions = new ApiVersions();
+    private ConsumerNetworkClient oldConsumerClient;
+    private TestableFetchRequestManager<?, ?> fetcher;
+    private TestableNetworkClientDelegate consumerClient;
+    private OffsetFetcher offsetFetcher;
+
+    private MemoryRecords records;
+    private MemoryRecords nextRecords;
+    private MemoryRecords emptyRecords;
+    private MemoryRecords partialRecords;
+
+    @BeforeEach
+    public void setup() {
+        records = buildRecords(1L, 3, 1);
+        nextRecords = buildRecords(4L, 2, 4);
+        emptyRecords = buildRecords(0L, 0, 0);
+        partialRecords = buildRecords(4L, 1, 0);
+        partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000);
+    }
+
+    private void assignFromUser(Set<TopicPartition> partitions) {
+        subscriptions.assignFromUser(partitions);
+        client.updateMetadata(initialUpdateResponse);
+
+        // A dummy metadata update to ensure valid leader epoch.
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1,
+                Collections.emptyMap(), singletonMap(topicName, 4),
+                tp -> validLeaderEpoch, topicIds), false, 0L);
+    }
+
+    private void assignFromUserNoId(Set<TopicPartition> partitions) {
+        subscriptions.assignFromUser(partitions);
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singletonMap("noId", 1), Collections.emptyMap()));
+
+        // A dummy metadata update to ensure valid leader epoch.
+        metadata.update(9, RequestTestUtils.metadataUpdateWithIds("dummy", 1,
+                Collections.emptyMap(), singletonMap("noId", 1),
+                tp -> validLeaderEpoch, topicIds), false, 0L);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        if (metrics != null)
+            this.metrics.close();
+        if (fetcher != null)
+            this.fetcher.close();
+    }
+
+    private int sendFetches() {
+        offsetFetcher.validatePositionsOnMetadataChange();
+        return fetcher.sendFetches();
+    }
+
+    @Test
+    public void testFetchNormal() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+
+        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp0);
+        assertEquals(3, records.size());
+        assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position
+        long offset = 1;
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            assertEquals(offset, record.offset());
+            offset += 1;
+        }
+    }
+
+    @Test
+    public void testInflightFetchOnPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        subscriptions.markPendingRevocation(singleton(tp0));
+
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+        assertNull(fetchedRecords().get(tp0));
+    }
+
+    @Test
+    public void testCloseShouldBeIdempotent() {
+        buildFetcher();
+
+        fetcher.close();
+        fetcher.close();
+        fetcher.close();
+
+        verify(fetcher, times(1)).closeInternal(any(Timer.class));
+    }
+
+    @Test
+    public void testFetcherCloseClosesFetchSessionsInBroker() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        final FetchResponse fetchResponse = fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0);
+        client.prepareResponse(fetchResponse);
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        assertEquals(0, consumerClient.pendingRequestCount());
+
+        final ArgumentCaptor<NetworkClientDelegate.UnsentRequest> argument = ArgumentCaptor.forClass(NetworkClientDelegate.UnsentRequest.class);
+
+        // send request to close the fetcher
+        Timer timer = time.timer(Duration.ofSeconds(10));
+        this.fetcher.close(timer);
+        NetworkClientDelegate.PollResult pollResult = fetcher.poll(time.milliseconds());
+        consumerClient.addAll(pollResult.unsentRequests);
+        consumerClient.poll(timer);
+
+        // validate that Fetcher.close() has sent a request with final epoch. 2 requests are sent, one for the normal
+        // fetch earlier and another for the finish fetch here.
+        verify(consumerClient, times(2)).doSend(argument.capture(), any(Long.class));
+        NetworkClientDelegate.UnsentRequest unsentRequest = argument.getValue();
+        FetchRequest.Builder builder = (FetchRequest.Builder) unsentRequest.requestBuilder();
+        // session Id is the same
+        assertEquals(fetchResponse.sessionId(), builder.metadata().sessionId());
+        // contains final epoch
+        assertEquals(FetchMetadata.FINAL_EPOCH, builder.metadata().epoch()); // final epoch indicates we want to close the session
+        assertTrue(builder.fetchData().isEmpty()); // partition data should be empty
+    }
+
+    @Test
+    public void testFetchingPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position
+
+        // mark partition unfetchable
+        subscriptions.markPendingRevocation(singleton(tp0));
+        assertEquals(0, sendFetches());
+        consumerClient.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        assertEquals(4L, subscriptions.position(tp0).offset);
+    }
+
+    @Test
+    public void testFetchWithNoTopicId() {
+        // Should work and default to using old request type.
+        buildFetcher();
+
+        TopicIdPartition noId = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("noId", 0));
+        assignFromUserNoId(singleton(noId.topicPartition()));
+        subscriptions.seek(noId.topicPartition(), 0);
+
+        // Fetch should use request version 12
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(
+                fetchRequestMatcher((short) 12, noId, 0, Optional.of(validLeaderEpoch)),
+                fullFetchResponse(noId, this.records, Errors.NONE, 100L, 0)
+        );
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(noId.topicPartition()));
+
+        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(noId.topicPartition());
+        assertEquals(3, records.size());
+        assertEquals(4L, subscriptions.position(noId.topicPartition()).offset); // this is the next fetching position
+        long offset = 1;
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            assertEquals(offset, record.offset());
+            offset += 1;
+        }
+    }
+
+    @Test
+    public void testFetchWithTopicId() {
+        buildFetcher();
+
+        TopicIdPartition tp = new TopicIdPartition(topicId, new TopicPartition(topicName, 0));
+        assignFromUser(singleton(tp.topicPartition()));
+        subscriptions.seek(tp.topicPartition(), 0);
+
+        // Fetch should use latest version

Review Comment:
   Should this comment be moved to above the `client.prepareResponse` below?
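   
   If so, it could read something like the following (a sketch only, mirroring the shape of `testFetchWithNoTopicId` above; the latest-version matcher is an assumption about the code below the truncated hunk, not a quote of it):
   
   ```java
   assertEquals(1, sendFetches());
   assertFalse(fetcher.hasCompletedFetches());
   
   // Fetch should use latest version
   client.prepareResponse(
           fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), tp, 0, Optional.of(validLeaderEpoch)),
           fullFetchResponse(tp, this.records, Errors.NONE, 100L, 0)
   );
   ```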



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
