[ https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879558#comment-15879558 ]
Onur Karaman commented on KAFKA-4753:
-------------------------------------

[~ijuma] I don't think mitigation is the way to go. The two solutions I can think of are to either:
1. introduce some sort of fair scheduling logic that polls against all brokers but only returns records to the user by waiting in a round-robin manner for a completed FetchResponse from a given broker. This would damage performance, since you'd be blocking the processing of fully formed FetchResponses on the slowest fetch.
2. do all IO in a separate thread. I think we've been gravitating towards this since the beginning.
- January 2015: In KAFKA-1760, the KafkaConsumer was implemented as entirely single-threaded, with user-driven fetches and heartbeats.
- July 2015: I sent out an [email|https://www.mail-archive.com/dev@kafka.apache.org/msg31447.html] stating concerns with this approach, including the user-driven fetches and heartbeats. Becket and I proposed a [rough design for the fix|https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal] back then. Some nice API tweaks were made as a result of the discussion, but the user-driven fetches and heartbeats remained.
- October 2015: In KAFKA-2397 I added LeaveGroupRequest to reduce the turnaround between a controlled client shutdown and rebalancing the group.
- February 2016: In KIP-41 we added "max.poll.records" to mitigate the impact of record processing on the user-driven heartbeats.
- August 2016: In KIP-62 (KAFKA-3888) we added a separate heartbeat thread, synchronization logic surrounding NetworkClient, and retained poll-based liveness by automatically sending out a LeaveGroupRequest when "max.poll.interval.ms" isn't honored.
- February 2017: Some of the simplest uses of KafkaConsumer with manual partition assignment suffer from starvation again because of the user-driven fetch design decision.
I think we should try to make solution 2 work. We can retain the poll-based liveness by continuing what we do today: automatically sending out a LeaveGroupRequest when "max.poll.interval.ms" isn't honored. This fix could also help KAFKA-1895, as we can potentially push decompression/deserialization into the separate IO thread.
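As an aside (not part of the proposed fix, which would move the IO into the consumer client itself), the sketch below shows roughly what the direction of solution 2 looks like from the application side today: a dedicated thread owns the (non-thread-safe) KafkaConsumer and keeps poll() running, while a second thread does the record processing and fetches are paused, rather than poll() stopped, when the processor falls behind. The class name, topics, queue size, and timings are hypothetical; this is only meant to illustrate why decoupling fetch IO from processing removes the starvation pressure.
{code}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class DecoupledIoSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // Bounded hand-off buffer between the polling thread and the processing thread.
        BlockingQueue<ConsumerRecords<byte[], byte[]>> handoff = new ArrayBlockingQueue<>(10);

        // Polling thread: owns the KafkaConsumer and keeps calling poll(), so completed
        // and in-flight FetchResponses keep being read off the sockets regardless of
        // how long record processing takes.
        Thread poller = new Thread(() -> {
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> partitions = Arrays.asList(
                        new TopicPartition("t0", 0), new TopicPartition("t0", 1), new TopicPartition("t1", 0));
                consumer.assign(partitions);
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                    if (records.isEmpty()) {
                        continue;
                    }
                    // If the processor is behind, pause fetching but keep polling so
                    // the network layer still makes progress on pending responses.
                    while (!handoff.offer(records, 100, TimeUnit.MILLISECONDS)) {
                        consumer.pause(consumer.assignment());
                        consumer.poll(0);
                    }
                    consumer.resume(consumer.assignment());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-io");

        // Processing thread: arbitrarily slow processing no longer delays fetches.
        Thread processor = new Thread(() -> {
            try {
                while (true) {
                    ConsumerRecords<byte[], byte[]> batch = handoff.take();
                    Thread.sleep(3000); // stand-in for expensive record processing
                    System.out.println("processed " + batch.count() + " records");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "record-processing");

        poller.start();
        processor.start();
    }
}
{code}
Doing this inside the client would mean even a plain poll() loop gets the same benefit without the application having to manage threads and pause/resume itself.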
KafkaConsumer susceptible to FetchResponse starvation
-----------------------------------------------------

                 Key: KAFKA-4753
                 URL: https://issues.apache.org/jira/browse/KAFKA-4753
             Project: Kafka
          Issue Type: Bug
            Reporter: Onur Karaman
            Assignee: Onur Karaman

FetchResponse starvation here means that the KafkaConsumer repeatedly fails to fully form FetchResponses within the request timeout from a subset of the brokers it's fetching from, while FetchResponses from the other brokers can get fully formed and processed by the application.

In other words, this ticket is concerned with scenarios where fetching from some brokers hurts the progress of fetching from other brokers to the point of repeatedly hitting a request timeout.

Some FetchResponse starvation scenarios:
1. partition leadership of the consumer's assigned partitions is skewed across brokers, causing uneven FetchResponse sizes across brokers.
2. the consumer seeks back on partitions on some brokers but not others, causing uneven FetchResponse sizes across brokers.
3. the consumer's ability to keep up with various partitions across brokers is skewed, causing uneven FetchResponse sizes across brokers.

I've personally seen scenario 1 happen this past week to one of our users in prod. They manually assigned partitions such that a few brokers led most of the partitions while other brokers only led a single partition. When NetworkClient sends out FetchRequests to different brokers in parallel with an uneven partition distribution, FetchResponses from brokers that lead more partitions will contain more data than FetchResponses from brokers that lead fewer partitions. This means the small FetchResponses will get fully formed quicker than the larger FetchResponses. When the application eventually consumes a smaller fully formed FetchResponse, the NetworkClient will send out a new FetchRequest to the lightly-loaded broker. Its response will again come back quickly while only marginal progress has been made on the larger FetchResponse. Repeat this process several times and your application will have potentially processed many smaller FetchResponses while the larger FetchResponse made minimal progress and is forced to time out, causing the large FetchResponse to start all over again, which causes starvation.

To mitigate the problem for the short term, I've suggested to our user that they either (see the configuration sketch after this list):
1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB to something like 1 MB. This is the short-term solution I suggested they go with.
2. reduce the "max.partition.fetch.bytes" down from the current default of 1 MB to something like 100 KB. This solution wasn't advised as it could impact broker performance.
3. ask our SREs to run a partition reassignment to balance out the partition leadership (partitions were already being led by their preferred leaders).
4. bump up their request timeout. It was set to open-source's former default of 40 seconds.
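For reference, a minimal sketch of what mitigations 1, 2, and 4 look like as consumer configuration. The helper class name and the concrete values are made up for illustration (mitigation 3 is an operational change, not a client setting):
{code}
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

// Hypothetical helper collecting the short-term mitigation knobs listed above;
// the exact values are illustrative, not recommendations.
public class StarvationMitigationConfig {
    public static Properties apply(Properties props) {
        // Mitigation 1: larger socket receive buffer (default 64 KB) so the large
        // FetchResponse drains off the wire in fewer reads.
        props.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, String.valueOf(1024 * 1024));
        // Mitigation 2 (not advised above, may impact the brokers): shrink the
        // per-partition fetch size from the 1 MB default.
        // props.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(100 * 1024));
        // Mitigation 4: allow the slow fetch more time before it is cancelled and
        // restarted on request timeout.
        props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "120000");
        return props;
    }
}
{code}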
Contributing factors:
1. uneven FetchResponse sizes across brokers.
2. processing time of the polled ConsumerRecords.
3. "max.poll.records" increases the number of polls needed to consume a FetchResponse, magnifying the constant-time overhead per poll (see the back-of-the-envelope sketch below).
4. "max.poll.records" makes KafkaConsumer.poll bypass calls to ConsumerNetworkClient.poll.
5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and ConsumerNetworkClient.poll can return before the poll timeout as soon as a single channel is selected.
6. NetworkClient.poll is solely driven by the user thread with manual partition assignment.

So far I've only locally reproduced starvation scenario 1 and haven't even attempted the other scenarios. Preventing the bypass of ConsumerNetworkClient.poll (contributing factor 4) mitigates the issue, but it seems starvation would still be possible.
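To make contributing factors 2 and 3 concrete, a rough back-of-the-envelope calculation (all numbers are made up for illustration): draining one buffered FetchResponse takes about ceil(records / max.poll.records) polls, each separated by the application's processing time, and while that happens an in-flight fetch to another broker may see little or no progress (factor 4); once the drain time exceeds "request.timeout.ms", that fetch is cancelled and restarted.
{code}
// Illustrative arithmetic only; the inputs are hypothetical.
public class DrainTimeEstimate {
    public static void main(String[] args) {
        long recordsBuffered = 500_000;   // records sitting in one completed FetchResponse
        int maxPollRecords = 500;         // max.poll.records
        long processingMsPerBatch = 50;   // application work between poll() calls
        long requestTimeoutMs = 40_000;   // request.timeout.ms (old default)

        long pollsToDrain = (recordsBuffered + maxPollRecords - 1) / maxPollRecords;
        long drainTimeMs = pollsToDrain * processingMsPerBatch;

        System.out.printf("polls to drain the buffered response: %d%n", pollsToDrain);
        System.out.printf("time spent draining it: %d ms%n", drainTimeMs);
        System.out.printf("request timeout exceeded for the other broker's fetch: %b%n",
                drainTimeMs > requestTimeoutMs);
    }
}
{code}
With these (hypothetical) numbers the drain takes 50 seconds, comfortably past a 40-second request timeout, so the pending fetch would be cancelled and would start over.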
How to reproduce starvation scenario 1:
1. start up zookeeper
2. start up two brokers
3. create a topic t0 with two partitions led by broker 0 and create a topic t1 with a single partition led by broker 1
{code}
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 --replica-assignment 0,0
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 --replica-assignment 1
{code}
4. produce a lot of data into these topics
{code}
> ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 20000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
> ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 10000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
{code}
5. start up a consumer that consumes these 3 partitions with TRACE level NetworkClient logging
{code}
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.StarvedFetchResponseTest 10000 3000 65536
{code}
The config/tools-log4j.properties:
{code}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=WARN, stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stderr.Target=System.err

log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE, stderr
log4j.additivity.org.apache.kafka.clients.NetworkClient=false
{code}
The consumer code:
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class StarvedFetchResponseTest {
    public static void main(String[] args) throws InterruptedException {
        // args: poll timeout (ms), per-poll sleep (ms), receive.buffer.bytes
        long pollTimeout = Long.valueOf(args[0]);
        long sleepDuration = Long.valueOf(args[1]);
        String receiveBufferSize = args[2];

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fetch-response-starvation");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferSize);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);

        // manually assign both partitions of t0 (led by broker 0) and the single partition of t1 (led by broker 1)
        List<TopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            partitions.add(new TopicPartition("t0", i));
        }
        partitions.add(new TopicPartition("t1", 0));
        kafkaConsumer.assign(partitions);
        kafkaConsumer.seekToBeginning(partitions);

        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(pollTimeout);
            System.out.println(recordsPerTopic(records));
            Thread.sleep(sleepDuration); // simulate record processing time
        }
    }

    private static Map<TopicPartition, Integer> recordsPerTopic(ConsumerRecords<byte[], byte[]> records) {
        Map<TopicPartition, Integer> result = new HashMap<>();
        Set<TopicPartition> partitions = records.partitions();
        for (TopicPartition partition : partitions) {
            if (!result.containsKey(partition)) {
                result.put(partition, 0);
            }
            result.put(partition, result.get(partition) + records.records(partition).size());
        }
        return result;
    }
}
{code}
After running it for 30 minutes, around 33 FetchResponses from broker 1 were served to the application, while the many partially formed FetchResponses from broker 0 were cancelled due to disconnects from hitting the request timeout. It seems that there was only one successful FetchResponse from broker 0 served to the application during this time.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)