Onur Karaman created KAFKA-4753:
-----------------------------------
Summary: KafkaConsumer susceptible to FetchResponse starvation
Key: KAFKA-4753
URL: https://issues.apache.org/jira/browse/KAFKA-4753
Project: Kafka
Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman
FetchResponse starvation here means that the KafkaConsumer repeatedly fails to
fully form FetchResponses within the request timeout from a subset of the
brokers it's fetching from, while FetchResponses from the other brokers can get
fully formed and processed by the application.
In other words, this ticket is concerned with scenarios where fetching from
some brokers hurts the progress of fetching from other brokers to the point of
repeatedly hitting a request timeout.
Some FetchResponse starvation scenarios:
1. partition leadership of the consumer's assigned partitions is skewed across
brokers, causing uneven FetchResponse sizes across brokers.
2. the consumer seeks back on partitions on some brokers but not others,
causing uneven FetchResponse sizes across brokers.
3. the consumer's ability to keep up with various partitions across brokers is
skewed, causing uneven FetchResponse sizes across brokers.
I've personally seen scenario 1 happen this past week to one of our users in
prod. They manually assigned partitions such that a few brokers led most of the
partitions while other brokers only led a single partition. When NetworkClient
sends out FetchRequests to different brokers in parallel with an uneven
partition distribution, FetchResponses from brokers that lead more partitions
will contain more data than FetchResponses from brokers that lead few
partitions. This means the small FetchResponses will get fully formed quicker
than the larger FetchResponses. When the application eventually consumes a
smaller fully formed FetchResponse, the NetworkClient will send out a new
FetchRequest to the lightly loaded broker. Its response will again come back
quickly while only marginal progress has been made on the larger FetchResponse.
Repeat this process several times and your application will have potentially
processed many smaller FetchResponses while the larger FetchResponse made
minimal progress and is forced to time out. The large FetchResponse then has to
start all over again, which causes starvation.
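To make the size skew concrete, here is a minimal sketch that computes an upper
bound on the FetchResponse size per broker as (assigned partitions led by that
broker) * max.partition.fetch.bytes. The class name, the method name and the
8-to-1 leadership split are hypothetical, and the bound ignores any other cap
on the response size. In a real consumer the leader ids could be gathered from
KafkaConsumer.partitionsFor and PartitionInfo.leader.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FetchSizeSkew {
    // Upper bound on bytes per FetchResponse for each broker:
    // (assigned partitions led by that broker) * max.partition.fetch.bytes.
    static Map<Integer, Long> maxResponseBytesPerBroker(List<Integer> leaderOfEachPartition,
                                                        long maxPartitionFetchBytes) {
        Map<Integer, Long> result = new TreeMap<>();
        for (int leader : leaderOfEachPartition) {
            result.merge(leader, maxPartitionFetchBytes, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical assignment: broker 0 leads eight assigned partitions, broker 1 leads one.
        List<Integer> leaders = Arrays.asList(0, 0, 0, 0, 0, 0, 0, 0, 1);
        // Prints {0=8388608, 1=1048576} with the 1 MB default.
        System.out.println(maxResponseBytesPerBroker(leaders, 1024 * 1024));
    }
}
```

Under these assumptions a response from broker 0 can be up to eight times larger
than one from broker 1, which is the kind of imbalance described above.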
To mitigate the problem for the short term, I've suggested to our user that
they either:
1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB to
something like 1 MB. This is the short-term solution I suggested they go with.
2. reduce the "max.partition.fetch.bytes" down from the current default of 1 MB
to something like 100 KB. This solution wasn't advised as it could impact
broker performance.
3. ask our SREs to run a partition reassignment to balance out the partition
leadership (partitions were already being led by their preferred leaders).
4. bump up their request timeout. It was set to open-source's former default of
40 seconds.
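As a minimal sketch, mitigations 1 and 4 could be expressed as consumer
property overrides like the following. The class name, the helper name and the
60-second timeout are illustrative assumptions (the ticket does not say what
value the timeout should be raised to); the config keys are the ones named
above.

```java
import java.util.Properties;

public class StarvationMitigationConfig {
    // Hypothetical helper: returns the overrides for mitigations 1 and 4.
    static Properties mitigationOverrides() {
        Properties props = new Properties();
        // Mitigation 1: raise the socket receive buffer from 64 KB to 1 MB.
        props.setProperty("receive.buffer.bytes", String.valueOf(1024 * 1024));
        // Mitigation 4: raise the request timeout above 40 s (60 s is an assumed value).
        props.setProperty("request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        mitigationOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These overrides would be merged into the Properties passed to the KafkaConsumer
constructor.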
Contributing factors:
1. uneven FetchResponse sizes across brokers.
2. processing time of the polled ConsumerRecords.
3. "max.poll.records" increases the number of polls needed to consume a
FetchResponse, magnifying the constant-time overhead per poll.
4. "max.poll.records" makes KafkaConsumer.poll bypass calls to
ConsumerNetworkClient.poll.
5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and
ConsumerNetworkClient.poll can return before the poll timeout as soon as a
single channel is selected.
6. NetworkClient.poll is solely driven by the user thread with manual partition
assignment.
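A rough back-of-envelope estimate shows how factors 2, 3 and 4 combine. The
sketch below assumes a 1 MB single-partition FetchResponse, 100-byte records
with no per-record overhead, max.poll.records of 500, 3 seconds of processing
per poll, and no network poll while buffered records are being drained. The
class and method names are hypothetical.

```java
public class DrainTimeEstimate {
    // Number of KafkaConsumer.poll calls needed to hand out `records` records
    // when each call returns at most `maxPollRecords`.
    static long pollsToDrain(long records, long maxPollRecords) {
        return (records + maxPollRecords - 1) / maxPollRecords; // ceiling division
    }

    // Time spent draining one response, counting only per-poll processing time.
    static long drainTimeMs(long responseBytes, long recordBytes,
                            long maxPollRecords, long processingMsPerPoll) {
        long records = responseBytes / recordBytes;
        return pollsToDrain(records, maxPollRecords) * processingMsPerPoll;
    }

    public static void main(String[] args) {
        long requestTimeoutMs = 40000;
        long drainMs = drainTimeMs(1024 * 1024, 100, 500, 3000);
        System.out.println("drain time ms: " + drainMs);                                   // 63000
        System.out.println("exceeds request timeout: " + (drainMs > requestTimeoutMs));    // true
    }
}
```

Under these assumptions, draining a single small response takes longer than the
40-second request timeout, so an in-flight larger response that receives no
network polls during that window would time out.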
So far I've only reproduced starvation scenario 1 locally and haven't attempted
the other scenarios. Preventing the bypass of ConsumerNetworkClient.poll
(contributing factor 4) mitigates the issue, but it seems starvation would
still be possible.
How to reproduce starvation scenario 1:
1. start up ZooKeeper
2. start up two brokers
3. create a topic t0 with two partitions led by broker 0 and create a topic t1
with a single partition led by broker 1
{code}
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 \
    --replica-assignment 0,0
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 \
    --replica-assignment 1
{code}
4. produce a lot of data into these topics
{code}
> ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 20000000 \
    --record-size 100 --throughput 100000 \
    --producer-props bootstrap.servers=localhost:9090,localhost:9091
> ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 10000000 \
    --record-size 100 --throughput 100000 \
    --producer-props bootstrap.servers=localhost:9090,localhost:9091
{code}
5. start up a consumer that consumes these 3 partitions with TRACE level
NetworkClient logging
{code}
> ./bin/kafka-run-class.sh \
    org.apache.kafka.clients.consumer.StarvedFetchResponseTest 10000 3000 65536
{code}
The config/tools-log4j.properties:
{code}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=WARN, stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stderr.Target=System.err
log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE, stderr
log4j.additivity.org.apache.kafka.clients.NetworkClient=false
{code}
The consumer code:
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class StarvedFetchResponseTest {
    public static void main(String[] args) throws InterruptedException {
        // Arguments: poll timeout (ms), sleep between polls (ms), receive.buffer.bytes
        long pollTimeout = Long.valueOf(args[0]);
        long sleepDuration = Long.valueOf(args[1]);
        String receiveBufferSize = args[2];
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fetch-response-starvation");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferSize);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        // Manually assign both partitions of t0 (broker 0) and the single partition of t1 (broker 1).
        List<TopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            partitions.add(new TopicPartition("t0", i));
        }
        partitions.add(new TopicPartition("t1", 0));
        kafkaConsumer.assign(partitions);
        kafkaConsumer.seekToBeginning(partitions);
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(pollTimeout);
            System.out.println(recordsPerTopic(records));
            // Simulate the processing time of the polled records.
            Thread.sleep(sleepDuration);
        }
    }

    // Counts the records returned for each partition in a single poll.
    private static Map<TopicPartition, Integer> recordsPerTopic(ConsumerRecords<byte[], byte[]> records) {
        Map<TopicPartition, Integer> result = new HashMap<>();
        Set<TopicPartition> partitions = records.partitions();
        for (TopicPartition partition : partitions) {
            if (!result.containsKey(partition)) {
                result.put(partition, 0);
            }
            result.put(partition, result.get(partition) + records.records(partition).size());
        }
        return result;
    }
}
{code}
After running it for 30 minutes, around 33 FetchResponses from broker 1 were
served to the application while the many partially formed FetchResponses from
broker 0 were cancelled due to a disconnect caused by the request timeout. It
seems that there was only one successful FetchResponse from broker 0 served to
the application during this time.