guozhangwang commented on code in PR #13425:
URL: https://github.com/apache/kafka/pull/13425#discussion_r1146949108


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -154,7 +155,8 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>fetch.min.bytes</code>
      */
     public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
-    private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of 
data the server should return for a fetch request. If insufficient data is 
available the request will wait for that much data to accumulate before 
answering the request. The default setting of 1 byte means that fetch requests 
are answered as soon as a single byte of data is available or the fetch request 
times out waiting for data to arrive. Setting this to something greater than 1 
will cause the server to wait for larger amounts of data to accumulate which 
can improve server throughput a bit at the cost of some additional latency.";
+    public static final int DEFAULT_FETCH_MIN_BYTES = 1;
+    private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of 
data the server should return for a fetch request. If insufficient data is 
available the request will wait for that much data to accumulate before 
answering the request. The default setting of " + DEFAULT_FETCH_MIN_BYTES + " 
byte means that fetch requests are answered as soon as that many byte(s) of 
data is available or the fetch request times out waiting for data to arrive. 
Setting this to a larger value will cause the server to wait for larger amounts 
of data to accumulate which can improve server throughput a bit at the cost of 
some additional latency.";

Review Comment:
   Assuming there's no content change here, just GitHub not being smart enough with diffs :) LMK otherwise.
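
   For reference, a sketch of how the new constant would presumably also feed the ConfigDef registration, so the documented default and the effective default can't drift apart. The exact define(...) entry below is an assumption (the real one lives in ConsumerConfig's static initializer, and the validator may differ):

       // Assumed sketch, not part of this diff: reuse DEFAULT_FETCH_MIN_BYTES as the
       // ConfigDef default so the doc string and the effective default stay in sync.
       new ConfigDef().define(FETCH_MIN_BYTES_CONFIG,
                              ConfigDef.Type.INT,
                              DEFAULT_FETCH_MIN_BYTES,
                              ConfigDef.Range.atLeast(0),   // validator bound is an assumption
                              ConfigDef.Importance.HIGH,
                              FETCH_MIN_BYTES_DOC);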



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -172,606 +69,41 @@ public boolean hasAvailableFetches() {
      * @return number of fetches sent
      */
     public synchronized int sendFetches() {
-        // Update metrics in case there was an assignment change
-        metricsManager.maybeUpdateAssignment(subscriptions);
-
         Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = 
prepareFetchRequests();
+
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : 
fetchRequestMap.entrySet()) {
             final Node fetchTarget = entry.getKey();
             final FetchSessionHandler.FetchRequestData data = entry.getValue();
-            final RequestFuture<ClientResponse> future = 
sendFetchRequestToNode(data, fetchTarget);
-            // We add the node to the set of nodes with pending fetch requests 
before adding the
-            // listener because the future may have been fulfilled on another 
thread (e.g. during a
-            // disconnection being handled by the heartbeat thread) which will 
mean the listener
-            // will be invoked synchronously.
-            this.nodesWithPendingFetchRequests.add(entry.getKey().id());
-            future.addListener(new RequestFutureListener<ClientResponse>() {
+            final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+            RequestFutureListener<ClientResponse> listener = new 
RequestFutureListener<ClientResponse>() {
                 @Override
                 public void onSuccess(ClientResponse resp) {
-                    synchronized (Fetcher.this) {
-                        try {
-                            FetchResponse response = (FetchResponse) 
resp.responseBody();
-                            FetchSessionHandler handler = 
sessionHandler(fetchTarget.id());
-                            if (handler == null) {
-                                log.error("Unable to find FetchSessionHandler 
for node {}. Ignoring fetch response.",
-                                        fetchTarget.id());
-                                return;
-                            }
-                            if (!handler.handleResponse(response, 
resp.requestHeader().apiVersion())) {
-                                if (response.error() == 
Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
-                                    metadata.requestUpdate();
-                                }
-                                return;
-                            }
-
-                            Map<TopicPartition, 
FetchResponseData.PartitionData> responseData = 
response.responseData(handler.sessionTopicNames(), 
resp.requestHeader().apiVersion());
-                            Set<TopicPartition> partitions = new 
HashSet<>(responseData.keySet());
-                            FetchMetricsAggregator metricAggregator = new 
FetchMetricsAggregator(metricsManager, partitions);
-
-                            for (Map.Entry<TopicPartition, 
FetchResponseData.PartitionData> entry : responseData.entrySet()) {
-                                TopicPartition partition = entry.getKey();
-                                FetchRequest.PartitionData requestData = 
data.sessionPartitions().get(partition);
-                                if (requestData == null) {
-                                    String message;
-                                    if (data.metadata().isFull()) {
-                                        message = MessageFormatter.arrayFormat(
-                                                "Response for missing full 
request partition: partition={}; metadata={}",
-                                                new Object[]{partition, 
data.metadata()}).getMessage();
-                                    } else {
-                                        message = MessageFormatter.arrayFormat(
-                                                "Response for missing session 
request partition: partition={}; metadata={}; toSend={}; toForget={}; 
toReplace={}",
-                                                new Object[]{partition, 
data.metadata(), data.toSend(), data.toForget(), 
data.toReplace()}).getMessage();
-                                    }
-
-                                    // Received fetch response for missing 
session partition
-                                    throw new IllegalStateException(message);
-                                } else {
-                                    long fetchOffset = requestData.fetchOffset;
-                                    short requestVersion = 
resp.requestHeader().apiVersion();
-                                    FetchResponseData.PartitionData 
partitionData = entry.getValue();
-
-                                    log.debug("Fetch {} at offset {} for 
partition {} returned fetch data {}",
-                                            isolationLevel, fetchOffset, 
partition, partitionData);
-
-                                    CompletedFetch<K, V> completedFetch = new 
CompletedFetch<>(logContext,
-                                            subscriptions,
-                                            checkCrcs,
-                                            decompressionBufferSupplier,
-                                            keyDeserializer,
-                                            valueDeserializer,
-                                            isolationLevel,
-                                            partition,
-                                            partitionData,
-                                            metricAggregator,
-                                            fetchOffset,
-                                            requestVersion);
-                                    completedFetches.add(completedFetch);
-                                }
-                            }
-
-                            
metricsManager.recordLatency(resp.requestLatencyMs());
-                        } finally {
-                            
nodesWithPendingFetchRequests.remove(fetchTarget.id());
-                        }
-                    }
+                    handleFetchResponse(fetchTarget, data, resp);
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
-                    synchronized (Fetcher.this) {
-                        try {
-                            FetchSessionHandler handler = 
sessionHandler(fetchTarget.id());
-                            if (handler != null) {
-                                handler.handleError(e);
-                                
handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
-                            }
-                        } finally {
-                            
nodesWithPendingFetchRequests.remove(fetchTarget.id());
-                        }
-                    }
-                }
-            });
-
-        }
-        return fetchRequestMap.size();
-    }
-
-    /**
-     * Send Fetch Request to Kafka cluster asynchronously.
-     *
-     * </p>
-     *
-     * This method is visible for testing.
-     *
-     * @return A future that indicates result of sent Fetch request
-     */
-    private RequestFuture<ClientResponse> sendFetchRequestToNode(final 
FetchSessionHandler.FetchRequestData requestData,
-                                                                 final Node 
fetchTarget) {
-        // Version 12 is the maximum version that could be used without topic 
IDs. See FetchRequest.json for schema
-        // changelog.
-        final short maxVersion = requestData.canUseTopicIds() ? 
ApiKeys.FETCH.latestVersion() : (short) 12;
-
-        final FetchRequest.Builder request = FetchRequest.Builder
-                .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
requestData.toSend())
-                .isolationLevel(isolationLevel)
-                .setMaxBytes(this.maxBytes)
-                .metadata(requestData.metadata())
-                .removed(requestData.toForget())
-                .replaced(requestData.toReplace())
-                .rackId(clientRackId);
-
-        log.debug("Sending {} {} to broker {}", isolationLevel, requestData, 
fetchTarget);
-
-        return client.send(fetchTarget, request);
-    }
-
-    /**
-     * Return the fetched records, empty the record buffer and update the 
consumed position.
-     *
-     * </p>
-     *
-     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the 
consumed position is not updated.
-     *
-     * @return A {@link Fetch} for the requested partitions
-     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in 
fetchResponse and
-     *         the defaultResetPolicy is NONE
-     * @throws TopicAuthorizationException If there is TopicAuthorization 
error in fetchResponse.
-     */
-    public Fetch<K, V> collectFetch() {
-        Fetch<K, V> fetch = Fetch.empty();
-        Queue<CompletedFetch<K, V>> pausedCompletedFetches = new 
ArrayDeque<>();
-        int recordsRemaining = maxPollRecords;
-
-        try {
-            while (recordsRemaining > 0) {
-                if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
-                    CompletedFetch<K, V> records = completedFetches.peek();
-                    if (records == null) break;
-
-                    if (!records.initialized) {
-                        try {
-                            nextInLineFetch = 
initializeCompletedFetch(records);
-                        } catch (Exception e) {
-                            // Remove a completedFetch upon a parse with 
exception if (1) it contains no records, and
-                            // (2) there are no fetched records with actual 
content preceding this exception.
-                            // The first condition ensures that the 
completedFetches is not stuck with the same completedFetch
-                            // in cases such as the 
TopicAuthorizationException, and the second condition ensures that no
-                            // potential data loss due to an exception in a 
following record.
-                            FetchResponseData.PartitionData partition = 
records.partitionData;
-                            if (fetch.isEmpty() && 
FetchResponse.recordsOrFail(partition).sizeInBytes() == 0) {
-                                completedFetches.poll();
-                            }
-                            throw e;
-                        }
-                    } else {
-                        nextInLineFetch = records;
-                    }
-                    completedFetches.poll();
-                } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
-                    // when the partition is paused we add the records back to 
the completedFetches queue instead of draining
-                    // them so that they can be returned on a subsequent poll 
if the partition is resumed at that time
-                    log.debug("Skipping fetching records for assigned 
partition {} because it is paused", nextInLineFetch.partition);
-                    pausedCompletedFetches.add(nextInLineFetch);
-                    nextInLineFetch = null;
-                } else {
-                    Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch, 
recordsRemaining);
-                    recordsRemaining -= nextFetch.numRecords();
-                    fetch.add(nextFetch);
-                }
-            }
-        } catch (KafkaException e) {
-            if (fetch.isEmpty())
-                throw e;
-        } finally {
-            // add any polled completed fetches for paused partitions back to 
the completed fetches queue to be
-            // re-evaluated in the next poll
-            completedFetches.addAll(pausedCompletedFetches);
-        }
-
-        return fetch;
-    }
-
-    private Fetch<K, V> fetchRecords(CompletedFetch<K, V> completedFetch, int 
maxRecords) {
-        if (!subscriptions.isAssigned(completedFetch.partition)) {
-            // this can happen when a rebalance happened before fetched 
records are returned to the consumer's poll call
-            log.debug("Not returning fetched records for partition {} since it 
is no longer assigned",
-                    completedFetch.partition);
-        } else if (!subscriptions.isFetchable(completedFetch.partition)) {
-            // this can happen when a partition is paused before fetched 
records are returned to the consumer's
-            // poll call or if the offset is being reset
-            log.debug("Not returning fetched records for assigned partition {} 
since it is no longer fetchable",
-                    completedFetch.partition);
-        } else {
-            FetchPosition position = 
subscriptions.position(completedFetch.partition);
-            if (position == null) {
-                throw new IllegalStateException("Missing position for 
fetchable partition " + completedFetch.partition);
-            }
-
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = 
completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for 
assigned partition {}",
-                        partRecords.size(), position, 
completedFetch.partition);
-
-                boolean positionAdvanced = false;
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Updating fetch position from {} to {} for 
partition {} and returning {} records from `poll()`",
-                            position, nextPosition, completedFetch.partition, 
partRecords.size());
-                    subscriptions.position(completedFetch.partition, 
nextPosition);
-                    positionAdvanced = true;
-                }
-
-                Long partitionLag = 
subscriptions.partitionLag(completedFetch.partition, isolationLevel);
-                if (partitionLag != null)
-                    
this.metricsManager.recordPartitionLag(completedFetch.partition, partitionLag);
-
-                Long lead = 
subscriptions.partitionLead(completedFetch.partition);
-                if (lead != null) {
-                    
this.metricsManager.recordPartitionLead(completedFetch.partition, lead);
-                }
-
-                return Fetch.forPartition(completedFetch.partition, 
partRecords, positionAdvanced);
-            } else {
-                // these records aren't next in line based on the last 
consumed position, ignore them
-                // they must be from an obsolete request
-                log.debug("Ignoring fetched records for {} at offset {} since 
the current position is {}",
-                        completedFetch.partition, 
completedFetch.nextFetchOffset, position);
-            }
-        }
-
-        log.trace("Draining fetched records for partition {}", 
completedFetch.partition);
-        completedFetch.drain();
-
-        return Fetch.empty();
-    }
-
-    private List<TopicPartition> fetchablePartitions() {
-        Set<TopicPartition> exclude = new HashSet<>();
-        if (nextInLineFetch != null && !nextInLineFetch.isConsumed) {
-            exclude.add(nextInLineFetch.partition);
-        }
-        for (CompletedFetch<K, V> completedFetch : completedFetches) {
-            exclude.add(completedFetch.partition);
-        }
-        return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
-    }
-
-    /**
-     * Determine which replica to read from.
-     */
-    Node selectReadReplica(TopicPartition partition, Node leaderReplica, long 
currentTimeMs) {
-        Optional<Integer> nodeId = 
subscriptions.preferredReadReplica(partition, currentTimeMs);
-        if (nodeId.isPresent()) {
-            Optional<Node> node = nodeId.flatMap(id -> 
metadata.fetch().nodeIfOnline(partition, id));
-            if (node.isPresent()) {
-                return node.get();
-            } else {
-                log.trace("Not fetching from {} for partition {} since it is 
marked offline or is missing from our metadata," +
-                          " using the leader instead.", nodeId, partition);
-                // Note that this condition may happen due to stale metadata, 
so we clear preferred replica and
-                // refresh metadata.
-                requestMetadataUpdate(partition);
-                return leaderReplica;
-            }
-        } else {
-            return leaderReplica;
-        }
-    }
-
-    /**
-     * Create fetch requests for all nodes for which we have assigned 
partitions
-     * that have no existing requests in flight.
-     */
-    private Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests() {
-        Map<Node, FetchSessionHandler.Builder> fetchable = new 
LinkedHashMap<>();
-        long currentTimeMs = time.milliseconds();
-        Map<String, Uuid> topicIds = metadata.topicIds();
-
-        for (TopicPartition partition : fetchablePartitions()) {
-            FetchPosition position = this.subscriptions.position(partition);
-            if (position == null) {
-                throw new IllegalStateException("Missing position for 
fetchable partition " + partition);
-            }
-
-            Optional<Node> leaderOpt = position.currentLeader.leader;
-            if (!leaderOpt.isPresent()) {
-                log.debug("Requesting metadata update for partition {} since 
the position {} is missing the current leader node", partition, position);
-                metadata.requestUpdate();
-                continue;
-            }
-
-            // Use the preferred read replica if set, otherwise the 
partition's leader
-            Node node = selectReadReplica(partition, leaderOpt.get(), 
currentTimeMs);
-            if (client.isUnavailable(node)) {
-                client.maybeThrowAuthFailure(node);
-
-                // If we try to send during the reconnect backoff window, then 
the request is just
-                // going to be failed anyway before being sent, so skip the 
send for now
-                log.trace("Skipping fetch for partition {} because node {} is 
awaiting reconnect backoff", partition, node);
-            } else if (this.nodesWithPendingFetchRequests.contains(node.id())) 
{
-                log.trace("Skipping fetch for partition {} because previous 
request to {} has not been processed", partition, node);
-            } else {
-                // if there is a leader and no in-flight requests, issue a new 
fetch
-                FetchSessionHandler.Builder builder = fetchable.get(node);
-                if (builder == null) {
-                    int id = node.id();
-                    FetchSessionHandler handler = sessionHandler(id);
-                    if (handler == null) {
-                        handler = new FetchSessionHandler(logContext, id);
-                        sessionHandlers.put(id, handler);
-                    }
-                    builder = handler.newBuilder();
-                    fetchable.put(node, builder);
-                }
-                builder.add(partition, new FetchRequest.PartitionData(
-                    topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID),
-                    position.offset, FetchRequest.INVALID_LOG_START_OFFSET, 
this.fetchSize,
-                    position.currentLeader.epoch, Optional.empty()));
-
-                log.debug("Added {} fetch request for partition {} at position 
{} to node {}", isolationLevel,
-                    partition, position, node);
-            }
-        }
-
-        Map<Node, FetchSessionHandler.FetchRequestData> reqs = new 
LinkedHashMap<>();
-        for (Map.Entry<Node, FetchSessionHandler.Builder> entry : 
fetchable.entrySet()) {
-            reqs.put(entry.getKey(), entry.getValue().build());
-        }
-        return reqs;
-    }
-
-    /**
-     * Initialize a CompletedFetch object.
-     */
-    private CompletedFetch<K, V> initializeCompletedFetch(CompletedFetch<K, V> 
nextCompletedFetch) {
-        TopicPartition tp = nextCompletedFetch.partition;
-        FetchResponseData.PartitionData partition = 
nextCompletedFetch.partitionData;
-        long fetchOffset = nextCompletedFetch.nextFetchOffset;
-        CompletedFetch<K, V> completedFetch = null;
-        Errors error = Errors.forCode(partition.errorCode());
-
-        try {
-            if (!subscriptions.hasValidPosition(tp)) {
-                // this can happen when a rebalance happened while fetch is 
still in-flight
-                log.debug("Ignoring fetched records for partition {} since it 
no longer has valid position", tp);
-            } else if (error == Errors.NONE) {
-                // we are interested in this fetch only if the beginning 
offset matches the
-                // current consumed position
-                FetchPosition position = subscriptions.position(tp);
-                if (position == null || position.offset != fetchOffset) {
-                    log.debug("Discarding stale fetch response for partition 
{} since its offset {} does not match " +
-                            "the expected offset {}", tp, fetchOffset, 
position);
-                    return null;
-                }
-
-                log.trace("Preparing to read {} bytes of data for partition {} 
with offset {}",
-                        FetchResponse.recordsSize(partition), tp, position);
-                Iterator<? extends RecordBatch> batches = 
FetchResponse.recordsOrFail(partition).batches().iterator();
-                completedFetch = nextCompletedFetch;
-
-                if (!batches.hasNext() && FetchResponse.recordsSize(partition) 
> 0) {
-                    if (completedFetch.requestVersion < 3) {
-                        // Implement the pre KIP-74 behavior of throwing a 
RecordTooLargeException.
-                        Map<TopicPartition, Long> recordTooLargePartitions = 
Collections.singletonMap(tp, fetchOffset);
-                        throw new RecordTooLargeException("There are some 
messages at [Partition=Offset]: " +
-                                recordTooLargePartitions + " whose size is 
larger than the fetch size " + this.fetchSize +
-                                " and hence cannot be returned. Please 
considering upgrading your broker to 0.10.1.0 or " +
-                                "newer to avoid this issue. Alternately, 
increase the fetch size on the client (using " +
-                                
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
-                                recordTooLargePartitions);
-                    } else {
-                        // This should not happen with brokers that support 
FetchRequest/Response V3 or higher (i.e. KIP-74)
-                        throw new KafkaException("Failed to make progress 
reading messages at " + tp + "=" +
-                            fetchOffset + ". Received a non-empty fetch 
response from the server, but no " +
-                            "complete records were found.");
-                    }
-                }
-
-                if (partition.highWatermark() >= 0) {
-                    log.trace("Updating high watermark for partition {} to 
{}", tp, partition.highWatermark());
-                    subscriptions.updateHighWatermark(tp, 
partition.highWatermark());
+                    handleFetchResponse(fetchTarget, e);
                 }
+            };
 
-                if (partition.logStartOffset() >= 0) {
-                    log.trace("Updating log start offset for partition {} to 
{}", tp, partition.logStartOffset());
-                    subscriptions.updateLogStartOffset(tp, 
partition.logStartOffset());
-                }
-
-                if (partition.lastStableOffset() >= 0) {
-                    log.trace("Updating last stable offset for partition {} to 
{}", tp, partition.lastStableOffset());
-                    subscriptions.updateLastStableOffset(tp, 
partition.lastStableOffset());
-                }
-
-                if (FetchResponse.isPreferredReplica(partition)) {
-                    
subscriptions.updatePreferredReadReplica(completedFetch.partition, 
partition.preferredReadReplica(), () -> {
-                        long expireTimeMs = time.milliseconds() + 
metadata.metadataExpireMs();
-                        log.debug("Updating preferred read replica for 
partition {} to {}, set to expire at {}",
-                                tp, partition.preferredReadReplica(), 
expireTimeMs);
-                        return expireTimeMs;
-                    });
-                }
-
-                nextCompletedFetch.initialized = true;
-            } else if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-                       error == Errors.REPLICA_NOT_AVAILABLE ||
-                       error == Errors.KAFKA_STORAGE_ERROR ||
-                       error == Errors.FENCED_LEADER_EPOCH ||
-                       error == Errors.OFFSET_NOT_AVAILABLE) {
-                log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                log.warn("Received unknown topic or partition error in fetch 
for partition {}", tp);
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.UNKNOWN_TOPIC_ID) {
-                log.warn("Received unknown topic ID error in fetch for 
partition {}", tp);
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.INCONSISTENT_TOPIC_ID) {
-                log.warn("Received inconsistent topic ID error in fetch for 
partition {}", tp);
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
-                Optional<Integer> clearedReplicaId = 
subscriptions.clearPreferredReadReplica(tp);
-                if (!clearedReplicaId.isPresent()) {
-                    // If there's no preferred replica to clear, we're 
fetching from the leader so handle this error normally
-                    FetchPosition position = subscriptions.position(tp);
-                    if (position == null || fetchOffset != position.offset) {
-                        log.debug("Discarding stale fetch response for 
partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, 
fetchOffset, position);
-                    } else {
-                        handleOffsetOutOfRange(position, tp);
-                    }
-                } else {
-                    log.debug("Unset the preferred read replica {} for 
partition {} since we got {} when fetching {}",
-                            clearedReplicaId.get(), tp, error, fetchOffset);
-                }
-            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                //we log the actual partition and not just the topic to help 
with ACL propagation issues in large clusters
-                log.warn("Not authorized to read from partition {}.", tp);
-                throw new 
TopicAuthorizationException(Collections.singleton(tp.topic()));
-            } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
-                log.debug("Received unknown leader epoch error in fetch for 
partition {}", tp);
-            } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
-                log.warn("Unknown server error while fetching offset {} for 
topic-partition {}",
-                        fetchOffset, tp);
-            } else if (error == Errors.CORRUPT_MESSAGE) {
-                throw new KafkaException("Encountered corrupt message when 
fetching offset "
-                        + fetchOffset
-                        + " for topic-partition "
-                        + tp);
-            } else {
-                throw new IllegalStateException("Unexpected error code "
-                        + error.code()
-                        + " while fetching at offset "
-                        + fetchOffset
-                        + " from topic-partition " + tp);
-            }
-        } finally {
-            if (completedFetch == null)
-                nextCompletedFetch.recordAggregatedMetrics(0, 0);
-
-            if (error != Errors.NONE)
-                // we move the partition to the end if there was an error. 
This way, it's more likely that partitions for
-                // the same topic can remain together (allowing for more 
efficient serialization).
-                subscriptions.movePartitionToEnd(tp);
-        }
-
-        return completedFetch;
-    }
-
-    private void handleOffsetOutOfRange(FetchPosition fetchPosition, 
TopicPartition topicPartition) {
-        String errorMessage = "Fetch position " + fetchPosition + " is out of 
range for partition " + topicPartition;
-        if (subscriptions.hasDefaultOffsetResetPolicy()) {
-            log.info("{}, resetting offset", errorMessage);
-            subscriptions.requestOffsetReset(topicPartition);
-        } else {
-            log.info("{}, raising error to the application since no reset 
policy is configured", errorMessage);
-            throw new OffsetOutOfRangeException(errorMessage,
-                Collections.singletonMap(topicPartition, 
fetchPosition.offset));
-        }
-    }
-
-    /**
-     * Clear the buffered data which are not a part of newly assigned 
partitions
-     *
-     * @param assignedPartitions  newly assigned {@link TopicPartition}
-     */
-    public void 
clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> 
assignedPartitions) {
-        Iterator<CompletedFetch<K, V>> completedFetchesItr = 
completedFetches.iterator();
-        while (completedFetchesItr.hasNext()) {
-            CompletedFetch<K, V> records = completedFetchesItr.next();
-            TopicPartition tp = records.partition;
-            if (!assignedPartitions.contains(tp)) {
-                records.drain();
-                completedFetchesItr.remove();
-            }
-        }
-
-        if (nextInLineFetch != null && 
!assignedPartitions.contains(nextInLineFetch.partition)) {
-            nextInLineFetch.drain();
-            nextInLineFetch = null;
+            final RequestFuture<ClientResponse> future = 
client.send(fetchTarget, request);
+            future.addListener(listener);
         }
-    }
-
-    /**
-     * Clear the buffered data which are not a part of newly assigned topics
-     *
-     * @param assignedTopics  newly assigned topics
-     */
-    public void clearBufferedDataForUnassignedTopics(Collection<String> 
assignedTopics) {
-        Set<TopicPartition> currentTopicPartitions = new HashSet<>();
-        for (TopicPartition tp : subscriptions.assignedPartitions()) {
-            if (assignedTopics.contains(tp.topic())) {
-                currentTopicPartitions.add(tp);
-            }
-        }
-        clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
-    }
 
-    // Visible for testing
-    protected FetchSessionHandler sessionHandler(int node) {
-        return sessionHandlers.get(node);
+        return fetchRequestMap.size();
     }
 
-    public static Sensor throttleTimeSensor(Metrics metrics, 
FetchMetricsRegistry metricsRegistry) {
-        Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
-        
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg),
 new Avg());
-
-        
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax),
 new Max());
-
-        return fetchThrottleTimeSensor;
+    @Override
+    protected synchronized void handleFetchResponse(Node fetchTarget,

Review Comment:
   Ditto below.
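
   For context, the refactored wiring this refers to condenses to the following shape (abbreviated from the diff above, not new code): the anonymous listener only delegates, and all response handling lives in the synchronized handleFetchResponse overloads.

       RequestFutureListener<ClientResponse> listener = new RequestFutureListener<ClientResponse>() {
           @Override
           public void onSuccess(ClientResponse resp) {
               handleFetchResponse(fetchTarget, data, resp);  // success path
           }

           @Override
           public void onFailure(RuntimeException e) {
               handleFetchResponse(fetchTarget, e);           // error path
           }
       };
       client.send(fetchTarget, request).addListener(listener);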



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -0,0 +1,791 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.helpers.MessageFormatter;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * {@code AbstractFetch} represents the basic state and logic for record 
fetching processing.
+ * @param <K> Type for the message key
+ * @param <V> Type for the message value
+ */
+public abstract class AbstractFetch<K, V> implements Closeable {

Review Comment:
   Do we expect AbstractFetch to have some abstract functions that only the new Fetcher would implement? If not, maybe we don't need to declare it as abstract?
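
   To illustrate the question with a toy example (hypothetical names, nothing from the PR): with no abstract members, the abstract modifier only forbids direct instantiation.

       // Hypothetical illustration: `abstract` with no abstract methods buys
       // nothing beyond disallowing `new Base()`.
       abstract class Base {
           int sharedLogic() { return 42; }   // everything is already concrete
       }

       class Derived extends Base { }         // compiles with no overrides

       class Demo {
           Base b = new Derived();            // fine
           // Base b2 = new Base();           // the only thing `abstract` forbids
       }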



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -797,24 +799,15 @@ public KafkaConsumer(Map<String, Object> configs,
                         
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
                         config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
             }
+            FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, 
keyDeserializer, valueDeserializer, isolationLevel);

Review Comment:
   This is a nice improvement, +1!
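
   For readers following along, a sketch of the shape such a wrapper would take. The constructor signature matches the call site above; the field set is an assumption inferred from the settings the old Fetcher carried, not the actual class:

       import org.apache.kafka.clients.consumer.ConsumerConfig;
       import org.apache.kafka.common.IsolationLevel;
       import org.apache.kafka.common.serialization.Deserializer;

       // Hypothetical sketch of FetchConfig: one value object in place of the many
       // fetch-related arguments KafkaConsumer previously threaded through to Fetcher.
       public class FetchConfig<K, V> {
           final int minBytes;                       // fetch.min.bytes
           final int maxBytes;                       // fetch.max.bytes
           final int maxWaitMs;                      // fetch.max.wait.ms
           final int fetchSize;                      // max.partition.fetch.bytes
           final Deserializer<K> keyDeserializer;
           final Deserializer<V> valueDeserializer;
           final IsolationLevel isolationLevel;

           public FetchConfig(ConsumerConfig config,
                              Deserializer<K> keyDeserializer,
                              Deserializer<V> valueDeserializer,
                              IsolationLevel isolationLevel) {
               this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);
               this.maxBytes = config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG);
               this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
               this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
               this.keyDeserializer = keyDeserializer;
               this.valueDeserializer = valueDeserializer;
               this.isolationLevel = isolationLevel;
           }
       }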



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -0,0 +1,791 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.helpers.MessageFormatter;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * {@code AbstractFetch} represents the basic state and logic for record 
fetching processing.
+ * @param <K> Type for the message key
+ * @param <V> Type for the message value
+ */
+public abstract class AbstractFetch<K, V> implements Closeable {
+
+    private final Logger log;
+    protected final LogContext logContext;
+    protected final ConsumerNetworkClient client;
+    protected final ConsumerMetadata metadata;
+    protected final SubscriptionState subscriptions;
+    protected final FetchConfig<K, V> fetchConfig;
+    protected final Time time;
+    protected final FetchMetricsManager metricsManager;
+
+    private final BufferSupplier decompressionBufferSupplier;
+    private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
+    private final Map<Integer, FetchSessionHandler> sessionHandlers;
+    private final Set<Integer> nodesWithPendingFetchRequests;
+
+    private CompletedFetch<K, V> nextInLineFetch;
+
+    public AbstractFetch(final LogContext logContext,
+                         final ConsumerNetworkClient client,
+                         final ConsumerMetadata metadata,
+                         final SubscriptionState subscriptions,
+                         final FetchConfig<K, V> fetchConfig,
+                         final FetchMetricsManager metricsManager,
+                         final Time time) {
+        this.log = logContext.logger(AbstractFetch.class);
+        this.logContext = logContext;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.fetchConfig = fetchConfig;
+        this.decompressionBufferSupplier = BufferSupplier.create();
+        this.completedFetches = new ConcurrentLinkedQueue<>();
+        this.sessionHandlers = new HashMap<>();
+        this.nodesWithPendingFetchRequests = new HashSet<>();
+        this.metricsManager = metricsManager;
+        this.time = time;
+    }
+
+    /**
+     * Return whether we have any completed fetches pending return to the 
user. This method is thread-safe. Has
+     * visibility for testing.
+     *
+     * @return true if there are completed fetches, false otherwise
+     */
+    boolean hasCompletedFetches() {

Review Comment:
   I took a quick pass over this class assuming the logic is unchanged, i.e. mostly copy-paste here except for the FetchConfig wrapping. Please LMK otherwise and I'll give it a deeper look.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -172,606 +69,41 @@ public boolean hasAvailableFetches() {
      * @return number of fetches sent
      */
     public synchronized int sendFetches() {
-        // Update metrics in case there was an assignment change
-        metricsManager.maybeUpdateAssignment(subscriptions);
-
         Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = 
prepareFetchRequests();
+
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : 
fetchRequestMap.entrySet()) {
             final Node fetchTarget = entry.getKey();
             final FetchSessionHandler.FetchRequestData data = entry.getValue();
-            final RequestFuture<ClientResponse> future = 
sendFetchRequestToNode(data, fetchTarget);
-            // We add the node to the set of nodes with pending fetch requests 
before adding the
-            // listener because the future may have been fulfilled on another 
thread (e.g. during a
-            // disconnection being handled by the heartbeat thread) which will 
mean the listener
-            // will be invoked synchronously.
-            this.nodesWithPendingFetchRequests.add(entry.getKey().id());
-            future.addListener(new RequestFutureListener<ClientResponse>() {
+            final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+            RequestFutureListener<ClientResponse> listener = new 
RequestFutureListener<ClientResponse>() {
                 @Override
                 public void onSuccess(ClientResponse resp) {
-                    synchronized (Fetcher.this) {
-                        try {
-                            FetchResponse response = (FetchResponse) 
resp.responseBody();
-                            FetchSessionHandler handler = 
sessionHandler(fetchTarget.id());
-                            if (handler == null) {
-                                log.error("Unable to find FetchSessionHandler 
for node {}. Ignoring fetch response.",
-                                        fetchTarget.id());
-                                return;
-                            }
-                            if (!handler.handleResponse(response, 
resp.requestHeader().apiVersion())) {
-                                if (response.error() == 
Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
-                                    metadata.requestUpdate();
-                                }
-                                return;
-                            }
-
-                            Map<TopicPartition, 
FetchResponseData.PartitionData> responseData = 
response.responseData(handler.sessionTopicNames(), 
resp.requestHeader().apiVersion());
-                            Set<TopicPartition> partitions = new 
HashSet<>(responseData.keySet());
-                            FetchMetricsAggregator metricAggregator = new 
FetchMetricsAggregator(metricsManager, partitions);
-
-                            for (Map.Entry<TopicPartition, 
FetchResponseData.PartitionData> entry : responseData.entrySet()) {
-                                TopicPartition partition = entry.getKey();
-                                FetchRequest.PartitionData requestData = 
data.sessionPartitions().get(partition);
-                                if (requestData == null) {
-                                    String message;
-                                    if (data.metadata().isFull()) {
-                                        message = MessageFormatter.arrayFormat(
-                                                "Response for missing full 
request partition: partition={}; metadata={}",
-                                                new Object[]{partition, 
data.metadata()}).getMessage();
-                                    } else {
-                                        message = MessageFormatter.arrayFormat(
-                                                "Response for missing session 
request partition: partition={}; metadata={}; toSend={}; toForget={}; 
toReplace={}",
-                                                new Object[]{partition, 
data.metadata(), data.toSend(), data.toForget(), 
data.toReplace()}).getMessage();
-                                    }
-
-                                    // Received fetch response for missing 
session partition
-                                    throw new IllegalStateException(message);
-                                } else {
-                                    long fetchOffset = requestData.fetchOffset;
-                                    short requestVersion = 
resp.requestHeader().apiVersion();
-                                    FetchResponseData.PartitionData 
partitionData = entry.getValue();
-
-                                    log.debug("Fetch {} at offset {} for 
partition {} returned fetch data {}",
-                                            isolationLevel, fetchOffset, 
partition, partitionData);
-
-                                    CompletedFetch<K, V> completedFetch = new 
CompletedFetch<>(logContext,
-                                            subscriptions,
-                                            checkCrcs,
-                                            decompressionBufferSupplier,
-                                            keyDeserializer,
-                                            valueDeserializer,
-                                            isolationLevel,
-                                            partition,
-                                            partitionData,
-                                            metricAggregator,
-                                            fetchOffset,
-                                            requestVersion);
-                                    completedFetches.add(completedFetch);
-                                }
-                            }
-
-                            
metricsManager.recordLatency(resp.requestLatencyMs());
-                        } finally {
-                            
nodesWithPendingFetchRequests.remove(fetchTarget.id());
-                        }
-                    }
+                    handleFetchResponse(fetchTarget, data, resp);
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
-                    synchronized (Fetcher.this) {
-                        try {
-                            FetchSessionHandler handler = 
sessionHandler(fetchTarget.id());
-                            if (handler != null) {
-                                handler.handleError(e);
-                                
handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
-                            }
-                        } finally {
-                            
nodesWithPendingFetchRequests.remove(fetchTarget.id());
-                        }
-                    }
-                }
-            });
-
-        }
-        return fetchRequestMap.size();
-    }
-
-    /**
-     * Send Fetch Request to Kafka cluster asynchronously.
-     *
-     * </p>
-     *
-     * This method is visible for testing.
-     *
-     * @return A future that indicates result of sent Fetch request
-     */
-    private RequestFuture<ClientResponse> sendFetchRequestToNode(final 
FetchSessionHandler.FetchRequestData requestData,
-                                                                 final Node 
fetchTarget) {
-        // Version 12 is the maximum version that could be used without topic 
IDs. See FetchRequest.json for schema
-        // changelog.
-        final short maxVersion = requestData.canUseTopicIds() ? 
ApiKeys.FETCH.latestVersion() : (short) 12;
-
-        final FetchRequest.Builder request = FetchRequest.Builder
-                .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
requestData.toSend())
-                .isolationLevel(isolationLevel)
-                .setMaxBytes(this.maxBytes)
-                .metadata(requestData.metadata())
-                .removed(requestData.toForget())
-                .replaced(requestData.toReplace())
-                .rackId(clientRackId);
-
-        log.debug("Sending {} {} to broker {}", isolationLevel, requestData, 
fetchTarget);
-
-        return client.send(fetchTarget, request);
-    }
-
-    /**
-     * Return the fetched records, empty the record buffer and update the 
consumed position.
-     *
-     * </p>
-     *
-     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the 
consumed position is not updated.
-     *
-     * @return A {@link Fetch} for the requested partitions
-     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in 
fetchResponse and
-     *         the defaultResetPolicy is NONE
-     * @throws TopicAuthorizationException If there is TopicAuthorization 
error in fetchResponse.
-     */
-    public Fetch<K, V> collectFetch() {
-        Fetch<K, V> fetch = Fetch.empty();
-        Queue<CompletedFetch<K, V>> pausedCompletedFetches = new 
ArrayDeque<>();
-        int recordsRemaining = maxPollRecords;
-
-        try {
-            while (recordsRemaining > 0) {
-                if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
-                    CompletedFetch<K, V> records = completedFetches.peek();
-                    if (records == null) break;
-
-                    if (!records.initialized) {
-                        try {
-                            nextInLineFetch = 
initializeCompletedFetch(records);
-                        } catch (Exception e) {
-                            // Remove a completedFetch upon a parse with 
exception if (1) it contains no records, and
-                            // (2) there are no fetched records with actual 
content preceding this exception.
-                            // The first condition ensures that the 
completedFetches is not stuck with the same completedFetch
-                            // in cases such as the 
TopicAuthorizationException, and the second condition ensures that no
-                            // potential data loss due to an exception in a 
following record.
-                            FetchResponseData.PartitionData partition = 
records.partitionData;
-                            if (fetch.isEmpty() && 
FetchResponse.recordsOrFail(partition).sizeInBytes() == 0) {
-                                completedFetches.poll();
-                            }
-                            throw e;
-                        }
-                    } else {
-                        nextInLineFetch = records;
-                    }
-                    completedFetches.poll();
-                } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
-                    // when the partition is paused we add the records back to 
the completedFetches queue instead of draining
-                    // them so that they can be returned on a subsequent poll 
if the partition is resumed at that time
-                    log.debug("Skipping fetching records for assigned 
partition {} because it is paused", nextInLineFetch.partition);
-                    pausedCompletedFetches.add(nextInLineFetch);
-                    nextInLineFetch = null;
-                } else {
-                    Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch, 
recordsRemaining);
-                    recordsRemaining -= nextFetch.numRecords();
-                    fetch.add(nextFetch);
-                }
-            }
-        } catch (KafkaException e) {
-            if (fetch.isEmpty())
-                throw e;
-        } finally {
-            // add any polled completed fetches for paused partitions back to 
the completed fetches queue to be
-            // re-evaluated in the next poll
-            completedFetches.addAll(pausedCompletedFetches);
-        }
-
-        return fetch;
-    }
-
-    private Fetch<K, V> fetchRecords(CompletedFetch<K, V> completedFetch, int 
maxRecords) {
-        if (!subscriptions.isAssigned(completedFetch.partition)) {
-            // this can happen when a rebalance happened before fetched 
records are returned to the consumer's poll call
-            log.debug("Not returning fetched records for partition {} since it 
is no longer assigned",
-                    completedFetch.partition);
-        } else if (!subscriptions.isFetchable(completedFetch.partition)) {
-            // this can happen when a partition is paused before fetched 
records are returned to the consumer's
-            // poll call or if the offset is being reset
-            log.debug("Not returning fetched records for assigned partition {} 
since it is no longer fetchable",
-                    completedFetch.partition);
-        } else {
-            FetchPosition position = 
subscriptions.position(completedFetch.partition);
-            if (position == null) {
-                throw new IllegalStateException("Missing position for 
fetchable partition " + completedFetch.partition);
-            }
-
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = 
completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for 
assigned partition {}",
-                        partRecords.size(), position, 
completedFetch.partition);
-
-                boolean positionAdvanced = false;
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Updating fetch position from {} to {} for 
partition {} and returning {} records from `poll()`",
-                            position, nextPosition, completedFetch.partition, 
partRecords.size());
-                    subscriptions.position(completedFetch.partition, 
nextPosition);
-                    positionAdvanced = true;
-                }
-
-                Long partitionLag = 
subscriptions.partitionLag(completedFetch.partition, isolationLevel);
-                if (partitionLag != null)
-                    
this.metricsManager.recordPartitionLag(completedFetch.partition, partitionLag);
-
-                Long lead = 
subscriptions.partitionLead(completedFetch.partition);
-                if (lead != null) {
-                    
this.metricsManager.recordPartitionLead(completedFetch.partition, lead);
-                }
-
-                return Fetch.forPartition(completedFetch.partition, 
partRecords, positionAdvanced);
-            } else {
-                // these records aren't next in line based on the last 
consumed position, ignore them
-                // they must be from an obsolete request
-                log.debug("Ignoring fetched records for {} at offset {} since 
the current position is {}",
-                        completedFetch.partition, 
completedFetch.nextFetchOffset, position);
-            }
-        }
-
-        log.trace("Draining fetched records for partition {}", 
completedFetch.partition);
-        completedFetch.drain();
-
-        return Fetch.empty();
-    }
-
-    private List<TopicPartition> fetchablePartitions() {
-        Set<TopicPartition> exclude = new HashSet<>();
-        if (nextInLineFetch != null && !nextInLineFetch.isConsumed) {
-            exclude.add(nextInLineFetch.partition);
-        }
-        for (CompletedFetch<K, V> completedFetch : completedFetches) {
-            exclude.add(completedFetch.partition);
-        }
-        return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
-    }
-
-    /**
-     * Determine which replica to read from.
-     */
-    Node selectReadReplica(TopicPartition partition, Node leaderReplica, long currentTimeMs) {
-        Optional<Integer> nodeId = subscriptions.preferredReadReplica(partition, currentTimeMs);
-        if (nodeId.isPresent()) {
-            Optional<Node> node = nodeId.flatMap(id -> metadata.fetch().nodeIfOnline(partition, id));
-            if (node.isPresent()) {
-                return node.get();
-            } else {
-                log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata," +
-                          " using the leader instead.", nodeId, partition);
-                // Note that this condition may happen due to stale metadata, so we clear preferred replica and
-                // refresh metadata.
-                requestMetadataUpdate(partition);
-                return leaderReplica;
-            }
-        } else {
-            return leaderReplica;
-        }
-    }
-
-    /**
-     * Create fetch requests for all nodes for which we have assigned partitions
-     * that have no existing requests in flight.
-     */
-    private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
-        Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
-        long currentTimeMs = time.milliseconds();
-        Map<String, Uuid> topicIds = metadata.topicIds();
-
-        for (TopicPartition partition : fetchablePartitions()) {
-            FetchPosition position = this.subscriptions.position(partition);
-            if (position == null) {
-                throw new IllegalStateException("Missing position for 
fetchable partition " + partition);
-            }
-
-            Optional<Node> leaderOpt = position.currentLeader.leader;
-            if (!leaderOpt.isPresent()) {
-                log.debug("Requesting metadata update for partition {} since 
the position {} is missing the current leader node", partition, position);
-                metadata.requestUpdate();
-                continue;
-            }
-
-            // Use the preferred read replica if set, otherwise the partition's leader
-            Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
-            if (client.isUnavailable(node)) {
-                client.maybeThrowAuthFailure(node);
-
-                // If we try to send during the reconnect backoff window, then the request is just
-                // going to be failed anyway before being sent, so skip the send for now
-                log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
-            } else if (this.nodesWithPendingFetchRequests.contains(node.id())) {
-                log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
-            } else {
-                // if there is a leader and no in-flight requests, issue a new fetch
-                FetchSessionHandler.Builder builder = fetchable.get(node);
-                if (builder == null) {
-                    int id = node.id();
-                    FetchSessionHandler handler = sessionHandler(id);
-                    if (handler == null) {
-                        handler = new FetchSessionHandler(logContext, id);
-                        sessionHandlers.put(id, handler);
-                    }
-                    builder = handler.newBuilder();
-                    fetchable.put(node, builder);
-                }
-                builder.add(partition, new FetchRequest.PartitionData(
-                    topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID),
-                    position.offset, FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize,
-                    position.currentLeader.epoch, Optional.empty()));
-
-                log.debug("Added {} fetch request for partition {} at position 
{} to node {}", isolationLevel,
-                    partition, position, node);
-            }
-        }
-
-        Map<Node, FetchSessionHandler.FetchRequestData> reqs = new 
LinkedHashMap<>();
-        for (Map.Entry<Node, FetchSessionHandler.Builder> entry : 
fetchable.entrySet()) {
-            reqs.put(entry.getKey(), entry.getValue().build());
-        }
-        return reqs;
-    }
-
-    /**
-     * Initialize a CompletedFetch object.
-     */
-    private CompletedFetch<K, V> initializeCompletedFetch(CompletedFetch<K, V> 
nextCompletedFetch) {
-        TopicPartition tp = nextCompletedFetch.partition;
-        FetchResponseData.PartitionData partition = 
nextCompletedFetch.partitionData;
-        long fetchOffset = nextCompletedFetch.nextFetchOffset;
-        CompletedFetch<K, V> completedFetch = null;
-        Errors error = Errors.forCode(partition.errorCode());
-
-        try {
-            if (!subscriptions.hasValidPosition(tp)) {
-                // this can happen when a rebalance happened while fetch is still in-flight
-                log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
-            } else if (error == Errors.NONE) {
-                // we are interested in this fetch only if the beginning offset matches the
-                // current consumed position
-                FetchPosition position = subscriptions.position(tp);
-                if (position == null || position.offset != fetchOffset) {
-                    log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
-                            "the expected offset {}", tp, fetchOffset, position);
-                    return null;
-                }
-
-                log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
-                        FetchResponse.recordsSize(partition), tp, position);
-                Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator();
-                completedFetch = nextCompletedFetch;
-
-                if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) {
-                    if (completedFetch.requestVersion < 3) {
-                        // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
-                        Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
-                        throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
-                                recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +
-                                " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
-                                "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
-                                ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
-                                recordTooLargePartitions);
-                    } else {
-                        // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
-                        throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
-                            fetchOffset + ". Received a non-empty fetch response from the server, but no " +
-                            "complete records were found.");
-                    }
-                }
-
-                if (partition.highWatermark() >= 0) {
-                    log.trace("Updating high watermark for partition {} to 
{}", tp, partition.highWatermark());
-                    subscriptions.updateHighWatermark(tp, 
partition.highWatermark());
+                    handleFetchResponse(fetchTarget, e);
                 }
+            };
 
-                if (partition.logStartOffset() >= 0) {
-                    log.trace("Updating log start offset for partition {} to 
{}", tp, partition.logStartOffset());
-                    subscriptions.updateLogStartOffset(tp, 
partition.logStartOffset());
-                }
-
-                if (partition.lastStableOffset() >= 0) {
-                    log.trace("Updating last stable offset for partition {} to 
{}", tp, partition.lastStableOffset());
-                    subscriptions.updateLastStableOffset(tp, 
partition.lastStableOffset());
-                }
-
-                if (FetchResponse.isPreferredReplica(partition)) {
-                    subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica(), () -> {
-                        long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
-                        log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
-                                tp, partition.preferredReadReplica(), expireTimeMs);
-                        return expireTimeMs;
-                    });
-                }
-
-                nextCompletedFetch.initialized = true;
-            } else if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-                       error == Errors.REPLICA_NOT_AVAILABLE ||
-                       error == Errors.KAFKA_STORAGE_ERROR ||
-                       error == Errors.FENCED_LEADER_EPOCH ||
-                       error == Errors.OFFSET_NOT_AVAILABLE) {
-                log.debug("Error in fetch for partition {}: {}", tp, 
error.exceptionName());
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                log.warn("Received unknown topic or partition error in fetch 
for partition {}", tp);
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.UNKNOWN_TOPIC_ID) {
-                log.warn("Received unknown topic ID error in fetch for 
partition {}", tp);
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.INCONSISTENT_TOPIC_ID) {
-                log.warn("Received inconsistent topic ID error in fetch for 
partition {}", tp);
-                requestMetadataUpdate(tp);
-            } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
-                Optional<Integer> clearedReplicaId = 
subscriptions.clearPreferredReadReplica(tp);
-                if (!clearedReplicaId.isPresent()) {
-                    // If there's no preferred replica to clear, we're 
fetching from the leader so handle this error normally
-                    FetchPosition position = subscriptions.position(tp);
-                    if (position == null || fetchOffset != position.offset) {
-                        log.debug("Discarding stale fetch response for 
partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, 
fetchOffset, position);
-                    } else {
-                        handleOffsetOutOfRange(position, tp);
-                    }
-                } else {
-                    log.debug("Unset the preferred read replica {} for 
partition {} since we got {} when fetching {}",
-                            clearedReplicaId.get(), tp, error, fetchOffset);
-                }
-            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                //we log the actual partition and not just the topic to help with ACL propagation issues in large clusters
-                log.warn("Not authorized to read from partition {}.", tp);
-                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
-            } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
-                log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
-            } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
-                log.warn("Unknown server error while fetching offset {} for topic-partition {}",
-                        fetchOffset, tp);
-            } else if (error == Errors.CORRUPT_MESSAGE) {
-                throw new KafkaException("Encountered corrupt message when fetching offset "
-                        + fetchOffset
-                        + " for topic-partition "
-                        + tp);
-            } else {
-                throw new IllegalStateException("Unexpected error code "
-                        + error.code()
-                        + " while fetching at offset "
-                        + fetchOffset
-                        + " from topic-partition " + tp);
-            }
-        } finally {
-            if (completedFetch == null)
-                nextCompletedFetch.recordAggregatedMetrics(0, 0);
-
-            if (error != Errors.NONE)
-                // we move the partition to the end if there was an error. This way, it's more likely that partitions for
-                // the same topic can remain together (allowing for more efficient serialization).
-                subscriptions.movePartitionToEnd(tp);
-        }
-
-        return completedFetch;
-    }
-
-    private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition topicPartition) {
-        String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition;
-        if (subscriptions.hasDefaultOffsetResetPolicy()) {
-            log.info("{}, resetting offset", errorMessage);
-            subscriptions.requestOffsetReset(topicPartition);
-        } else {
-            log.info("{}, raising error to the application since no reset 
policy is configured", errorMessage);
-            throw new OffsetOutOfRangeException(errorMessage,
-                Collections.singletonMap(topicPartition, 
fetchPosition.offset));
-        }
-    }
-
-    /**
-     * Clear the buffered data which are not a part of newly assigned partitions
-     *
-     * @param assignedPartitions  newly assigned {@link TopicPartition}
-     */
-    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
-        Iterator<CompletedFetch<K, V>> completedFetchesItr = completedFetches.iterator();
-        while (completedFetchesItr.hasNext()) {
-            CompletedFetch<K, V> records = completedFetchesItr.next();
-            TopicPartition tp = records.partition;
-            if (!assignedPartitions.contains(tp)) {
-                records.drain();
-                completedFetchesItr.remove();
-            }
-        }
-
-        if (nextInLineFetch != null && !assignedPartitions.contains(nextInLineFetch.partition)) {
-            nextInLineFetch.drain();
-            nextInLineFetch = null;
+            final RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
+            future.addListener(listener);
         }
-    }
-
-    /**
-     * Clear the buffered data which are not a part of newly assigned topics
-     *
-     * @param assignedTopics  newly assigned topics
-     */
-    public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
-        Set<TopicPartition> currentTopicPartitions = new HashSet<>();
-        for (TopicPartition tp : subscriptions.assignedPartitions()) {
-            if (assignedTopics.contains(tp.topic())) {
-                currentTopicPartitions.add(tp);
-            }
-        }
-        clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
-    }
 
-    // Visible for testing
-    protected FetchSessionHandler sessionHandler(int node) {
-        return sessionHandlers.get(node);
+        return fetchRequestMap.size();
     }
 
-    public static Sensor throttleTimeSensor(Metrics metrics, FetchMetricsRegistry metricsRegistry) {
-        Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
-        fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg());
-
-        fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax), new Max());
-
-        return fetchThrottleTimeSensor;
+    @Override
+    protected synchronized void handleFetchResponse(Node fetchTarget,

Review Comment:
   What about just letting the super's func be synchronized? Any concerns with that option?
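
   To make the option concrete, here is a minimal sketch of what moving the synchronization up would look like; the class shapes below are illustrative guesses, not the exact hierarchy in this PR:

   ```java
   import org.apache.kafka.clients.ClientResponse;
   import org.apache.kafka.common.Node;

   // Hypothetical shapes for illustration only.
   abstract class AbstractFetch {

       // Synchronizing the base method means every caller serializes on the
       // instance monitor, without each subclass re-declaring the keyword.
       protected synchronized void handleFetchResponse(Node fetchTarget, ClientResponse response) {
           // ... shared response handling ...
       }
   }

   class Fetcher extends AbstractFetch {
       // No override needed; calls route through the synchronized base method.
       // Caveat: `synchronized` is not inherited in Java, so an override here
       // would silently drop the locking unless it re-declares the keyword.
   }
   ```

   One thing to double-check with that option: since `synchronized` is not inherited, any subclass that still overrides the method would need to re-declare it, otherwise the override runs unlocked.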


