Orel Shai created KAFKA-6553:
--------------------------------
Summary: Consumer consumed committed messages
Key: KAFKA-6553
URL: https://issues.apache.org/jira/browse/KAFKA-6553
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.10.2.0
Reporter: Orel Shai
Hi,
We're using the Kafka consumer client 0.10.2.0 (running against a 0.10.0
broker) with the following configuration:
{code:java}
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
{code}
So as you can see, we're using autocommit.
The consumer client version we're using has a dedicated thread for autocommit,
so a commit fires every second, which (as we understand it) also means a
heartbeat every second.
For some reason we're receiving the same messages many times.
Looking at our logs, I can see the following:
{code:java}
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-15 to the committed offset
352878
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-3 to the committed offset
352458
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-19 to the committed offset
353775
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 to the committed offset
352171
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-7 to the committed offset
352995
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-27 to the committed offset
352531
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 to the committed offset
351893
2018-02-11 10:56:24,656 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 at offset 352171 since
the current position is 352205
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 at offset 351893 since
the current position is 351929
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-26 since it is no
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-17 since it is no
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-29 since it is no
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-5 since it is no
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-8 since it is no
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-20 since it is no
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-2 since it is no
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-14 since it is no
longer fetchable
{code}
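To take the autocommit timing out of the equation, we could switch to manual commits. A rough sketch against the 0.10.2 consumer API (`props` is the configuration shown above with autocommit disabled, and `handle` is a placeholder for our processing logic):
{code:java}
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("misc.ha.UpdateNodeGroup.UpdateNodeTopic"));
while (true) {
    // poll(long timeoutMs) is the 0.10.x API; 100 ms is an arbitrary timeout
    ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        handle(record); // placeholder for our processing logic
    }
    // Commit only after the whole batch is processed: a crash before this
    // point re-delivers the batch (at-least-once) instead of committing
    // offsets for records we never finished handling.
    consumer.commitSync();
}
{code}
This would at least tell us whether the duplicates are tied to the autocommit thread or to the offset resets visible in the log above.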
Consumer connection log:
{code:java}
2018-02-12 08:18:13,506 DEBUG [DefaultThreadPool-9] Starting the Kafka consumer
2018-02-12 08:18:13,507 INFO [DefaultThreadPool-9] ConsumerConfig values:
auto.commit.interval.ms = 1000
auto.offset.reset = latest
bootstrap.servers = [list of servers]
check.crcs = true
client.id = 2cd03a2b-f040-4f7f-b20c-ce3fe5efbe00
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = UpdateNode.snbrepo.new
heartbeat.interval.ms = 23333
interceptor.classes = null
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 16384
max.poll.interval.ms = 300000
max.poll.records = 100
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RoundRobinAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 100000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 70000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = <proprietary deserializer>
{code}
Do you know what might be causing this?
Also, processing a single message may take longer than the request timeout. If
we're using autocommit, does that count as a heartbeat? Will there be a
rebalance in such cases?
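If slow processing turns out to be the trigger, one mitigation we could try is reducing the work done per poll and allowing more time between polls. The values below are illustrative assumptions only, not something we have tested:
{code:java}
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");          // fewer records per poll()
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");  // more time to process each batch
{code}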
Thanks!
Orel
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)