[ https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374767#comment-16374767 ]

Manoj Ramakrishnan commented on KAFKA-4753:
-------------------------------------------

Hi @Jason or others, is there a resolution planned for this one, or a 
workaround we could attempt? We are seeing lots of restabilization errors 
after adding one more consumer app to our production load.

> KafkaConsumer susceptible to FetchResponse starvation
> -----------------------------------------------------
>
>                 Key: KAFKA-4753
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4753
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Onur Karaman
>            Assignee: Onur Karaman
>            Priority: Major
>
> FetchResponse starvation here means that the KafkaConsumer repeatedly fails 
> to fully form FetchResponses within the request timeout from a subset of the 
> brokers it's fetching from while FetchResponses from the other brokers can get 
> fully formed and processed by the application.
> In other words, this ticket is concerned with scenarios where fetching from 
> some brokers hurts the progress of fetching from other brokers to the point 
> of repeatedly hitting a request timeout.
> Some FetchResponse starvation scenarios:
> 1. partition leadership of the consumer's assigned partitions is skewed 
> across brokers, causing uneven FetchResponse sizes across brokers.
> 2. the consumer seeks back on partitions on some brokers but not others, 
> causing uneven FetchResponse sizes across brokers.
> 3. the consumer's ability to keep up with various partitions across brokers 
> is skewed, causing uneven FetchResponse sizes across brokers.
> I've personally seen scenario 1 happen this past week to one of our users in 
> prod. They manually assigned partitions such that a few brokers led most of 
> the partitions while other brokers only led a single partition. When 
> NetworkClient sends out FetchRequests to different brokers in parallel with 
> an uneven partition distribution, FetchResponses from brokers who lead more 
> partitions will contain more data than FetchResponses from brokers who lead 
> few partitions. This means the small FetchResponses will get fully formed 
> quicker than larger FetchResponses. When the application eventually consumes 
> a smaller, fully formed FetchResponse, the NetworkClient will send out a new 
> FetchRequest to the lightly loaded broker. Its response will again come 
> back quickly while only marginal progress has been made on the larger 
> FetchResponse. Repeat this process several times and your application may 
> have processed many smaller FetchResponses while the larger 
> FetchResponse made minimal progress and is forced to time out, causing the 
> large FetchResponse to start all over again, which causes starvation.
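
A back-of-the-envelope sketch of the starvation loop just described, under a loudly simplifying assumption: while the application sleeps between polls, at most about one receive.buffer.bytes worth of a pending FetchResponse can arrive (TCP flow control stalls the rest), so each poll cycle advances the large response by roughly that much. It further assumes broker 0's response is at its upper bound of two partitions times the 1 MB max.partition.fetch.bytes default, and reuses the 3000 ms sleep and 40 s request timeout from the reproduction further down. The class and method names are hypothetical; this is a toy model, not Kafka's actual I/O path.

{code:java}
/**
 * Toy model (an assumption, not Kafka's real network code): each poll cycle
 * lets roughly one socket receive buffer's worth of a pending response arrive.
 */
public final class FetchStarvationModel {

    /** Poll cycles needed to fully form a response of the given size (ceiling division). */
    static long cyclesToComplete(long responseBytes, long bytesPerCycle) {
        return (responseBytes + bytesPerCycle - 1) / bytesPerCycle;
    }

    /** True if the response cannot be fully formed before the request timeout fires. */
    static boolean timesOut(long responseBytes, long bytesPerCycle,
                            long cycleMs, long requestTimeoutMs) {
        return cyclesToComplete(responseBytes, bytesPerCycle) * cycleMs > requestTimeoutMs;
    }

    public static void main(String[] args) {
        long largeResponse = 2L * 1024 * 1024; // broker 0: two partitions x 1 MB max.partition.fetch.bytes
        long cycleMs = 3000;                   // application sleep between polls
        long requestTimeoutMs = 40_000;        // former open-source default

        // Compare the 64 KB default receive buffer with the suggested 1 MB.
        for (long receiveBuffer : new long[] {64 * 1024, 1024 * 1024}) {
            System.out.printf("receive.buffer.bytes=%d: %d cycles, times out: %b%n",
                    receiveBuffer,
                    cyclesToComplete(largeResponse, receiveBuffer),
                    timesOut(largeResponse, receiveBuffer, cycleMs, requestTimeoutMs));
        }
    }
}
{code}

Under these assumptions a 64 KB buffer needs 32 cycles (about 96 s) and times out, while a 1 MB buffer needs 2 cycles (about 6 s), which is consistent with why a larger receive buffer was suggested as the short-term fix.
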
> To mitigate the problem for the short term, I've suggested to our user that 
> they either:
> 1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB 
> to something like 1 MB. This is the short-term solution I suggested they 
> go with.
> 2. reduce the "max.partition.fetch.bytes" down from the current default of 1 
> MB to something like 100 KB. This solution wasn't advised as it could impact 
> broker performance.
> 3. ask our SREs to run a partition reassignment to balance out the partition 
> leadership (partitions were already being led by their preferred leaders).
> 4. bump up their request timeout. It was set to open-source's former default 
> of 40 seconds.
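
A minimal sketch of mitigations 1 and 4 expressed as consumer config overrides. It uses the plain property-name strings rather than ConsumerConfig constants so it needs only java.util. The class and method names are hypothetical, and the 60 s timeout passed in main is only an example value; the ticket does not say what to raise the timeout to. Mitigation 2 is left commented out since it wasn't advised.

{code:java}
import java.util.Properties;

public final class StarvationMitigationConfig {

    static final int ONE_MB = 1024 * 1024;

    /**
     * Consumer overrides for mitigations 1 and 4 above.
     * requestTimeoutMs is caller-chosen; the ticket does not name a value.
     */
    static Properties mitigatedOverrides(int requestTimeoutMs) {
        Properties props = new Properties();
        // Mitigation 1: raise the socket receive buffer from the 64 KB default to 1 MB.
        props.setProperty("receive.buffer.bytes", Integer.toString(ONE_MB));
        // Mitigation 4: raise the request timeout above the former 40 s default.
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        // Mitigation 2 (not advised, may impact broker performance) would be:
        // props.setProperty("max.partition.fetch.bytes", Integer.toString(100 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties props = mitigatedOverrides(60_000); // 60 s is a hypothetical example value
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
{code}

These overrides would be merged into the rest of the consumer's Properties before constructing the KafkaConsumer.
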
> Contributing factors:
> 1. uneven FetchResponse sizes across brokers.
> 2. processing time of the polled ConsumerRecords.
> 3. "max.poll.records" increases the number of polls needed to consume a 
> FetchResponse, magnifying any constant-time overhead per poll.
> 4. "max.poll.records" makes KafkaConsumer.poll bypass calls to 
> ConsumerNetworkClient.poll.
> 5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and 
> ConsumerNetworkClient.poll can return before the poll timeout as soon as a 
> single channel is selected.
> 6. NetworkClient.poll is solely driven by the user thread with manual 
> partition assignment.
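
To put a rough number on contributing factor 3, the sketch below (hypothetical class and method names) computes how many polls it takes to hand one fully formed FetchResponse to the application. It assumes a 1 MB response, 100-byte records as in the reproduction's perf-test runs, and max.poll.records=500. It ignores record-batch and header overhead, so the record count, and hence the poll count, is an upper bound.

{code:java}
public final class PollsPerResponse {

    /** ceil(records / maxPollRecords): polls needed to drain one response. */
    static long pollsToDrain(long responseBytes, long recordBytes, long maxPollRecords) {
        long records = responseBytes / recordBytes; // ignores batch/header overhead (upper bound)
        return (records + maxPollRecords - 1) / maxPollRecords;
    }

    public static void main(String[] args) {
        long oneMb = 1024 * 1024;
        long polls = pollsToDrain(oneMb, 100, 500);
        long sleepMs = 3000; // per-poll processing time used in the reproduction
        System.out.println(polls + " polls, ~" + polls * sleepMs / 1000
                + " s of per-poll overhead for one response");
    }
}
{code}

With these inputs it comes to 21 polls; at a 3000 ms sleep per poll, draining a single 1 MB response takes on the order of a minute, longer than a 40 s request timeout.
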
> So far I've only locally reproduced starvation scenario 1 and haven't even 
> attempted the other scenarios. Preventing the bypass of 
> ConsumerNetworkClient.poll (contributing factor 4) mitigates the issue, but 
> it seems starvation would still be possible.
> How to reproduce starvation scenario 1:
> 1. start up ZooKeeper
> 2. start up two brokers
> 3. create a topic t0 with two partitions led by broker 0 and create a topic 
> t1 with a single partition led by broker 1
> {code}
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 --replica-assignment 0,0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 --replica-assignment 1
> {code}
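
For reference, kafka-topics.sh's --replica-assignment takes a comma-separated list of partitions, each a colon-separated list of broker ids, with the first broker listed being the preferred leader. The hypothetical parser below is only a sketch of that format, not the tool's own code; it shows that "0,0" gives t0 two partitions both led by broker 0 and "1" gives t1 a single partition led by broker 1.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class ReplicaAssignment {

    /** Partitions separated by ',', replicas within a partition by ':'. */
    static Map<Integer, List<Integer>> parse(String assignment) {
        Map<Integer, List<Integer>> replicasByPartition = new LinkedHashMap<>();
        String[] partitions = assignment.split(",");
        for (int p = 0; p < partitions.length; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (String broker : partitions[p].split(":")) {
                replicas.add(Integer.parseInt(broker.trim()));
            }
            replicasByPartition.put(p, replicas);
        }
        return replicasByPartition;
    }

    /** Counts how many partitions each broker leads as preferred (first-listed) replica. */
    static Map<Integer, Integer> leadersPerBroker(Map<Integer, List<Integer>> assignment) {
        Map<Integer, Integer> counts = new LinkedHashMap<>();
        for (List<Integer> replicas : assignment.values()) {
            counts.merge(replicas.get(0), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("t0: " + parse("0,0"));                  // {0=[0], 1=[0]}
        System.out.println("t1: " + parse("1"));                    // {0=[1]}
        System.out.println("t0 leaders: " + leadersPerBroker(parse("0,0"))); // {0=2}
    }
}
{code}
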
> 4. Produce a lot of data into these topics
> {code}
> > ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 20000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
> > ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 10000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
> {code}
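
A quick check on how much data the two perf-test runs above produce, assuming the 100-byte record size is the whole payload and that t0's records spread evenly over its two partitions. Each partition ends up holding roughly 1 GB, far more than the reproduction's consumer (at most 500 records per poll, with a 3 s sleep) can read in 30 minutes, so broker 0 always has two partitions' worth of data to return versus one for broker 1. The helper names are hypothetical.

{code:java}
public final class PerfTestVolume {

    /** Payload bytes produced: --num-records x --record-size. */
    static long totalBytes(long numRecords, long recordSizeBytes) {
        return numRecords * recordSizeBytes;
    }

    /** Minimum run time when throttled to --throughput records per second. */
    static long minSeconds(long numRecords, long recordsPerSecond) {
        return numRecords / recordsPerSecond;
    }

    public static void main(String[] args) {
        long t0Bytes = totalBytes(20_000_000L, 100);
        long t1Bytes = totalBytes(10_000_000L, 100);
        // t0 has two partitions (assumed evenly filled); t1 has one.
        System.out.println("t0: " + t0Bytes + " bytes, ~" + t0Bytes / 2 + " per partition, >= "
                + minSeconds(20_000_000L, 100_000) + " s to produce");
        System.out.println("t1: " + t1Bytes + " bytes in one partition, >= "
                + minSeconds(10_000_000L, 100_000) + " s to produce");
    }
}
{code}
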
> 5. start up a consumer that consumes these 3 partitions with TRACE-level 
> NetworkClient logging
> {code}
> > ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.StarvedFetchResponseTest 10000 3000 65536
> {code}
> The config/tools-log4j.properties:
> {code}
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #    http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> log4j.rootLogger=WARN, stderr
> log4j.appender.stderr=org.apache.log4j.ConsoleAppender
> log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
> log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
> log4j.appender.stderr.Target=System.err
> log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE, stderr
> log4j.additivity.org.apache.kafka.clients.NetworkClient=false
> {code}
> The consumer code:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
>  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
>  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
>  * License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
>  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
>  * specific language governing permissions and limitations under the License.
>  */
> package org.apache.kafka.clients.consumer;
>
> import org.apache.kafka.common.TopicPartition;
>
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> public class StarvedFetchResponseTest {
>     // Usage: StarvedFetchResponseTest <pollTimeoutMs> <sleepDurationMs> <receiveBufferBytes>
>     public static void main(String[] args) throws InterruptedException {
>         long pollTimeout = Long.parseLong(args[0]);
>         long sleepDuration = Long.parseLong(args[1]);
>         String receiveBufferSize = args[2];
>
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fetch-response-starvation");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>         // Former open-source default request timeout of 40 seconds.
>         props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
>         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         // Socket receive buffer ("receive.buffer.bytes"), passed in so it can be varied between runs.
>         props.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferSize);
>
>         KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
>         // Manually assign t0-0 and t0-1 (led by broker 0) plus t1-0 (led by broker 1).
>         List<TopicPartition> partitions = new ArrayList<>();
>         for (int i = 0; i < 2; i++) {
>             partitions.add(new TopicPartition("t0", i));
>         }
>         partitions.add(new TopicPartition("t1", 0));
>         kafkaConsumer.assign(partitions);
>         kafkaConsumer.seekToBeginning(partitions);
>
>         while (true) {
>             ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(pollTimeout);
>             System.out.println(recordsPerPartition(records));
>             // Simulate per-poll processing time in the application.
>             Thread.sleep(sleepDuration);
>         }
>     }
>
>     // Counts the records returned for each partition by a single poll.
>     private static Map<TopicPartition, Integer> recordsPerPartition(ConsumerRecords<byte[], byte[]> records) {
>         Map<TopicPartition, Integer> result = new HashMap<>();
>         for (TopicPartition partition : records.partitions()) {
>             result.put(partition, records.records(partition).size());
>         }
>         return result;
>     }
> }
> {code}
> After running it for 30 minutes, around 33 FetchResponses from broker 1 were 
> served to the application, while many partially formed FetchResponses from 
> broker 0 were cancelled because the request timeout forced a disconnect. It 
> seems that there was only one successful FetchResponse from broker 0 served 
> to the application during this time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
