This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a3252629a37 KAFKA-14365: Extract common logic from Fetcher (#13425)
a3252629a37 is described below

commit a3252629a37344be1f1f86a77cf685f9c147be4d
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Fri Mar 24 14:33:13 2023 -0700

    KAFKA-14365: Extract common logic from Fetcher (#13425)
    
    * KAFKA-14365: Extract common logic from Fetcher
    
    Extract logic from Fetcher into AbstractFetch.
    
    Also introduce FetchConfig as a more concise way to delineate state from
    incoming configuration.
    
    Formalized the defaults in CommonClientConfigs and ConsumerConfig to be
    accessible elsewhere.
    
    * Removed overridden methods in favor of synchronizing where needed
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 checkstyle/suppressions.xml                        |  14 +-
 .../apache/kafka/clients/CommonClientConfigs.java  |   1 +
 .../kafka/clients/consumer/ConsumerConfig.java     |  14 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |  25 +-
 .../clients/consumer/internals/AbstractFetch.java  | 791 +++++++++++++++++++++
 .../clients/consumer/internals/CompletedFetch.java |  25 +-
 .../clients/consumer/internals/FetchConfig.java    | 124 ++++
 .../consumer/internals/FetchMetricsManager.java    |  15 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 727 +------------------
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  21 +-
 .../consumer/internals/CompletedFetchTest.java     |  34 +-
 .../clients/consumer/internals/FetcherTest.java    |  54 +-
 .../consumer/internals/OffsetFetcherTest.java      |  18 +-
 13 files changed, 1042 insertions(+), 821 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 93ae7c30d0f..5c95c8d4284 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -46,7 +46,7 @@
     <suppress id="dontUseSystemExit"
               files="Exit.java"/>
     <suppress checks="ClassFanOutComplexity"
-              
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
+              
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
     <suppress checks="NPath"
@@ -67,8 +67,6 @@
               files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>
     <suppress checks="ParameterNumber"
               files="(KafkaConsumer|ConsumerCoordinator).java"/>
-    <suppress checks="ParameterNumber"
-              files="Fetcher.java"/>
     <suppress checks="ParameterNumber"
               files="Sender.java"/>
     <suppress checks="ParameterNumber"
@@ -79,7 +77,7 @@
               files="MemoryRecordsBuilder.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
+              
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
               
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
 
@@ -87,13 +85,13 @@
               
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
+              
files="(AbstractFetch|ConsumerCoordinator|OffsetFetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
 
     <suppress checks="JavaNCSS"
               
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
 
     <suppress checks="NPathComplexity"
-              
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler).java"/>
+              
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler).java"/>
 
     <suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
               files="CoordinatorClient.java"/>
@@ -108,7 +106,7 @@
 
     <!-- Clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
-              
files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
+              
files="(Sender|Fetcher|OffsetFetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
               
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
@@ -117,7 +115,7 @@
               files="MockAdminClient.java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="RequestResponseTest.java"/>
