[ https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16419345#comment-16419345 ]
Narayan Periwal commented on KAFKA-6681:
----------------------------------------

[~yuzhih...@gmail.com], we had one more occurrence of the above issue. The topic had 1 partition and there were 4 consumers for it (all with the same consumer group name). Initially, as expected, only one of the consumers was reading from that partition and the others were simply doing nothing. We then had an issue with our Kafka cluster, due to which the entire cluster went down. After the cluster came back up, I saw all 4 consumers reading that single partition of the topic, which was strange. For that consumer group, these are the coordinator logs from the server side:
{noformat}
[2018-03-27 23:06:49,113] INFO [GroupCoordinator 8]: Loading group metadata for testgroup with generation 63 (kafka.coordinator.GroupCoordinator)
[2018-03-27 23:06:52,687] INFO [GroupCoordinator 8]: Preparing to restabilize group testgroup with old generation 63 (kafka.coordinator.GroupCoordinator)
[2018-03-27 23:06:52,688] INFO [GroupCoordinator 8]: Stabilized group testgroup generation 64 (kafka.coordinator.GroupCoordinator)
[2018-03-27 23:06:52,916] INFO [GroupCoordinator 8]: Assignment received from leader for group testgroup for generation 64 (kafka.coordinator.GroupCoordinator)
{noformat}
On the consumer side, client-1, which was already reading that partition, had the rebalance triggered: both the onPartitionsRevoked and onPartitionsAssigned callbacks were invoked. On client-2, neither of these callbacks was invoked, yet it still started consuming data from the partition from then on.
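For reference, the way we observe those callbacks is by registering a {{ConsumerRebalanceListener}} at subscribe time. The sketch below is only a minimal stand-in for our actual {{KafkaConsumerWorker}}; the class name, broker address and logging are illustrative, and only the group and topic names match the logs above:
{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceLoggingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("group.id", "testgroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // The listener is how onPartitionsRevoked/onPartitionsAssigned become visible on each client.
        consumer.subscribe(Collections.singletonList("testopic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Assigned: " + partitions);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000); // 0.10.2.x poll(long) API
            // process records here
        }
    }
}
{code}
The surprising part in this incident is that client-2 (and later client-3 and client-4) started consuming the partition without either callback ever firing.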
We saw the following exception in the client-2 logs, occurring 4 times with a gap of 1 to 2 seconds:
{noformat}
27 Mar 2018 23:06:42.307 ERROR [testgroup:testopic] [o.a.f.s.k.KafkaConsumerWorker.run:329] - testgroup:testopic:: exception occurred in kafka source worker. backing off for 1000 millis
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testopic-0
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:375) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:248) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1601) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.flume.source.kafka.KafkaConsumerWorker.fetchNextBatch(KafkaConsumerWorker.java:350) ~[flume-kafka-source-1.6.0.47.jar:1.6.0.47]
	at org.apache.flume.source.kafka.KafkaConsumerWorker.run(KafkaConsumerWorker.java:291) ~[flume-kafka-source-1.6.0.47.jar:1.6.0.47]
27 Mar 2018 23:06:43.743 ERROR [testgroup:testopic] [o.a.f.s.k.KafkaConsumerWorker.run:329] - testgroup:testopic:: exception occurred in kafka source worker. backing off for 1000 millis
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testopic-0
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:375) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:228) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1591) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.flume.source.kafka.KafkaConsumerWorker.fetchNextBatch(KafkaConsumerWorker.java:350) ~[flume-kafka-source-1.6.0.47.jar:1.6.0.47]
	at org.apache.flume.source.kafka.KafkaConsumerWorker.run(KafkaConsumerWorker.java:291) ~[flume-kafka-source-1.6.0.47.jar:1.6.0.47]
27 Mar 2018 23:06:44.979 ERROR [testgroup:testopic] [o.a.f.s.k.KafkaConsumerWorker.run:329] - testgroup:testopic:: exception occurred in kafka source worker. backing off for 1000 millis
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testopic-0
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:375) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:228) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1591) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.flume.source.kafka.KafkaConsumerWorker.fetchNextBatch(KafkaConsumerWorker.java:350) ~[flume-kafka-source-1.6.0.47.jar:1.6.0.47]
	at org.apache.flume.source.kafka.KafkaConsumerWorker.run(KafkaConsumerWorker.java:291) ~[flume-kafka-source-1.6.0.47.jar:1.6.0.47]
27 Mar 2018 23:06:46.198 ERROR [testgroup:testopic] [o.a.f.s.k.KafkaConsumerWorker.run:329] - testgroup:testopic:: exception occurred in kafka source worker. backing off for 1000 millis
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testopic-0
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:375) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:228) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1591) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.1.jar:na]
	at org.apache.flume.source.kafka.KafkaConsumerWorker.fetchNextBatch(KafkaConsumerWorker.java:350) ~[flume-kafka-source-1.6.0.47.jar:1.6.0.47]
	at org.apache.flume.source.kafka.KafkaConsumerWorker.run(KafkaConsumerWorker.java:291) ~[flume-kafka-source-1.6.0.47.jar:1.6.0.47]
{noformat}
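This exception is what the 0.10.2.1 consumer raises when it needs a fetch position for a partition, finds no committed offset for the group, and the reset policy ({{auto.offset.reset}}) is {{none}}. The sketch below only illustrates how it surfaces and one way a client can recover; it is not what the Flume Kafka source actually does, and the broker address and class name are made up:
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;

public class OffsetResetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "testgroup");
        props.put("auto.offset.reset", "none");            // "none": throw if no committed offset exists
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("testopic"));

        try {
            consumer.poll(1000); // throws NoOffsetForPartitionException when no committed offset is found
        } catch (NoOffsetForPartitionException e) {
            // One way to recover: pick an explicit position for the currently assigned partitions.
            // Using auto.offset.reset=earliest/latest instead of "none" avoids the exception entirely.
            consumer.seekToBeginning(consumer.assignment());
        }
    }
}
{code}
(In our case the consumer is created inside the Flume Kafka source, so the properties above are illustrative rather than our exact configuration.)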
For client-3 and client-4 as well, we did not get the rebalancing callbacks, and we did not see the above exception either, yet they continued to read from that partition. After restarting client-3, client-1 and client-2 were still consuming from that partition. Only after restarting all the clients, one by one, did just one of them end up consuming the data from the partition.

> Two instances of kafka consumer reading the same partition within a consumer group
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6681
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6681
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.1
>            Reporter: Narayan Periwal
>            Priority: Critical
>         Attachments: server-1.log, server-2.log
>
>
> We have seen this issue with the Kafka consumer, the new client library introduced in 0.9. With this client, group management is done by the Kafka coordinator, which is one of the Kafka brokers.
> We are using Kafka broker 0.10.2.1, and the consumer client version is also 0.10.2.1.
> The issue we have faced is that, after rebalancing, some of the partitions get consumed by 2 instances within a consumer group, leading to duplication of the entire partition data. Both instances continue to read until the next rebalancing, or until those clients are restarted.
> It looks like a particular consumer keeps fetching data from a partition, but the broker is not able to identify this "stale" consumer instance.
> During this time, we also see the under-replicated partition metrics spiking.
> We have hit this twice in production. Please look into it at the earliest.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)