+              files="(OffsetFetcher|RequestResponse)Test.java"/>
 
     <suppress checks="JavaNCSS"
               
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index d88aa0a6a1c..ee190df50e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -74,6 +74,7 @@ public class CommonClientConfigs {
 
     public static final String CLIENT_RACK_CONFIG = "client.rack";
     public static final String CLIENT_RACK_DOC = "A rack identifier for this 
client. This can be any string value which indicates where this client is 
physically located. It corresponds with the broker config 'broker.rack'";
+    public static final String DEFAULT_CLIENT_RACK = "";
 
     public static final String RECONNECT_BACKOFF_MS_CONFIG = 
"reconnect.backoff.ms";
     public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of 
time to wait before attempting to reconnect to a given host. This avoids 
repeatedly connecting to a host in a tight loop. This backoff applies to all 
connection attempts by the client to a broker.";
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index f8bc97ec8a3..51c5a35bcf8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -88,6 +88,7 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String MAX_POLL_RECORDS_DOC = "The maximum number of 
records returned in a single call to poll()."
         + " Note, that <code>" + MAX_POLL_RECORDS_CONFIG + "</code> does not 
impact the underlying fetching behavior."
         + " The consumer will cache the records from each fetch request and 
returns them incrementally from each poll.";
+    public static final int DEFAULT_MAX_POLL_RECORDS = 500;
 
     /** <code>max.poll.interval.ms</code> */
     public static final String MAX_POLL_INTERVAL_MS_CONFIG = 
CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
@@ -154,7 +155,8 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>fetch.min.bytes</code>
      */
     public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
-    private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of 
data the server should return for a fetch request. If insufficient data is 
available the request will wait for that much data to accumulate before 
answering the request. The default setting of 1 byte means that fetch requests 
are answered as soon as a single byte of data is available or the fetch request 
times out waiting for data to arrive. Setting this to something greater than 1 
will cause the server to wait  [...]
+    public static final int DEFAULT_FETCH_MIN_BYTES = 1;
+    private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of 
data the server should return for a fetch request. If insufficient data is 
available the request will wait for that much data to accumulate before 
answering the request. The default setting of " + DEFAULT_FETCH_MIN_BYTES + " 
byte means that fetch requests are answered as soon as that many byte(s) of 
data is available or the fetch request times out waiting for data to arrive. 
Setting this to a larger value will c [...]
 
     /**
      * <code>fetch.max.bytes</code>
@@ -172,6 +174,7 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
     private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of 
time the server will block before answering the fetch request if there isn't 
sufficient data to immediately satisfy the requirement given by 
fetch.min.bytes.";
+    public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500;
 
     /** <code>metadata.max.age.ms</code> */
     public static final String METADATA_MAX_AGE_CONFIG = 
CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
@@ -203,6 +206,7 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>client.rack</code>
      */
     public static final String CLIENT_RACK_CONFIG = 
CommonClientConfigs.CLIENT_RACK_CONFIG;
+    public static final String DEFAULT_CLIENT_RACK = 
CommonClientConfigs.DEFAULT_CLIENT_RACK;
 
     /**
      * <code>reconnect.backoff.ms</code>
@@ -402,7 +406,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(CLIENT_RACK_CONFIG,
                                         Type.STRING,
-                                        "",
+                                        DEFAULT_CLIENT_RACK,
                                         Importance.LOW,
                                         CommonClientConfigs.CLIENT_RACK_DOC)
                                 .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
@@ -425,7 +429,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(FETCH_MIN_BYTES_CONFIG,
                                         Type.INT,
-                                        1,
+                                        DEFAULT_FETCH_MIN_BYTES,
                                         atLeast(0),
                                         Importance.HIGH,
                                         FETCH_MIN_BYTES_DOC)
@@ -437,7 +441,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         FETCH_MAX_BYTES_DOC)
                                 .define(FETCH_MAX_WAIT_MS_CONFIG,
                                         Type.INT,
-                                        500,
+                                        DEFAULT_FETCH_MAX_WAIT_MS,
                                         atLeast(0),
                                         Importance.LOW,
                                         FETCH_MAX_WAIT_MS_DOC)
@@ -543,7 +547,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         INTERCEPTOR_CLASSES_DOC)
                                 .define(MAX_POLL_RECORDS_CONFIG,
                                         Type.INT,
-                                        500,
+                                        DEFAULT_MAX_POLL_RECORDS,
                                         atLeast(1),
                                         Importance.MEDIUM,
                                         MAX_POLL_RECORDS_DOC)
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 11ac675cb4b..723cc00ca2f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -27,6 +27,8 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetch;
+import org.apache.kafka.clients.consumer.internals.FetchConfig;
+import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry;
 import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
@@ -590,7 +592,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final long requestTimeoutMs;
     private final int defaultApiTimeoutMs;
     private volatile boolean closed = false;
-    private List<ConsumerPartitionAssignor> assignors;
+    private final List<ConsumerPartitionAssignor> assignors;
 
     // currentThread holds the threadId of the current thread accessing 
KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -738,10 +740,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
             String metricGrpPrefix = "consumer";
 
             FetchMetricsRegistry metricsRegistry = new 
FetchMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), 
metricGrpPrefix);
+            FetchMetricsManager fetchMetricsManager = new 
FetchMetricsManager(metrics, metricsRegistry);
             ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config, time, logContext);
             this.isolationLevel = IsolationLevel.valueOf(
                     
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
-            Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, 
metricsRegistry);
             int heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
 
             ApiVersions apiVersions = new ApiVersions();
@@ -760,7 +762,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     time,
                     true,
                     apiVersions,
-                    throttleTimeSensor,
+                    fetchMetricsManager.throttleTimeSensor(),
                     logContext);
             this.client = new ConsumerNetworkClient(
                     logContext,
@@ -797,24 +799,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
                         
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
                         config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
             }
+            FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, 
keyDeserializer, valueDeserializer, isolationLevel);
             this.fetcher = new Fetcher<>(
                     logContext,
                     this.client,
-                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
-                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
-                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
-                    
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
-                    config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
-                    config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
-                    config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
-                    this.keyDeserializer,
-                    this.valueDeserializer,
                     this.metadata,
                     this.subscriptions,
-                    metrics,
-                    metricsRegistry,
-                    this.time,
-                    isolationLevel);
+                    fetchConfig,
+                    fetchMetricsManager,
+                    this.time);
             this.offsetFetcher = new OffsetFetcher(logContext,
                     client,
                     metadata,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
new file mode 100644
index 00000000000..13a45fc4569
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -0,0 +1,791 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.helpers.MessageFormatter;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * {@code AbstractFetch} represents the basic state and logic for fetching 
and processing records.
+ * @param <K> Type for the message key
+ * @param <V> Type for the message value
+ */
+public abstract class AbstractFetch<K, V> implements Closeable {
+
+    private final Logger log;
+    protected final LogContext logContext;
+    protected final ConsumerNetworkClient client;
+    protected final ConsumerMetadata metadata;
+    protected final SubscriptionState subscriptions;
+    protected final FetchConfig<K, V> fetchConfig;
+    protected final Time time;
+    protected final FetchMetricsManager metricsManager;
+
+    private final BufferSupplier decompressionBufferSupplier;
+    private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
+    private final Map<Integer, FetchSessionHandler> sessionHandlers;
+    private final Set<Integer> nodesWithPendingFetchRequests;
+
+    private CompletedFetch<K, V> nextInLineFetch;
+
+    public AbstractFetch(final LogContext logContext,
+                         final ConsumerNetworkClient client,
+                         final ConsumerMetadata metadata,
+                         final SubscriptionState subscriptions,
+                         final FetchConfig<K, V> fetchConfig,
+                         final FetchMetricsManager metricsManager,
+                         final Time time) {
+        this.log = logContext.logger(AbstractFetch.class);
+        this.logContext = logContext;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.fetchConfig = fetchConfig;
+        this.decompressionBufferSupplier = BufferSupplier.create();
+        this.completedFetches = new ConcurrentLinkedQueue<>();
+        this.sessionHandlers = new HashMap<>();
+        this.nodesWithPendingFetchRequests = new HashSet<>();
+        this.metricsManager = metricsManager;
+        this.time = time;
+    }
+
+    /**
+     * Return whether we have any completed fetches pending return to the 
user. This method is thread-safe. Has
+     * visibility for testing.
+     *
+     * @return true if there are completed fetches, false otherwise
+     */
+    boolean hasCompletedFetches() {
+        return !completedFetches.isEmpty();
+    }
+
+    /**
+     * Return whether we have any completed fetches that are fetchable. This 
method is thread-safe.
+     * @return true if there are completed fetches that can be returned, false 
otherwise
+     */
+    public boolean hasAvailableFetches() {
+        return completedFetches.stream().anyMatch(fetch -> 
subscriptions.isFetchable(fetch.partition));
+    }
+
+    /**
+     * Implements the core logic for a successful fetch request/response.
+     *
+     * @param fetchTarget {@link Node} from which the fetch data was requested
+     * @param data {@link FetchSessionHandler.FetchRequestData} that 
represents the session data
+     * @param resp {@link ClientResponse} from which the {@link FetchResponse} 
will be retrieved
+     */
+    protected void handleFetchResponse(final Node fetchTarget,
+                                       final 
FetchSessionHandler.FetchRequestData data,
+                                       final ClientResponse resp) {
+        try {
+            final FetchResponse response = (FetchResponse) resp.responseBody();
+            final FetchSessionHandler handler = 
sessionHandler(fetchTarget.id());
+
+            if (handler == null) {
+                log.error("Unable to find FetchSessionHandler for node {}. 
Ignoring fetch response.",
+                        fetchTarget.id());
+                return;
+            }
+
+            final short requestVersion = resp.requestHeader().apiVersion();
+
+            if (!handler.handleResponse(response, requestVersion)) {
+                if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
+                    metadata.requestUpdate();
+                }
+
+                return;
+            }
+
+            final Map<TopicPartition, FetchResponseData.PartitionData> 
responseData = response.responseData(handler.sessionTopicNames(), 
requestVersion);
+            final Set<TopicPartition> partitions = new 
HashSet<>(responseData.keySet());
+            final FetchMetricsAggregator metricAggregator = new 
FetchMetricsAggregator(metricsManager, partitions);
+
+            for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> 
entry : responseData.entrySet()) {
+                TopicPartition partition = entry.getKey();
+                FetchRequest.PartitionData requestData = 
data.sessionPartitions().get(partition);
+
+                if (requestData == null) {
+                    String message;
+
+                    if (data.metadata().isFull()) {
+                        message = MessageFormatter.arrayFormat(
+                                "Response for missing full request partition: 
partition={}; metadata={}",
+                                new Object[]{partition, 
data.metadata()}).getMessage();
+                    } else {
+                        message = MessageFormatter.arrayFormat(
+                                "Response for missing session request 
partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}",
+                                new Object[]{partition, data.metadata(), 
data.toSend(), data.toForget(), data.toReplace()}).getMessage();
+                    }
+
+                    // Received fetch response for missing session partition
+                    throw new IllegalStateException(message);
+                }
+
+                long fetchOffset = requestData.fetchOffset;
+                FetchResponseData.PartitionData partitionData = 
entry.getValue();
+
+                log.debug("Fetch {} at offset {} for partition {} returned 
fetch data {}",
+                        fetchConfig.isolationLevel, fetchOffset, partition, 
partitionData);
+
+                CompletedFetch<K, V> completedFetch = new CompletedFetch<>(
+                        logContext,
+                        subscriptions,
+                        fetchConfig,
+                        decompressionBufferSupplier,
+                        partition,
+                        partitionData,
+                        metricAggregator,
+                        fetchOffset,
+                        requestVersion);
+                completedFetches.add(completedFetch);
+            }
+
+            metricsManager.recordLatency(resp.requestLatencyMs());
+        } finally {
+            log.debug("Removing pending request for node {}", fetchTarget);
+            nodesWithPendingFetchRequests.remove(fetchTarget.id());
+        }
+    }
+
+    /**
+     * Implements the core logic for a failed fetch request/response.
+     *
+     * @param fetchTarget {@link Node} from which the fetch data was requested
+     * @param e {@link RuntimeException} representing the error that resulted 
in the failure
+     */
+    protected void handleFetchResponse(final Node fetchTarget, final 
RuntimeException e) {
+        try {
+            final FetchSessionHandler handler = 
sessionHandler(fetchTarget.id());
+
+            if (handler != null) {
+                handler.handleError(e);
+                
handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
+            }
+        } finally {
+            log.debug("Removing pending request for node {}", fetchTarget);
+            nodesWithPendingFetchRequests.remove(fetchTarget.id());
+        }
+    }
+
+    /**
+     * Creates a new {@link FetchRequest fetch request} in preparation for 
sending to the Kafka cluster.
+     *
+     * @param fetchTarget {@link Node} from which the fetch data will be 
requested
+     * @param requestData {@link FetchSessionHandler.FetchRequestData} that 
represents the session data
+     * @return {@link FetchRequest.Builder} that can be submitted to the broker
+     */
+    protected FetchRequest.Builder createFetchRequest(final Node fetchTarget,
+                                                      final 
FetchSessionHandler.FetchRequestData requestData) {
+        // Version 12 is the maximum version that could be used without topic 
IDs. See FetchRequest.json for schema
+        // changelog.
+        final short maxVersion = requestData.canUseTopicIds() ? 
ApiKeys.FETCH.latestVersion() : (short) 12;
+
+        final FetchRequest.Builder request = FetchRequest.Builder
+                .forConsumer(maxVersion, fetchConfig.maxWaitMs, 
fetchConfig.minBytes, requestData.toSend())
+                .isolationLevel(fetchConfig.isolationLevel)
+                .setMaxBytes(fetchConfig.maxBytes)
+                .metadata(requestData.metadata())
+                .removed(requestData.toForget())
+                .replaced(requestData.toReplace())
+                .rackId(fetchConfig.clientRackId);
+
+        log.debug("Sending {} {} to broker {}", fetchConfig.isolationLevel, 
requestData, fetchTarget);
+
+        // We add the node to the set of nodes with pending fetch requests 
before adding the
+        // listener because the future may have been fulfilled on another 
thread (e.g. during a
+        // disconnection being handled by the heartbeat thread) which will 
mean the listener
+        // will be invoked synchronously.
+        log.debug("Adding pending request for node {}", fetchTarget);
+        nodesWithPendingFetchRequests.add(fetchTarget.id());
+
+        return request;
+    }
+
+    /**
+     * Return the fetched records, empty the record buffer and update the 
consumed position.
+     *
+     * <p>
+     *
+     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the 
consumed position is not updated.
+     *
+     * @return A {@link Fetch} for the requested partitions
+     * @throws OffsetOutOfRangeException If there is an OffsetOutOfRange error in 
fetchResponse and
+     *         the defaultResetPolicy is NONE
+     * @throws TopicAuthorizationException If there is a TopicAuthorization 
error in fetchResponse.
+     */
+    public Fetch<K, V> collectFetch() {
+        Fetch<K, V> fetch = Fetch.empty();
+        Queue<CompletedFetch<K, V>> pausedCompletedFetches = new 
ArrayDeque<>();
+        int recordsRemaining = fetchConfig.maxPollRecords;
+
+        try {
+            while (recordsRemaining > 0) {
+                if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
+                    CompletedFetch<K, V> records = completedFetches.peek();
+                    if (records == null) break;
+
+                    if (!records.initialized) {
+                        try {
+                            nextInLineFetch = 
initializeCompletedFetch(records);
+                        } catch (Exception e) {
+                            // Remove a completedFetch upon a parse with 
exception if (1) it contains no records, and
+                            // (2) there are no fetched records with actual 
content preceding this exception.
+                            // The first condition ensures that the 
completedFetches is not stuck with the same completedFetch
+                            // in cases such as the 
TopicAuthorizationException, and the second condition ensures that no
+                            // potential data loss due to an exception in a 
following record.
+                            if (fetch.isEmpty() && 
FetchResponse.recordsOrFail(records.partitionData).sizeInBytes() == 0) {
+                                completedFetches.poll();
+                            }
+                            throw e;
+                        }
+                    } else {
+                        nextInLineFetch = records;
+                    }
+                    completedFetches.poll();
+                } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
+                    // when the partition is paused we add the records back to 
the completedFetches queue instead of draining
+                    // them so that they can be returned on a subsequent poll 
if the partition is resumed at that time
+                    log.debug("Skipping fetching records for assigned 
partition {} because it is paused", nextInLineFetch.partition);
+                    pausedCompletedFetches.add(nextInLineFetch);
+                    nextInLineFetch = null;
+                } else {
+                    Fetch<K, V> nextFetch = fetchRecords(recordsRemaining);
+                    recordsRemaining -= nextFetch.numRecords();
+                    fetch.add(nextFetch);
+                }
+            }
+        } catch (KafkaException e) {
+            if (fetch.isEmpty())
+                throw e;
+        } finally {
+            // add any polled completed fetches for paused partitions back to 
the completed fetches queue to be
+            // re-evaluated in the next poll
+            completedFetches.addAll(pausedCompletedFetches);
+        }
+
+        return fetch;
+    }
+
+    /**
+     * Return up to {@code maxRecords} records from {@code nextInLineFetch}, advancing the partition's fetch
+     * position when the fetch moved past it. If the partition is no longer assigned or fetchable, or the
+     * buffered data does not start at the current position, the buffered data is drained and an empty
+     * {@link Fetch} is returned.
+     *
+     * @param maxRecords Maximum number of records to return
+     * @return The records for the partition, or an empty {@link Fetch} if the buffered data was discarded
+     * @throws IllegalStateException If a fetchable partition has no position
+     */
+    private Fetch<K, V> fetchRecords(final int maxRecords) {
+        final TopicPartition tp = nextInLineFetch.partition;
+
+        if (!subscriptions.isAssigned(tp)) {
+            // A rebalance may have happened before the fetched records were returned to the consumer's poll call.
+            log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
+        } else if (!subscriptions.isFetchable(tp)) {
+            // The partition may have been paused before the fetched records were returned to the consumer's
+            // poll call, or its offset is being reset.
+            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
+        } else {
+            final SubscriptionState.FetchPosition current = subscriptions.position(tp);
+            if (current == null)
+                throw new IllegalStateException("Missing position for fetchable partition " + tp);
+
+            if (nextInLineFetch.nextFetchOffset != current.offset) {
+                // These records aren't next in line based on the last consumed position; they must be from
+                // an obsolete request, so ignore them.
+                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
+                        tp, nextInLineFetch.nextFetchOffset, current);
+            } else {
+                final List<ConsumerRecord<K, V>> batch = nextInLineFetch.fetchRecords(maxRecords);
+
+                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
+                        batch.size(), current, tp);
+
+                // Reading the records above may have moved nextFetchOffset beyond the current position.
+                final boolean advanced = nextInLineFetch.nextFetchOffset > current.offset;
+
+                if (advanced) {
+                    final SubscriptionState.FetchPosition updated = new SubscriptionState.FetchPosition(
+                            nextInLineFetch.nextFetchOffset,
+                            nextInLineFetch.lastEpoch,
+                            current.currentLeader);
+                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
+                            current, updated, tp, batch.size());
+                    subscriptions.position(tp, updated);
+                }
+
+                final Long lag = subscriptions.partitionLag(tp, fetchConfig.isolationLevel);
+                if (lag != null) {
+                    metricsManager.recordPartitionLag(tp, lag);
+                }
+
+                final Long lead = subscriptions.partitionLead(tp);
+                if (lead != null) {
+                    metricsManager.recordPartitionLead(tp, lead);
+                }
+
+                return Fetch.forPartition(tp, batch, advanced);
+            }
+        }
+
+        log.trace("Draining fetched records for partition {}", tp);
+        nextInLineFetch.drain();
+
+        return Fetch.empty();
+    }
+
+    /**
+     * List the fetchable partitions, leaving out every partition that still has buffered data: the
+     * not-yet-consumed {@code nextInLineFetch} and everything queued in {@code completedFetches}.
+     *
+     * @return Partitions reported as fetchable by the subscription state, minus the buffered ones
+     */
+    private List<TopicPartition> fetchablePartitions() {
+        final Set<TopicPartition> buffered = new HashSet<>();
+
+        for (CompletedFetch<K, V> queued : completedFetches) {
+            buffered.add(queued.partition);
+        }
+
+        final boolean nextInLinePending = nextInLineFetch != null && !nextInLineFetch.isConsumed;
+        if (nextInLinePending) {
+            buffered.add(nextInLineFetch.partition);
+        }
+
+        return subscriptions.fetchablePartitions(candidate -> !buffered.contains(candidate));
+    }
+
+    /**
+     * Determine from which replica to read: the <i>preferred</i> or the <i>leader</i>. The preferred replica is used
+     * iff:
+     *
+     * <ul>
+     *     <li>A preferred replica was previously set</li>
+     *     <li>We're still within the lease time for the preferred replica</li>
+     *     <li>The replica is still online/available</li>
+     * </ul>
+     *
+     * If any of the above are not met, the leader node is returned.
+     *
+     * @param partition {@link TopicPartition} for which we want to fetch data
+     * @param leaderReplica {@link Node} for the leader of the given partition
+     * @param currentTimeMs Current time in milliseconds; used to determine if we're within the optional lease window
+     * @return Replica {@link Node node} from which to request the data
+     * @see SubscriptionState#preferredReadReplica
+     * @see SubscriptionState#updatePreferredReadReplica
+     */
+    Node selectReadReplica(final TopicPartition partition, final Node leaderReplica, final long currentTimeMs) {
+        final Optional<Integer> preferredId = subscriptions.preferredReadReplica(partition, currentTimeMs);
+
+        if (!preferredId.isPresent())
+            return leaderReplica;
+
+        final Optional<Node> preferredNode = preferredId.flatMap(id -> metadata.fetch().nodeIfOnline(partition, id));
+
+        if (preferredNode.isPresent())
+            return preferredNode.get();
+
+        log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata," +
+                " using the leader instead.", preferredId, partition);
+        // This may be caused by stale metadata, so clear the preferred replica and ask for a metadata refresh.
+        requestMetadataUpdate(partition);
+        return leaderReplica;
+    }
+
+    /**
+     * Create fetch requests for all nodes for which we have assigned partitions
+     * that have no existing requests in flight.
+     *
+     * @return Fetch request data keyed by the node it should be sent to, in the order the nodes were first seen
+     * @throws IllegalStateException If a fetchable partition has no position
+     */
+    protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
+        // Update metrics in case there was an assignment change
+        metricsManager.maybeUpdateAssignment(subscriptions);
+
+        final Map<Node, FetchSessionHandler.Builder> buildersByNode = new LinkedHashMap<>();
+        final long nowMs = time.milliseconds();
+        final Map<String, Uuid> topicIds = metadata.topicIds();
+
+        for (TopicPartition partition : fetchablePartitions()) {
+            final SubscriptionState.FetchPosition position = subscriptions.position(partition);
+
+            if (position == null)
+                throw new IllegalStateException("Missing position for fetchable partition " + partition);
+
+            final Optional<Node> leader = position.currentLeader.leader;
+
+            if (!leader.isPresent()) {
+                log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
+                metadata.requestUpdate();
+                continue;
+            }
+
+            // Use the preferred read replica if set, otherwise the partition's leader
+            final Node target = selectReadReplica(partition, leader.get(), nowMs);
+
+            if (client.isUnavailable(target)) {
+                client.maybeThrowAuthFailure(target);
+
+                // A request sent during the reconnect backoff window would just be failed before being sent,
+                // so skip sending it for now.
+                log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, target);
+                continue;
+            }
+
+            if (nodesWithPendingFetchRequests.contains(target.id())) {
+                log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, target);
+                continue;
+            }
+
+            // There is a node to fetch from and no in-flight request to it, so add the partition to a new fetch.
+            final FetchSessionHandler.Builder builder = buildersByNode.computeIfAbsent(target, unused -> {
+                final FetchSessionHandler handler =
+                        sessionHandlers.computeIfAbsent(target.id(), id -> new FetchSessionHandler(logContext, id));
+                return handler.newBuilder();
+            });
+            final Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
+            builder.add(partition, new FetchRequest.PartitionData(topicId,
+                    position.offset,
+                    FetchRequest.INVALID_LOG_START_OFFSET,
+                    fetchConfig.fetchSize,
+                    position.currentLeader.epoch,
+                    Optional.empty()));
+
+            log.debug("Added {} fetch request for partition {} at position {} to node {}", fetchConfig.isolationLevel,
+                    partition, position, target);
+        }
+
+        final Map<Node, FetchSessionHandler.FetchRequestData> requests = new LinkedHashMap<>();
+        buildersByNode.forEach((node, builder) -> requests.put(node, builder.build()));
+        return requests;
+    }
+
+    /**
+     * Initialize a CompletedFetch object.
+     *
+     * @param completedFetch Fetch response data for a single partition
+     * @return The initialized fetch, or {@code null} if the data was ignored or the response carried an error
+     *         that did not result in an exception
+     */
+    private CompletedFetch<K, V> initializeCompletedFetch(final CompletedFetch<K, V> completedFetch) {
+        final TopicPartition tp = completedFetch.partition;
+        final Errors error = Errors.forCode(completedFetch.partitionData.errorCode());
+        // Stays true on every path except a successful initialization that returns a usable fetch.
+        boolean recordMetrics = true;
+
+        try {
+            if (!subscriptions.hasValidPosition(tp)) {
+                // A rebalance may have happened while the fetch was still in flight.
+                log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
+                return null;
+            }
+
+            if (error != Errors.NONE) {
+                handleInitializeCompletedFetchErrors(completedFetch, error);
+                return null;
+            }
+
+            final CompletedFetch<K, V> initialized = handleInitializeCompletedFetchSuccess(completedFetch);
+            recordMetrics = initialized == null;
+            return initialized;
+        } finally {
+            if (recordMetrics) {
+                completedFetch.recordAggregatedMetrics(0, 0);
+            }
+
+            if (error != Errors.NONE) {
+                // Move the partition to the end on error. This way, it's more likely that partitions for
+                // the same topic can remain together (allowing for more efficient serialization).
+                subscriptions.movePartitionToEnd(tp);
+            }
+        }
+    }
+
+    /**
+     * Handle a fetch response that carried no error: discard it if it is stale, fail if it holds bytes but no
+     * complete record batch, and otherwise update the partition's watermark/offset bookkeeping and mark the
+     * fetch as initialized.
+     *
+     * @param completedFetch Fetch response data for a single partition
+     * @return {@code completedFetch} once initialized, or {@code null} if the response is stale
+     * @throws RecordTooLargeException If no complete batch was returned and the request version is below 3
+     * @throws KafkaException If no complete batch was returned and the request version is 3 or higher
+     */
+    private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(final CompletedFetch<K, V> completedFetch) {
+        final TopicPartition tp = completedFetch.partition;
+        final long fetchOffset = completedFetch.nextFetchOffset;
+
+        // The fetch is only of interest if its beginning offset matches the current consumed position.
+        final SubscriptionState.FetchPosition position = subscriptions.position(tp);
+        final boolean stale = position == null || position.offset != fetchOffset;
+        if (stale) {
+            log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
+                    "the expected offset {}", tp, fetchOffset, position);
+            return null;
+        }
+
+        final FetchResponseData.PartitionData data = completedFetch.partitionData;
+        log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
+                FetchResponse.recordsSize(data), tp, position);
+        final Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(data).batches().iterator();
+
+        if (!batches.hasNext() && FetchResponse.recordsSize(data) > 0) {
+            if (completedFetch.requestVersion >= 3) {
+                // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
+                throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
+                        fetchOffset + ". Received a non-empty fetch response from the server, but no " +
+                        "complete records were found.");
+            }
+
+            // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
+            final Map<TopicPartition, Long> tooLarge = Collections.singletonMap(tp, fetchOffset);
+            throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
+                    tooLarge + " whose size is larger than the fetch size " + fetchConfig.fetchSize +
+                    " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
+                    "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
+                    ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
+                    tooLarge);
+        }
+
+        // Negative values are skipped in each of the three updates below.
+        final long highWatermark = data.highWatermark();
+        if (highWatermark >= 0) {
+            log.trace("Updating high watermark for partition {} to {}", tp, highWatermark);
+            subscriptions.updateHighWatermark(tp, highWatermark);
+        }
+
+        final long logStartOffset = data.logStartOffset();
+        if (logStartOffset >= 0) {
+            log.trace("Updating log start offset for partition {} to {}", tp, logStartOffset);
+            subscriptions.updateLogStartOffset(tp, logStartOffset);
+        }
+
+        final long lastStableOffset = data.lastStableOffset();
+        if (lastStableOffset >= 0) {
+            log.trace("Updating last stable offset for partition {} to {}", tp, lastStableOffset);
+            subscriptions.updateLastStableOffset(tp, lastStableOffset);
+        }
+
+        if (FetchResponse.isPreferredReplica(data)) {
+            subscriptions.updatePreferredReadReplica(completedFetch.partition, data.preferredReadReplica(), () -> {
+                final long expiresAtMs = time.milliseconds() + metadata.metadataExpireMs();
+                log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
+                        tp, data.preferredReadReplica(), expiresAtMs);
+                return expiresAtMs;
+            });
+        }
+
+        completedFetch.initialized = true;
+        return completedFetch;
+    }
+
+    /**
+     * React to a partition-level error in a fetch response. Depending on the error this requests a metadata
+     * update, handles an out-of-range offset, only logs, or throws.
+     *
+     * @param completedFetch Fetch response data for a single partition
+     * @param error Error reported for the partition; expected to be something other than {@link Errors#NONE}
+     * @throws TopicAuthorizationException If the error is {@code TOPIC_AUTHORIZATION_FAILED}
+     * @throws OffsetOutOfRangeException If the offset is out of range and no reset policy is configured
+     * @throws KafkaException If the error is {@code CORRUPT_MESSAGE}
+     * @throws IllegalStateException If the error is not one of the handled codes
+     */
+    private void handleInitializeCompletedFetchErrors(final CompletedFetch<K, V> completedFetch,
+                                                      final Errors error) {
+        final TopicPartition tp = completedFetch.partition;
+        final long fetchOffset = completedFetch.nextFetchOffset;
+
+        switch (error) {
+            case NOT_LEADER_OR_FOLLOWER:
+            case REPLICA_NOT_AVAILABLE:
+            case KAFKA_STORAGE_ERROR:
+            case FENCED_LEADER_EPOCH:
+            case OFFSET_NOT_AVAILABLE:
+                log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
+                requestMetadataUpdate(tp);
+                break;
+
+            case UNKNOWN_TOPIC_OR_PARTITION:
+                log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
+                requestMetadataUpdate(tp);
+                break;
+
+            case UNKNOWN_TOPIC_ID:
+                log.warn("Received unknown topic ID error in fetch for partition {}", tp);
+                requestMetadataUpdate(tp);
+                break;
+
+            case INCONSISTENT_TOPIC_ID:
+                log.warn("Received inconsistent topic ID error in fetch for partition {}", tp);
+                requestMetadataUpdate(tp);
+                break;
+
+            case OFFSET_OUT_OF_RANGE: {
+                final Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
+
+                if (clearedReplicaId.isPresent()) {
+                    log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
+                            clearedReplicaId.get(), tp, error, fetchOffset);
+                    break;
+                }
+
+                // No preferred replica was cleared, so we were fetching from the leader: handle the error normally.
+                final SubscriptionState.FetchPosition position = subscriptions.position(tp);
+
+                if (position != null && fetchOffset == position.offset) {
+                    handleOffsetOutOfRange(position, tp);
+                } else {
+                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
+                            "does not match the current offset {}", tp, fetchOffset, position);
+                }
+                break;
+            }
+
+            case TOPIC_AUTHORIZATION_FAILED:
+                // Log the actual partition and not just the topic to help with ACL propagation issues in large clusters.
+                log.warn("Not authorized to read from partition {}.", tp);
+                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
+
+            case UNKNOWN_LEADER_EPOCH:
+                log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
+                break;
+
+            case UNKNOWN_SERVER_ERROR:
+                log.warn("Unknown server error while fetching offset {} for topic-partition {}",
+                        fetchOffset, tp);
+                break;
+
+            case CORRUPT_MESSAGE:
+                throw new KafkaException("Encountered corrupt message when fetching offset "
+                        + fetchOffset
+                        + " for topic-partition "
+                        + tp);
+
+            default:
+                throw new IllegalStateException("Unexpected error code "
+                        + error.code()
+                        + " while fetching at offset "
+                        + fetchOffset
+                        + " from topic-partition " + tp);
+        }
+    }
+
+    /**
+     * Handle a fetch position that is out of range: request an offset reset when a default reset policy is
+     * configured, otherwise raise the error to the application.
+     *
+     * @param fetchPosition Position that is out of range
+     * @param topicPartition Partition the position belongs to
+     * @throws OffsetOutOfRangeException If no default offset reset policy is configured
+     */
+    private void handleOffsetOutOfRange(final SubscriptionState.FetchPosition fetchPosition,
+                                        final TopicPartition topicPartition) {
+        final String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition;
+
+        if (!subscriptions.hasDefaultOffsetResetPolicy()) {
+            log.info("{}, raising error to the application since no reset policy is configured", errorMessage);
+            throw new OffsetOutOfRangeException(errorMessage,
+                    Collections.singletonMap(topicPartition, fetchPosition.offset));
+        }
+
+        log.info("{}, resetting offset", errorMessage);
+        subscriptions.requestOffsetReset(topicPartition);
+    }
+
+    /**
+     * Clear the buffered data which are not a part of newly assigned partitions. Any previously
+     * {@link CompletedFetch fetched data} is dropped if it is for a partition that is no longer in
+     * {@code assignedPartitions}. This covers both the queued fetches and the fetch that is next in line.
+     *
+     * @param assignedPartitions Newly-assigned {@link TopicPartition}
+     */
+    public void clearBufferedDataForUnassignedPartitions(final Collection<TopicPartition> assignedPartitions) {
+        for (Iterator<CompletedFetch<K, V>> it = completedFetches.iterator(); it.hasNext(); ) {
+            final CompletedFetch<K, V> buffered = it.next();
+
+            if (assignedPartitions.contains(buffered.partition))
+                continue;
+
+            log.debug("Removing {} from buffered data as it is no longer an assigned partition", buffered.partition);
+            buffered.drain();
+            it.remove();
+        }
+
+        final boolean nextInLineUnassigned =
+                nextInLineFetch != null && !assignedPartitions.contains(nextInLineFetch.partition);
+        if (nextInLineUnassigned) {
+            nextInLineFetch.drain();
+            nextInLineFetch = null;
+        }
+    }
+
+    /**
+     * Clear the buffered data which are not a part of newly assigned topics. The currently assigned
+     * partitions whose topic is in {@code assignedTopics} are kept; buffered data for every other partition
+     * is dropped.
+     *
+     * @param assignedTopics  newly assigned topics
+     */
+    public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
+        final Set<TopicPartition> retained = new HashSet<>();
+
+        subscriptions.assignedPartitions().forEach(tp -> {
+            if (assignedTopics.contains(tp.topic())) {
+                retained.add(tp);
+            }
+        });
+
+        clearBufferedDataForUnassignedPartitions(retained);
+    }
+
+    /**
+     * Look up the fetch session handler registered for a node.
+     *
+     * @param node ID of the node
+     * @return The {@link FetchSessionHandler} stored in {@code sessionHandlers} under that ID, as returned by
+     *         the map's {@code get} (presumably {@code null} when none is registered; confirm against the
+     *         map type)
+     */
+    protected FetchSessionHandler sessionHandler(int node) {
+        return sessionHandlers.get(node);
+    }
+
+    /**
+     * Best-effort attempt to tell every reachable broker we hold a fetch session with to close that session.
+     * A close request is sent per session handler, then the client is polled until all requests have
+     * completed or {@code timer} expires. Failures and timeouts are only logged; nothing is thrown for them here.
+     *
+     * @param timer Upper bound on how long to wait for the close requests to complete
+     */
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+
+        sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
+            // Flag the session handler for close before building the request below.
+            // NOTE(review): the request that carries the close message is the fetch request built from this
+            // handler further down; an earlier comment here called it a "metadata request".
+            sessionHandler.notifyClose();
+
+            final int sessionId = sessionHandler.sessionId();
+            // The fetch target may not be available, e.g. because it has disconnected. In that case we skip
+            // sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+            if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+                // Fall back to the node ID when the node is missing from the metadata, so the log line
+                // still identifies the broker instead of printing "null".
+                log.debug("Skip sending close session request to broker {} since it is not reachable",
+                        fetchTarget != null ? fetchTarget : fetchTargetNodeId);
+                return;
+            }
+
+            final FetchRequest.Builder request = createFetchRequest(fetchTarget, sessionHandler.newBuilder().build());
+            final RequestFuture<ClientResponse> responseFuture = client.send(fetchTarget, request);
+
+            responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
+                @Override
+                public void onSuccess(ClientResponse value) {
+                    log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    log.debug("Unable to send a close message for fetch session: {} to node: {}. " +
+                            "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
+                }
+            });
+
+            requestFutures.add(responseFuture);
+        });
+
+        // Poll to ensure that the requests have been written to the socket. Wait until either the timer has
+        // expired or all requests have received a response.
+        while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) {
+            client.poll(timer, null, true);
+        }
+
+        if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+            // We ran out of time before completing all futures. That is OK since we don't want to block the
+            // shutdown here.
+            log.debug("All requests couldn't be sent in the specific timeout period {}ms. " +
+                    "This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " +
+                    "KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
+        }
+    }
+
+    /**
+     * Close this fetcher: disable wakeups on the client, drain and discard the fetch that is next in line,
+     * attempt to close the open fetch sessions within the given time, then release the decompression buffer
+     * supplier and forget all session handlers.
+     *
+     * @param timer Bounds the time spent in {@link #maybeCloseFetchSessions(Timer)}
+     */
+    public void close(final Timer timer) {
+        // we do not need to re-enable wakeups since we are closing already
+        client.disableWakeups();
+
+        if (nextInLineFetch != null) {
+            nextInLineFetch.drain();
+            nextInLineFetch = null;
+        }
+
+        // The session handlers are still needed to build the close requests, so they are cleared only afterwards.
+        maybeCloseFetchSessions(timer);
+        Utils.closeQuietly(decompressionBufferSupplier, 
"decompressionBufferSupplier");
+        sessionHandlers.clear();
+    }
+
+    /**
+     * Close this fetcher using a timer created with a timeout of 0, delegating to {@link #close(Timer)}.
+     */
+    @Override
+    public void close() {
+        close(time.timer(0));
+    }
+
+    /**
+     * Request a metadata update and clear the preferred read replica recorded for the given partition.
+     *
+     * @param topicPartition Partition whose preferred read replica should be cleared
+     */
+    private void requestMetadataUpdate(final TopicPartition topicPartition) {
+        metadata.requestUpdate();
+        subscriptions.clearPreferredReadReplica(topicPartition);
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 6a11b846810..a67516ecc73 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -71,11 +71,8 @@ class CompletedFetch<K, V> {
 
     private final Logger log;
     private final SubscriptionState subscriptions;
-    private final boolean checkCrcs;
+    private final FetchConfig<K, V> fetchConfig;
     private final BufferSupplier decompressionBufferSupplier;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-    private final IsolationLevel isolationLevel;
     private final Iterator<? extends RecordBatch> batches;
     private final Set<Long> abortedProducerIds;
     private final PriorityQueue<FetchResponseData.AbortedTransaction> 
abortedTransactions;
@@ -91,11 +88,8 @@ class CompletedFetch<K, V> {
 
     CompletedFetch(LogContext logContext,
                    SubscriptionState subscriptions,
-                   boolean checkCrcs,
+                   FetchConfig<K, V> fetchConfig,
                    BufferSupplier decompressionBufferSupplier,
-                   Deserializer<K> keyDeserializer,
-                   Deserializer<V> valueDeserializer,
-                   IsolationLevel isolationLevel,
                    TopicPartition partition,
                    FetchResponseData.PartitionData partitionData,
                    FetchMetricsAggregator metricAggregator,
@@ -103,11 +97,8 @@ class CompletedFetch<K, V> {
                    short requestVersion) {
         this.log = logContext.logger(CompletedFetch.class);
         this.subscriptions = subscriptions;
-        this.checkCrcs = checkCrcs;
+        this.fetchConfig = fetchConfig;
         this.decompressionBufferSupplier = decompressionBufferSupplier;
-        this.keyDeserializer = keyDeserializer;
-        this.valueDeserializer = valueDeserializer;
-        this.isolationLevel = isolationLevel;
         this.partition = partition;
         this.partitionData = partitionData;
         this.metricAggregator = metricAggregator;
@@ -147,7 +138,7 @@ class CompletedFetch<K, V> {
     }
 
     private void maybeEnsureValid(RecordBatch batch) {
-        if (checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+        if (fetchConfig.checkCrcs && batch.magic() >= 
RecordBatch.MAGIC_VALUE_V2) {
             try {
                 batch.ensureValid();
             } catch (CorruptRecordException e) {
@@ -158,7 +149,7 @@ class CompletedFetch<K, V> {
     }
 
     private void maybeEnsureValid(Record record) {
-        if (checkCrcs) {
+        if (fetchConfig.checkCrcs) {
             try {
                 record.ensureValid();
             } catch (CorruptRecordException e) {
@@ -196,7 +187,7 @@ class CompletedFetch<K, V> {
                 lastEpoch = 
maybeLeaderEpoch(currentBatch.partitionLeaderEpoch());
                 maybeEnsureValid(currentBatch);
 
-                if (isolationLevel == IsolationLevel.READ_COMMITTED && 
currentBatch.hasProducerId()) {
+                if (fetchConfig.isolationLevel == 
IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
                     // remove from the aborted transaction queue all aborted 
transactions which have begun
                     // before the current batch's last offset and add the 
associated producerIds to the
                     // aborted producer set
@@ -306,10 +297,10 @@ class CompletedFetch<K, V> {
             Headers headers = new RecordHeaders(record.headers());
             ByteBuffer keyBytes = record.key();
             byte[] keyByteArray = keyBytes == null ? null : 
org.apache.kafka.common.utils.Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : 
this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
+            K key = keyBytes == null ? null : 
fetchConfig.keyDeserializer.deserialize(partition.topic(), headers, 
keyByteArray);
             ByteBuffer valueBytes = record.value();
             byte[] valueByteArray = valueBytes == null ? null : 
Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : 
this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            V value = valueBytes == null ? null : 
fetchConfig.valueDeserializer.deserialize(partition.topic(), headers, 
valueByteArray);
             return new ConsumerRecord<>(partition.topic(), 
partition.partition(), offset,
                     timestamp, timestampType,
                     keyByteArray == null ? ConsumerRecord.NULL_SIZE : 
keyByteArray.length,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java
new file mode 100644
index 00000000000..1557ce980b4
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * {@link FetchConfig} represents the static configuration for fetching records from Kafka. It is simply a way
+ * to bundle the immutable settings that were presented at the time the {@link Consumer} was created for later use by
+ * classes like {@link Fetcher}, {@link CompletedFetch}, etc.
+ *
+ * <p/>
+ *
+ * In most cases, the values stored and returned by {@link FetchConfig} will be those stored in the following
+ * {@link ConsumerConfig consumer configuration} settings:
+ *
+ * <ul>
+ *     <li>{@link #minBytes}: {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG}</li>
+ *     <li>{@link #maxBytes}: {@link ConsumerConfig#FETCH_MAX_BYTES_CONFIG}</li>
+ *     <li>{@link #maxWaitMs}: {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}</li>
+ *     <li>{@link #fetchSize}: {@link ConsumerConfig#MAX_PARTITION_FETCH_BYTES_CONFIG}</li>
+ *     <li>{@link #maxPollRecords}: {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG}</li>
+ *     <li>{@link #checkCrcs}: {@link ConsumerConfig#CHECK_CRCS_CONFIG}</li>
+ *     <li>{@link #clientRackId}: {@link ConsumerConfig#CLIENT_RACK_CONFIG}</li>
+ *     <li>{@link #keyDeserializer}: {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG}</li>
+ *     <li>{@link #valueDeserializer}: {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}</li>
+ *     <li>{@link #isolationLevel}: {@link ConsumerConfig#ISOLATION_LEVEL_CONFIG}</li>
+ * </ul>
+ *
+ * However, there are places in the code where additional logic is used to 
determine these fetch-related configuration
+ * values. In those cases, the values are calculated outside of this class and 
simply passed in when constructed.
+ *
+ * <p/>
+ *
+ * Note: the {@link Deserializer deserializers} used for the key and value are 
not closed by this class. They should be
+ * closed by the creator of the {@link FetchConfig}.
+ *
+ * @param <K> Type used to {@link Deserializer deserialize} the message/record 
key
+ * @param <V> Type used to {@link Deserializer deserialize} the message/record 
value
+ */
+public class FetchConfig<K, V> {
+
+    final int minBytes;
+    final int maxBytes;
+    final int maxWaitMs;
+    final int fetchSize;
+    final int maxPollRecords;
+    final boolean checkCrcs;
+    final String clientRackId;
+    final Deserializer<K> keyDeserializer;
+    final Deserializer<V> valueDeserializer;
+    final IsolationLevel isolationLevel;
+
+    public FetchConfig(int minBytes,
+                       int maxBytes,
+                       int maxWaitMs,
+                       int fetchSize,
+                       int maxPollRecords,
+                       boolean checkCrcs,
+                       String clientRackId,
+                       Deserializer<K> keyDeserializer,
+                       Deserializer<V> valueDeserializer,
+                       IsolationLevel isolationLevel) {
+        this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
+        this.maxWaitMs = maxWaitMs;
+        this.fetchSize = fetchSize;
+        this.maxPollRecords = maxPollRecords;
+        this.checkCrcs = checkCrcs;
+        this.clientRackId = clientRackId;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.isolationLevel = isolationLevel;
+    }
+
+    public FetchConfig(ConsumerConfig config,
+                       Deserializer<K> keyDeserializer,
+                       Deserializer<V> valueDeserializer,
+                       IsolationLevel isolationLevel) {
+        this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);
+        this.maxBytes = config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG);
+        this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
+        this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+        this.maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
+        this.checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
+        this.clientRackId = config.getString(ConsumerConfig.CLIENT_RACK_CONFIG);
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.isolationLevel = isolationLevel;
+    }
+
+    @Override
+    public String toString() {
+        return "FetchConfig{" +
+                "minBytes=" + minBytes +
+                ", maxBytes=" + maxBytes +
+                ", maxWaitMs=" + maxWaitMs +
+                ", fetchSize=" + fetchSize +
+                ", maxPollRecords=" + maxPollRecords +
+                ", checkCrcs=" + checkCrcs +
+                ", clientRackId='" + clientRackId + '\'' +
+                ", keyDeserializer=" + keyDeserializer +
+                ", valueDeserializer=" + valueDeserializer +
+                ", isolationLevel=" + isolationLevel +
+                '}';
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
index 63bd7650701..49801da3b7c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
@@ -33,10 +33,11 @@ import java.util.Set;
  * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
  * records matches up with the topic-partitions in use.
  */
-class FetchMetricsManager {
+public class FetchMetricsManager {
 
     private final Metrics metrics;
     private final FetchMetricsRegistry metricsRegistry;
+    private final Sensor throttleTime;
     private final Sensor bytesFetched;
     private final Sensor recordsFetched;
     private final Sensor fetchLatency;
@@ -46,10 +47,14 @@ class FetchMetricsManager {
     private int assignmentId = 0;
     private Set<TopicPartition> assignedPartitions = Collections.emptySet();
 
-    FetchMetricsManager(Metrics metrics, FetchMetricsRegistry metricsRegistry) {
+    public FetchMetricsManager(Metrics metrics, FetchMetricsRegistry metricsRegistry) {
         this.metrics = metrics;
         this.metricsRegistry = metricsRegistry;
 
+        this.throttleTime = new SensorBuilder(metrics, "fetch-throttle-time")
+                .withAvg(metricsRegistry.fetchThrottleTimeAvg)
+                .withMax(metricsRegistry.fetchThrottleTimeMax)
+                .build();
         this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched")
                 .withAvg(metricsRegistry.fetchSizeAvg)
                 .withMax(metricsRegistry.fetchSizeMax)
@@ -72,6 +77,10 @@ class FetchMetricsManager {
                 .build();
     }
 
+    public Sensor throttleTimeSensor() {
+        return throttleTime;
+    }
+
     void recordLatency(long requestLatencyMs) {
         fetchLatency.record(requestLatencyMs);
     }
@@ -166,7 +175,7 @@ class FetchMetricsManager {
         }
     }
 
-    static String topicBytesFetchedMetricName(String topic) {
+    private static String topicBytesFetchedMetricName(String topic) {
         return "topic." + topic + ".bytes-fetched";
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 29cd7972cc6..c46f0c53cce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -18,52 +18,14 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
-import 
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.IsolationLevel;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.message.FetchResponseData;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.utils.BufferSupplier;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.FetchRequest;
-import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
-import org.slf4j.helpers.MessageFormatter;
 
-import java.io.Closeable;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -85,85 +47,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
  *     on a different thread.</li>
  * </ul>
  */
-public class Fetcher<K, V> implements Closeable {
+public class Fetcher<K, V> extends AbstractFetch<K, V> {
+
     private final Logger log;
-    private final LogContext logContext;
-    private final ConsumerNetworkClient client;
-    private final Time time;
-    private final int minBytes;
-    private final int maxBytes;
-    private final int maxWaitMs;
-    private final int fetchSize;
-    private final int maxPollRecords;
-    private final boolean checkCrcs;
-    private final String clientRackId;
-    private final ConsumerMetadata metadata;
-    private final FetchMetricsManager metricsManager;
-    private final SubscriptionState subscriptions;
-    private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
-    private final BufferSupplier decompressionBufferSupplier = 
BufferSupplier.create();
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-    private final IsolationLevel isolationLevel;
-    private final Map<Integer, FetchSessionHandler> sessionHandlers;
-    private final Set<Integer> nodesWithPendingFetchRequests;
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
-    private CompletedFetch<K, V> nextInLineFetch = null;
 
     public Fetcher(LogContext logContext,
                    ConsumerNetworkClient client,
-                   int minBytes,
-                   int maxBytes,
-                   int maxWaitMs,
-                   int fetchSize,
-                   int maxPollRecords,
-                   boolean checkCrcs,
-                   String clientRackId,
-                   Deserializer<K> keyDeserializer,
-                   Deserializer<V> valueDeserializer,
                    ConsumerMetadata metadata,
                    SubscriptionState subscriptions,
-                   Metrics metrics,
-                   FetchMetricsRegistry metricsRegistry,
-                   Time time,
-                   IsolationLevel isolationLevel) {
+                   FetchConfig<K, V> fetchConfig,
+                   FetchMetricsManager metricsManager,
+                   Time time) {
+        super(logContext, client, metadata, subscriptions, fetchConfig, metricsManager, time);
         this.log = logContext.logger(Fetcher.class);
-        this.logContext = logContext;
-        this.time = time;
-        this.client = client;
-        this.metadata = metadata;
-        this.subscriptions = subscriptions;
-        this.minBytes = minBytes;
-        this.maxBytes = maxBytes;
-        this.maxWaitMs = maxWaitMs;
-        this.fetchSize = fetchSize;
-        this.maxPollRecords = maxPollRecords;
-        this.checkCrcs = checkCrcs;
-        this.clientRackId = clientRackId;
-        this.keyDeserializer = keyDeserializer;
-        this.valueDeserializer = valueDeserializer;
-        this.completedFetches = new ConcurrentLinkedQueue<>();
-        this.metricsManager = new FetchMetricsManager(metrics, 
metricsRegistry);
-        this.isolationLevel = isolationLevel;
-        this.sessionHandlers = new HashMap<>();
-        this.nodesWithPendingFetchRequests = new HashSet<>();
-    }
-
-    /**
-     * Return whether we have any completed fetches pending return to the 
user. This method is thread-safe. Has
-     * visibility for testing.
-     * @return true if there are completed fetches, false otherwise
-     */
-    protected boolean hasCompletedFetches() {
-        return !completedFetches.isEmpty();
-    }
-
-    /**
-     * Return whether we have any completed fetches that are fetchable. This 
method is thread-safe.
-     * @return true if there are completed fetches that can be returned, false 
otherwise
-     */
-    public boolean hasAvailableFetches() {
-        return completedFetches.stream().anyMatch(fetch -> 
subscriptions.isFetchable(fetch.partition));
     }
 
     /**
@@ -172,606 +69,33 @@ public class Fetcher<K, V> implements Closeable {
      * @return number of fetches sent
      */
     public synchronized int sendFetches() {
-        // Update metrics in case there was an assignment change
-        metricsManager.maybeUpdateAssignment(subscriptions);
-
         Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
+
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
             final Node fetchTarget = entry.getKey();
             final FetchSessionHandler.FetchRequestData data = entry.getValue();
-            final RequestFuture<ClientResponse> future = 
sendFetchRequestToNode(data, fetchTarget);
-            // We add the node to the set of nodes with pending fetch requests 
before adding the
-            // listener because the future may have been fulfilled on another 
thread (e.g. during a
-            // disconnection being handled by the heartbeat thread) which will 
mean the listener
-            // will be invoked synchronously.
-            this.nodesWithPendingFetchRequests.add(entry.getKey().id());
-            future.addListener(new RequestFutureListener<ClientResponse>() {
+            final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
+            RequestFutureListener<ClientResponse> listener = new RequestFutureListener<ClientResponse>() {
                 @Override
                 public void onSuccess(ClientResponse resp) {
                     synchronized (Fetcher.this) {
-                        try {
-                            FetchResponse response = (FetchResponse) 
resp.responseBody();
-                            FetchSessionHandler handler = 
sessionHandler(fetchTarget.id());
-                            if (handler == null) {
-                                log.error("Unable to find FetchSessionHandler 
for node {}. Ignoring fetch response.",
-                                        fetchTarget.id());
-                                return;
-                            }
-                            if (!handler.handleResponse(response, 
resp.requestHeader().apiVersion())) {
-                                if (response.error() == 
Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
-                                    metadata.requestUpdate();
-                                }
-                                return;
-                            }
-
-                            Map<TopicPartition, 
FetchResponseData.PartitionData> responseData = 
response.responseData(handler.sessionTopicNames(), 
resp.requestHeader().apiVersion());
-                            Set<TopicPartition> partitions = new 
HashSet<>(responseData.keySet());
-                            FetchMetricsAggregator metricAggregator = new 
FetchMetricsAggregator(metricsManager, partitions);
-
-                            for (Map.Entry<TopicPartition, 
FetchResponseData.PartitionData> entry : responseData.entrySet()) {
-                                TopicPartition partition = entry.getKey();
-                                FetchRequest.PartitionData requestData = 
data.sessionPartitions().get(partition);
-                                if (requestData == null) {
-                                    String message;
-                                    if (data.metadata().isFull()) {
-                                        message = MessageFormatter.arrayFormat(
-                                                "Response for missing full 
request partition: partition={}; metadata={}",
-                                                new Object[]{partition, 
data.metadata()}).getMessage();
-                                    } else {
-                                        message = MessageFormatter.arrayFormat(
-                                                "Response for missing session 
request partition: partition={}; metadata={}; toSend={}; toForget={}; 
toReplace={}",
-                                                new Object[]{partition, 
data.metadata(), data.toSend(), data.toForget(), 
data.toReplace()}).getMessage();
-                                    }
-
-                                    // Received fetch response for missing 
session partition
-                                    throw new IllegalStateException(message);
-                                } else {
-                                    long fetchOffset = requestData.fetchOffset;
-                                    short requestVersion = 
resp.requestHeader().apiVersion();
-                                    FetchResponseData.PartitionData 
partitionData = entry.getValue();
-
-                                    log.debug("Fetch {} at offset {} for 
partition {} returned fetch data {}",
-                                            isolationLevel, fetchOffset, 
partition, partitionData);
-
-                                    CompletedFetch<K, V> completedFetch = new 
CompletedFetch<>(logContext,
-                                            subscriptions,
-                                            checkCrcs,
-                                            decompressionBufferSupplier,
-                                            keyDeserializer,
-                                            valueDeserializer,
-                                            isolationLevel,
-                                            partition,
-                                            partitionData,
-                                            metricAggregator,
-                                            fetchOffset,
-                                            requestVersion);
-                                    completedFetches.add(completedFetch);
-                                }
-                            }
-
-                            
metricsManager.recordLatency(resp.requestLatencyMs());
-                        } finally {
-                            
nodesWithPendingFetchRequests.remove(fetchTarget.id());
-                        }
+                        handleFetchResponse(fetchTarget, data, resp);
                     }
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
                     synchronized (Fetcher.this) {
-                        try {
-                            FetchSessionHandler handler = 
sessionHandler(fetchTarget.id());
-                            if (handler != null) {
-                                handler.handleError(e);
-                                
handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
-                            }
-                        } finally {
-                            
nodesWithPendingFetchRequests.remove(fetchTarget.id());
-                        }
+                        handleFetchResponse(fetchTarget, e);
                     }
                 }
-            });
+            };
 
+            final RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
+            future.addListener(listener);
         }
-        return fetchRequestMap.size();
-    }
-
-    /**
-     * Send Fetch Request to Kafka cluster asynchronously.
-     *
-     * </p>
-     *
-     * This method is visible for testing.
-     *
-     * @return A future that indicates result of sent Fetch request
-     */
-    private RequestFuture<ClientResponse> sendFetchRequestToNode(final 
FetchSessionHandler.FetchRequestData requestData,
-                                                                 final Node 
fetchTarget) {
-        // Version 12 is the maximum version that could be used without topic 
IDs. See FetchRequest.json for schema
-        // changelog.
-        final short maxVersion = requestData.canUseTopicIds() ? 
ApiKeys.FETCH.latestVersion() : (short) 12;
-
-        final FetchRequest.Builder request = FetchRequest.Builder
-                .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
requestData.toSend())
-                .isolationLevel(isolationLevel)
-                .setMaxBytes(this.maxBytes)
-                .metadata(requestData.metadata())
-                .removed(requestData.toForget())
-                .replaced(requestData.toReplace())
-                .rackId(clientRackId);
 
-        log.debug("Sending {} {} to broker {}", isolationLevel, requestData, 
fetchTarget);
-
-        return client.send(fetchTarget, request);
-    }
-
-    /**
-     * Return the fetched records, empty the record buffer and update the 
consumed position.
-     *
-     * </p>
-     *
-     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the 
consumed position is not updated.
-     *
-     * @return A {@link Fetch} for the requested partitions
-     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in 
fetchResponse and
-     *         the defaultResetPolicy is NONE
-     * @throws TopicAuthorizationException If there is TopicAuthorization 
error in fetchResponse.
-     */
-    public Fetch<K, V> collectFetch() {
-        Fetch<K, V> fetch = Fetch.empty();
-        Queue<CompletedFetch<K, V>> pausedCompletedFetches = new 
ArrayDeque<>();
-        int recordsRemaining = maxPollRecords;
-
-        try {
-            while (recordsRemaining > 0) {
-                if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
-                    CompletedFetch<K, V> records = completedFetches.peek();
-                    if (records == null) break;
-
-                    if (!records.initialized) {
-                        try {
-                            nextInLineFetch = 
initializeCompletedFetch(records);
-                        } catch (Exception e) {
-                            // Remove a completedFetch upon a parse with 
exception if (1) it contains no records, and
-                            // (2) there are no fetched records with actual 
content preceding this exception.
-                            // The first condition ensures that the 
completedFetches is not stuck with the same completedFetch
-                            // in cases such as the 
TopicAuthorizationException, and the second condition ensures that no
-                            // potential data loss due to an exception in a 
following record.
-                            FetchResponseData.PartitionData partition = 
records.partitionData;
-                            if (fetch.isEmpty() && 
FetchResponse.recordsOrFail(partition).sizeInBytes() == 0) {
-                                completedFetches.poll();
-                            }
-                            throw e;
-                        }
-                    } else {
-                        nextInLineFetch = records;
-                    }
-                    completedFetches.poll();
-                } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
-                    // when the partition is paused we add the records back to 
the completedFetches queue instead of draining
-                    // them so that they can be returned on a subsequent poll 
if the partition is resumed at that time
-                    log.debug("Skipping fetching records for assigned 
partition {} because it is paused", nextInLineFetch.partition);
-                    pausedCompletedFetches.add(nextInLineFetch);
-                    nextInLineFetch = null;
-                } else {
-                    Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch, 
recordsRemaining);
-                    recordsRemaining -= nextFetch.numRecords();
-                    fetch.add(nextFetch);
-                }
-            }
-        } catch (KafkaException e) {
-            if (fetch.isEmpty())
-                throw e;
-        } finally {
-            // add any polled completed fetches for paused partitions back to 
the completed fetches queue to be
-            // re-evaluated in the next poll
-            completedFetches.addAll(pausedCompletedFetches);
-        }
-
-        return fetch;
-    }
-
-    private Fetch<K, V> fetchRecords(CompletedFetch<K, V> completedFetch, int 
maxRecords) {
-        if (!subscriptions.isAssigned(completedFetch.partition)) {
-            // this can happen when a rebalance happened before fetched 
records are returned to the consumer's poll call
-            log.debug("Not returning fetched records for partition {} since it 
is no longer assigned",
-                    completedFetch.partition);
-        } else if (!subscriptions.isFetchable(completedFetch.partition)) {
-            // this can happen when a partition is paused before fetched 
records are returned to the consumer's
-            // poll call or if the offset is being reset
-            log.debug("Not returning fetched records for assigned partition {} 
since it is no longer fetchable",
-                    completedFetch.partition);
-        } else {
-            FetchPosition position = 
subscriptions.position(completedFetch.partition);
-            if (position == null) {
-                throw new IllegalStateException("Missing position for 
fetchable partition " + completedFetch.partition);
-            }
-
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = 
completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for 
assigned partition {}",
-                        partRecords.size(), position, 
completedFetch.partition);
-
-                boolean positionAdvanced = false;
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Updating fetch position from {} to {} for 
partition {} and returning {} records from `poll()`",
-                            position, nextPosition, completedFetch.partition, 
partRecords.size());
-                    subscriptions.position(completedFetch.partition, 
nextPosition);
-                    positionAdvanced = true;
-                }
-
-                Long partitionLag = 
subscriptions.partitionLag(completedFetch.partition, isolationLevel);
-                if (partitionLag != null)
-                    
this.metricsManager.recordPartitionLag(completedFetch.partition, partitionLag);
-
-                Long lead = 
subscriptions.partitionLead(completedFetch.partition);
-                if (lead != null) {
-                    
this.metricsManager.recordPartitionLead(completedFetch.partition, lead);
-                }
-
-                return Fetch.forPartition(completedFetch.partition, 
partRecords, positionAdvanced);
-            } else {
-                // these records aren't next in line based on the last 
consumed position, ignore them
-                // they must be from an obsolete request
-                log.debug("Ignoring fetched records for {} at offset {} since 
the current position is {}",
-                        completedFetch.partition, 
completedFetch.nextFetchOffset, position);
-            }
-        }
-
-        log.trace("Draining fetched records for partition {}", 
completedFetch.partition);
-        completedFetch.drain();
-
-        return Fetch.empty();
-    }
-
-    private List<TopicPartition> fetchablePartitions() {
-        Set<TopicPartition> exclude = new HashSet<>();
-        if (nextInLineFetch != null && !nextInLineFetch.isConsumed) {
-            exclude.add(nextInLineFetch.partition);
-        }
-        for (CompletedFetch<K, V> completedFetch : completedFetches) {
-            exclude.add(completedFetch.partition);
-        }
-        return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
-    }
-
-    /**
-     * Determine which replica to read from.
-     */
-    Node selectReadReplica(TopicPartition partition, Node leaderReplica, long 
currentTimeMs) {
-        Optional<Integer> nodeId = 
subscriptions.preferredReadReplica(partition, currentTimeMs);
-        if (nodeId.isPresent()) {
-            Optional<Node> node = nodeId.flatMap(id -> 
metadata.fetch().nodeIfOnline(partition, id));
-            if (node.isPresent()) {
-                return node.get();
-            } else {
-                log.trace("Not fetching from {} for partition {} since it is 
marked offline or is missing from our metadata," +
-                          " using the leader instead.", nodeId, partition);
-                // Note that this condition may happen due to stale metadata, 
so we clear preferred replica and
-                // refresh metadata.
-                requestMetadataUpdate(partition);
-                return leaderReplica;
-            }
-        } else {
-            return leaderReplica;
-        }
-    }
-
-    /**
-     * Create fetch requests for all nodes for which we have assigned 
partitions
-     * that have no existing requests in flight.
-     */
-    private Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests() {
-        Map<Node, FetchSessionHandler.Builder> fetchable = new 
LinkedHashMap<>();
-        long currentTimeMs = time.milliseconds();
-        Map<String, Uuid> topicIds = metadata.topicIds();
-
-        for (TopicPartition partition : fetchablePartitions()) {
-            FetchPosition position = this.subscriptions.position(partition);
-            if (position == null) {
-                throw new IllegalStateException("Missing position for 
fetchable partition " + partition);
-            }
-
-            Optional<Node> leaderOpt = position.currentLeader.leader;
-            if (!leaderOpt.isPresent()) {
-                log.debug("Requesting metadata update for partition {} since 
the position {} is missing the current leader node", partition, position);
-                metadata.requestUpdate();
-                continue;
-            }
-
-            // Use the preferred read replica if set, otherwise the 
partition's leader
-            Node node = selectReadReplica(partition, leaderOpt.get(), 
currentTimeMs);
-            if (client.isUnavailable(node)) {
-                client.maybeThrowAuthFailure(node);
-
-                // If we try to send during the reconnect backoff window, then 
the request is just
-                // going to be failed anyway before being sent, so skip the 
send for now
-                log.trace("Skipping fetch for partition {} because node {} is 
awaiting reconnect backoff", partition, node);
-            } else if (this.nodesWithPendingFetchRequests.contains(node.id())) 
{
-                log.trace("Skipping fetch for partition {} because previous 
request to {} has not been processed", partition, node);
-            } else {
-                // if there is a leader and no in-flight requests, issue a new 
fetch
-                FetchSessionHandler.Builder builder = fetchable.get(node);
-                if (builder == null) {
-                    int id = node.id();
-                    FetchSessionHandler handler = sessionHandler(id);
-                    if (handler == null) {
-                        handler = new FetchSessionHandler(logContext, id);
-                        sessionHandlers.put(id, handler);
-                    }
-                    builder = handler.newBuilder();
-                    fetchable.put(node, builder);
-                }
-                builder.add(partition, new FetchRequest.PartitionData(
-                    topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID),
-                    position.offset, FetchRequest.INVALID_LOG_START_OFFSET, 
this.fetchSize,
-                    position.currentLeader.epoch, Optional.empty()));
-
-                log.debug("Added {} fetch request for partition {} at position 
{} to node {}", isolationLevel,
-                    partition, position, node);
-            }
-        }
-
-        Map<Node, FetchSessionHandler.FetchRequestData> reqs = new 
LinkedHashMap<>();
-        for (Map.Entry<Node, FetchSessionHandler.Builder> entry : 
fetchable.entrySet()) {
-            reqs.put(entry.getKey(), entry.getValue().build());
-        }
-        return reqs;
-    }
-
-    /**
-     * Initialize a CompletedFetch object.
-     */
-    private CompletedFetch<K, V> initializeCompletedFetch(CompletedFetch<K, V> 
nextCompletedFetch) {
-        TopicPartition tp = nextCompletedFetch.partition;
-        FetchResponseData.PartitionData partition = 
nextCompletedFetch.partitionData;
-        long fetchOffset = nextCompletedFetch.nextFetchOffset;
-        CompletedFetch<K, V> completedFetch = null;
-        Errors error = Errors.forCode(partition.errorCode());
-
-        try {
-            if (!subscriptions.hasValidPosition(tp)) {
-                // this can happen when a rebalance happened while fetch is 
still in-flight
-                log.debug("Ignoring fetched records for partition {} since it 
no longer has valid position", tp);
-            } else if (error == Errors.NONE) {
-                // we are interested in this fetch only if the beginning 
offset matches the
-                // current consumed position
-                FetchPosition position = subscriptions.position(tp);
-                if (position == null || position.offset != fetchOffset) {
-                    log.debug("Discarding stale fetch response for partition 
{} since its offset {} does not match " +
-                            "the expected offset {}", tp, fetchOffset, 
position);
-                    return null;
-                }
-
-                log.trace("Preparing to read {} bytes of data for partition {} 
with offset {}",
-                        FetchResponse.recordsSize(partition), tp, position);
-                Iterator<? extends RecordBatch> batches = 
FetchResponse.recordsOrFail(partition).batches().iterator();
-                completedFetch = nextCompletedFetch;
-
-                if (!batches.hasNext() && FetchResponse.recordsSize(partition) 
> 0) {
-                    if (completedFetch.requestVersion < 3) {
-                        // Implement the pre KIP-74 behavior of throwing a 
RecordTooLargeException.
-                        Map<TopicPartition, Long> recordTooLargePartitions = 
Collections.singletonMap(tp, fetchOffset);
-                        throw new RecordTooLargeException("There are some 
messages at [Partition=Offset]: " +
-                                recordTooLargePartitions + " whose size is 
larger than the fetch size " + this.fetchSize +
-                                " and hence cannot be returned. Please 
considering upgrading your broker to 0.10.1.0 or " +
-                                "newer to avoid this issue. Alternately, 
increase the fetch size on the client (using " +
-                                
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
-                                recordTooLargePartitions);
-                    } else {
-                        // This should not happen with brokers that support 
FetchRequest/Response V3 or higher (i.e. KIP-74)
-                        throw new KafkaException("Failed to make progress 
reading messages at " + tp + "=" +
-                            fetchOffset + ". Received a non-empty fetch 
response from the server, but no " +
-                            "complete records were found.");
-                    }
-                }
-
-                if (partition.highWatermark() >= 0) {
-                    log.trace("Updating high watermark for partition {} to 
{}", tp, partition.highWatermark());
-                    subscriptions.updateHighWatermark(tp, 
partition.highWatermark());
-                }
-
-                if (partition.logStartOffset() >= 0) {
-                    log.trace("Updating log start offset for partition {} to 
{}", tp, partition.logStartOffset());
-                    subscriptions.updateLogStartOffset(tp, 
partition.logStartOffset());
-                }
-
-                if (partition.lastStableOffset() >= 0) {
-                    log.trace("Updating last stable offset for partition {} to 
{}", tp, partition.lastStableOffset());
-                    subscriptions.updateLastStableOffset(tp, 
partition.lastStableOffset());
-                }
-
-                if (FetchResponse.isPreferredReplica(partition)) {
-                    
subscriptions.updatePreferredReadReplica(completedFetch.partition, 
partition.preferredReadReplica(), () -> {
-                        long expireTimeMs = time.milliseconds() + 
metadata.metadataExpireMs();
-                        log.debug("Updating preferred read replica for 
partition {} to {}, set to expire at {}",
-                                tp, partition.preferredReadReplica(), 
expireTimeMs);
-                        return expireTimeMs;
-                    });
-                }
-
-                nextCompletedFetch.initialized = true;
-            } else if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-                       error == Errors.REPLICA_NOT_AVAILABLE ||
-                       error == Errors.KAFKA_STORAGE_ERROR ||
-                       error == Errors.FENCED_LEADER_EPOCH ||
-                       error == Errors.OFFSET_NOT_AVAILABLE) {
-                log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                log.warn("Received unknown topic or partition error in fetch 
for partition {}", tp);
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.UNKNOWN_TOPIC_ID) {
-                log.warn("Received unknown topic ID error in fetch for 
partition {}", tp);
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.INCONSISTENT_TOPIC_ID) {
-                log.warn("Received inconsistent topic ID error in fetch for 
partition {}", tp);
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
-                Optional<Integer> clearedReplicaId = 
subscriptions.clearPreferredReadReplica(tp);
-                if (!clearedReplicaId.isPresent()) {
-                    // If there's no preferred replica to clear, we're 
fetching from the leader so handle this error normally
-                    FetchPosition position = subscriptions.position(tp);
-                    if (position == null || fetchOffset != position.offset) {
-                        log.debug("Discarding stale fetch response for 
partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, 
fetchOffset, position);
-                    } else {
-                        handleOffsetOutOfRange(position, tp);
-                    }
-                } else {
-                    log.debug("Unset the preferred read replica {} for 
partition {} since we got {} when fetching {}",
-                            clearedReplicaId.get(), tp, error, fetchOffset);
-                }
-            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                //we log the actual partition and not just the topic to help 
with ACL propagation issues in large clusters
-                log.warn("Not authorized to read from partition {}.", tp);
-                throw new 
TopicAuthorizationException(Collections.singleton(tp.topic()));
-            } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
-                log.debug("Received unknown leader epoch error in fetch for 
partition {}", tp);
-            } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
-                log.warn("Unknown server error while fetching offset {} for 
topic-partition {}",
-                        fetchOffset, tp);
-            } else if (error == Errors.CORRUPT_MESSAGE) {
-                throw new KafkaException("Encountered corrupt message when 
fetching offset "
-                        + fetchOffset
-                        + " for topic-partition "
-                        + tp);
-            } else {
-                throw new IllegalStateException("Unexpected error code "
-                        + error.code()
-                        + " while fetching at offset "
-                        + fetchOffset
-                        + " from topic-partition " + tp);
-            }
-        } finally {
-            if (completedFetch == null)
-                nextCompletedFetch.recordAggregatedMetrics(0, 0);
-
-            if (error != Errors.NONE)
-                // we move the partition to the end if there was an error. 
This way, it's more likely that partitions for
-                // the same topic can remain together (allowing for more 
efficient serialization).
-                subscriptions.movePartitionToEnd(tp);
-        }
-
-        return completedFetch;
-    }
-
-    private void handleOffsetOutOfRange(FetchPosition fetchPosition, 
TopicPartition topicPartition) {
-        String errorMessage = "Fetch position " + fetchPosition + " is out of 
range for partition " + topicPartition;
-        if (subscriptions.hasDefaultOffsetResetPolicy()) {
-            log.info("{}, resetting offset", errorMessage);
-            subscriptions.requestOffsetReset(topicPartition);
-        } else {
-            log.info("{}, raising error to the application since no reset 
policy is configured", errorMessage);
-            throw new OffsetOutOfRangeException(errorMessage,
-                Collections.singletonMap(topicPartition, 
fetchPosition.offset));
-        }
-    }
-
-    /**
-     * Clear the buffered data which are not a part of newly assigned 
partitions
-     *
-     * @param assignedPartitions  newly assigned {@link TopicPartition}
-     */
-    public void 
clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> 
assignedPartitions) {
-        Iterator<CompletedFetch<K, V>> completedFetchesItr = 
completedFetches.iterator();
-        while (completedFetchesItr.hasNext()) {
-            CompletedFetch<K, V> records = completedFetchesItr.next();
-            TopicPartition tp = records.partition;
-            if (!assignedPartitions.contains(tp)) {
-                records.drain();
-                completedFetchesItr.remove();
-            }
-        }
-
-        if (nextInLineFetch != null && 
!assignedPartitions.contains(nextInLineFetch.partition)) {
-            nextInLineFetch.drain();
-            nextInLineFetch = null;
-        }
-    }
-
-    /**
-     * Clear the buffered data which are not a part of newly assigned topics
-     *
-     * @param assignedTopics  newly assigned topics
-     */
-    public void clearBufferedDataForUnassignedTopics(Collection<String> 
assignedTopics) {
-        Set<TopicPartition> currentTopicPartitions = new HashSet<>();
-        for (TopicPartition tp : subscriptions.assignedPartitions()) {
-            if (assignedTopics.contains(tp.topic())) {
-                currentTopicPartitions.add(tp);
-            }
-        }
-        clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
-    }
-
-    // Visible for testing
-    protected FetchSessionHandler sessionHandler(int node) {
-        return sessionHandlers.get(node);
-    }
-
-    public static Sensor throttleTimeSensor(Metrics metrics, 
FetchMetricsRegistry metricsRegistry) {
-        Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
-        
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg),
 new Avg());
-
-        
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax),
 new Max());
-
-        return fetchThrottleTimeSensor;
-    }
-
-    // Visible for testing
-    void maybeCloseFetchSessions(final Timer timer) {
-        final Cluster cluster = metadata.fetch();
-        final List<RequestFuture<ClientResponse>> requestFutures = new 
ArrayList<>();
-        sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
-            // set the session handler to notify close. This will set the next 
metadata request to send close message.
-            sessionHandler.notifyClose();
-
-            final int sessionId = sessionHandler.sessionId();
-            // FetchTargetNode may not be available as it may have 
disconnected the connection. In such cases, we will
-            // skip sending the close request.
-            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
-            if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
-                log.debug("Skip sending close session request to broker {} 
since it is not reachable", fetchTarget);
-                return;
-            }
-
-            final RequestFuture<ClientResponse> responseFuture = 
sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
-            responseFuture.addListener(new 
RequestFutureListener<ClientResponse>() {
-                @Override
-                public void onSuccess(ClientResponse value) {
-                    log.debug("Successfully sent a close message for fetch 
session: {} to node: {}", sessionId, fetchTarget);
-                }
-
-                @Override
-                public void onFailure(RuntimeException e) {
-                    log.debug("Unable to a close message for fetch session: {} 
to node: {}. " +
-                        "This may result in unnecessary fetch sessions at the 
broker.", sessionId, fetchTarget, e);
-                }
-            });
-
-            requestFutures.add(responseFuture);
-        });
-
-        // Poll to ensure that request has been written to the socket. Wait 
until either the timer has expired or until
-        // all requests have received a response.
-        while (timer.notExpired() && 
!requestFutures.stream().allMatch(RequestFuture::isDone)) {
-            client.poll(timer, null, true);
-        }
-
-        if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
-            // we ran out of time before completing all futures. It is ok 
since we don't want to block the shutdown
-            // here.
-            log.debug("All requests couldn't be sent in the specific timeout 
period {}ms. " +
-                "This may result in unnecessary fetch sessions at the broker. 
Consider increasing the timeout passed for " +
-                "KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
-        }
+        return fetchRequestMap.size();
     }
 
     public void close(final Timer timer) {
@@ -782,25 +106,8 @@ public class Fetcher<K, V> implements Closeable {
 
         // Shared states (e.g. sessionHandlers) could be accessed by multiple 
threads (such as heartbeat thread), hence,
         // it is necessary to acquire a lock on the fetcher instance before 
modifying the states.
-        synchronized (Fetcher.this) {
-            // we do not need to re-enable wakeups since we are closing already
-            client.disableWakeups();
-            if (nextInLineFetch != null)
-                nextInLineFetch.drain();
-            maybeCloseFetchSessions(timer);
-            Utils.closeQuietly(decompressionBufferSupplier, 
"decompressionBufferSupplier");
-            sessionHandlers.clear();
+        synchronized (this) {
+            super.close(timer);
         }
     }
-
-    @Override
-    public void close() {
-        close(time.timer(0));
-    }
-
-    private void requestMetadataUpdate(TopicPartition topicPartition) {
-        this.metadata.requestUpdate();
-        this.subscriptions.clearPreferredReadReplica(topicPartition);
-    }
-
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 98e6a85444d..b55c6822b0f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
@@ -28,6 +29,8 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.clients.consumer.internals.FetchConfig;
+import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.OffsetFetcher;
@@ -2656,24 +2659,26 @@ public class KafkaConsumerTest {
                 null);
         }
         IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
-        Fetcher<String, String> fetcher = new Fetcher<>(
-                loggerFactory,
-                consumerClient,
+        FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, 
metricsRegistry.fetcherMetrics);
+        FetchConfig<String, String> fetchConfig = new FetchConfig<>(
                 minBytes,
                 maxBytes,
                 maxWaitMs,
                 fetchSize,
                 maxPollRecords,
                 checkCrcs,
-                "",
+                CommonClientConfigs.DEFAULT_CLIENT_RACK,
                 keyDeserializer,
                 deserializer,
+                isolationLevel);
+        Fetcher<String, String> fetcher = new Fetcher<>(
+                loggerFactory,
+                consumerClient,
                 metadata,
                 subscription,
-                metrics,
-                metricsRegistry.fetcherMetrics,
-                time,
-                isolationLevel);
+                fetchConfig,
+                metricsManager,
+                time);
         OffsetFetcher offsetFetcher = new OffsetFetcher(loggerFactory,
                 consumerClient,
                 metadata,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
index b420852852a..895d40b85fc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.IsolationLevel;
@@ -41,8 +42,6 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
@@ -60,19 +59,6 @@ public class CompletedFetchTest {
     private final static long PRODUCER_ID = 1000L;
     private final static short PRODUCER_EPOCH = 0;
 
-    private BufferSupplier bufferSupplier;
-
-    @BeforeEach
-    public void setup() {
-        bufferSupplier = BufferSupplier.create();
-    }
-
-    @AfterEach
-    public void tearDown() {
-        if (bufferSupplier != null)
-            bufferSupplier.close();
-    }
-
     @Test
     public void testSimple() {
         long fetchOffset = 5;
@@ -235,13 +221,23 @@ public class CompletedFetchTest {
         FetchMetricsManager metrics = new FetchMetricsManager(new Metrics(), 
metricsRegistry);
         FetchMetricsAggregator metricAggregator = new 
FetchMetricsAggregator(metrics, Collections.singleton(TP));
 
-        return new CompletedFetch<>(logContext,
-                subscriptions,
+        FetchConfig<K, V> fetchConfig = new FetchConfig<>(
+                ConsumerConfig.DEFAULT_FETCH_MIN_BYTES,
+                ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
+                ConsumerConfig.DEFAULT_FETCH_MAX_WAIT_MS,
+                ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
+                ConsumerConfig.DEFAULT_MAX_POLL_RECORDS,
                 checkCrcs,
-                bufferSupplier,
+                ConsumerConfig.DEFAULT_CLIENT_RACK,
                 keyDeserializer,
                 valueDeserializer,
-                isolationLevel,
+                isolationLevel
+        );
+        return new CompletedFetch<>(
+                logContext,
+                subscriptions,
+                fetchConfig,
+                BufferSupplier.create(),
                 TP,
                 partitionData,
                 metricAggregator,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index e60edbfb6c1..e4ddce247db 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
@@ -50,7 +51,6 @@ import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetFo
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -176,6 +176,7 @@ public class FetcherTest {
     private SubscriptionState subscriptions;
     private ConsumerMetadata metadata;
     private FetchMetricsRegistry metricsRegistry;
+    private FetchMetricsManager metricsManager;
     private MockClient client;
     private Metrics metrics;
     private ApiVersions apiVersions = new ApiVersions();
@@ -1902,12 +1903,11 @@ public class FetcherTest {
         buildFetcher();
 
         MockSelector selector = new MockSelector(time);
-        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, 
metricsRegistry);
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", 
Integer.MAX_VALUE,
                 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
-                time, true, new ApiVersions(), throttleTimeSensor, new 
LogContext());
+                time, true, new ApiVersions(), 
metricsManager.throttleTimeSensor(), new LogContext());
 
         ApiVersionsResponse apiVersionsResponse = 
ApiVersionsResponse.defaultApiVersionsResponse(
             400, ApiMessageType.ListenerType.ZK_BROKER);
@@ -2262,9 +2262,6 @@ public class FetcherTest {
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchedRecords();
         assertTrue(partitionRecords.containsKey(tp0));
 
-        // Create throttle metrics
-        Fetcher.throttleTimeSensor(metrics, metricsRegistry);
-
         // Verify that all metrics except metrics-count have registered 
templates
         Set<MetricNameTemplate> allMetrics = new HashSet<>();
         for (MetricName n : metrics.metrics().keySet()) {
@@ -2843,31 +2840,32 @@ public class FetcherTest {
                 isolationLevel,
                 apiVersions);
 
-        fetcher = new Fetcher<byte[], byte[]>(
-                new LogContext(),
-                consumerClient,
+        FetchConfig<byte[], byte[]> fetchConfig = new FetchConfig<>(
                 minBytes,
                 maxBytes,
                 maxWaitMs,
                 fetchSize,
                 2 * numPartitions,
-                true,
-                "",
+                true, // check crcs
+                CommonClientConfigs.DEFAULT_CLIENT_RACK,
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(),
+                isolationLevel);
+        fetcher = new Fetcher<byte[], byte[]>(
+                logContext,
+                consumerClient,
                 metadata,
                 subscriptions,
-                metrics,
-                metricsRegistry,
-                time,
-                isolationLevel) {
+                fetchConfig,
+                metricsManager,
+                time) {
             @Override
-            protected FetchSessionHandler sessionHandler(int id) {
-                final FetchSessionHandler handler = super.sessionHandler(id);
+            protected FetchSessionHandler sessionHandler(int node) {
+                final FetchSessionHandler handler = super.sessionHandler(node);
                 if (handler == null)
                     return null;
                 else {
-                    return new FetchSessionHandler(new LogContext(), id) {
+                    return new FetchSessionHandler(new LogContext(), node) {
                         @Override
                         public Builder newBuilder() {
                             verifySessionPartitions();
@@ -3645,24 +3643,25 @@ public class FetcherTest {
                                      SubscriptionState subscriptionState,
                                      LogContext logContext) {
         buildDependencies(metricConfig, metadataExpireMs, subscriptionState, 
logContext);
-        fetcher = spy(new Fetcher<>(
-                new LogContext(),
-                consumerClient,
+        FetchConfig<K, V> fetchConfig = new FetchConfig<>(
                 minBytes,
                 maxBytes,
                 maxWaitMs,
                 fetchSize,
                 maxPollRecords,
                 true, // check crc
-                "",
+                CommonClientConfigs.DEFAULT_CLIENT_RACK,
                 keyDeserializer,
                 valueDeserializer,
+                isolationLevel);
+        fetcher = spy(new Fetcher<>(
+                logContext,
+                consumerClient,
                 metadata,
-                subscriptions,
-                metrics,
-                metricsRegistry,
-                time,
-                isolationLevel));
+                subscriptionState,
+                fetchConfig,
+                metricsManager,
+                time));
         offsetFetcher = new OffsetFetcher(logContext,
                 consumerClient,
                 metadata,
@@ -3687,6 +3686,7 @@ public class FetcherTest {
         consumerClient = spy(new ConsumerNetworkClient(logContext, client, 
metadata, time,
                 100, 1000, Integer.MAX_VALUE));
         metricsRegistry = new 
FetchMetricsRegistry(metricConfig.tags().keySet(), "consumer" + groupId);
+        metricsManager = new FetchMetricsManager(metrics, metricsRegistry);
     }
 
     private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> 
records) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index ffaeab4709d..24aa9c8ad1e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
@@ -1245,24 +1246,25 @@ public class OffsetFetcherTest {
         buildFetcher(metricConfig, isolationLevel, metadataExpireMs, 
subscriptionState, logContext);
 
         FetchMetricsRegistry metricsRegistry = new 
FetchMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group");
-        Fetcher<byte[], byte[]> fetcher = new Fetcher<>(
-                logContext,
-                consumerClient,
+        FetchConfig<byte[], byte[]> fetchConfig = new FetchConfig<>(
                 minBytes,
                 maxBytes,
                 maxWaitMs,
                 fetchSize,
                 maxPollRecords,
                 true, // check crc
-                "",
+                CommonClientConfigs.DEFAULT_CLIENT_RACK,
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(),
+                isolationLevel);
+        Fetcher<byte[], byte[]> fetcher = new Fetcher<>(
+                logContext,
+                consumerClient,
                 metadata,
                 subscriptions,
-                metrics,
-                metricsRegistry,
-                time,
-                isolationLevel);
+                fetchConfig,
+                new FetchMetricsManager(metrics, metricsRegistry),
+                time);
 
         assignFromUser(singleton(tp0));
 

Reply via email to