[ https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649485#comment-15649485 ]
Mayuresh Gharat commented on KAFKA-4362:
----------------------------------------
I did some more testing while reproducing these error scenarios.
There are 2 bugs:
1) OFFSET COMMITS:
When we look up the message format version, we first check whether the replica
of the __consumer_offsets partition is local, and if it is not, we throw an
IllegalArgumentException. The client receives this as an unknown server error,
so the offset commit fails.
As a side effect, if you start another consumer in the same group, consuming
from the same topic, after the rebalance is done and then produce to the topic,
you will see that both consumers consume the same data. This is because the
second consumer is talking to the correct coordinator, while the first consumer
is still talking to the old one and is completely unaware of the second
consumer.
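To make the first failure mode concrete, here is a minimal sketch of the alternative behavior it points towards: when the broker no longer hosts a replica of the offsets partition, reply with a coordinator error the client already knows how to handle instead of throwing. All class, method, and enum names are hypothetical (this is not Kafka's GroupMetadataManager API, nor necessarily what the upcoming PR does), and partition 12 and format version 1 are arbitrary placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; names are illustrative, not Kafka's real classes.
public class OffsetCommitCheck {

    // Hypothetical subset of error codes a coordinator could return.
    enum CommitError { NONE, NOT_COORDINATOR_FOR_GROUP }

    // Message format version for each __consumer_offsets partition that
    // still has a local replica on this broker.
    private final Map<Integer, Byte> localFormatVersions = new HashMap<>();

    void addLocalReplica(int offsetsPartition, byte formatVersion) {
        localFormatVersions.put(offsetsPartition, formatVersion);
    }

    void removeLocalReplica(int offsetsPartition) {
        localFormatVersions.remove(offsetsPartition);
    }

    Optional<Byte> messageFormatVersion(int offsetsPartition) {
        return Optional.ofNullable(localFormatVersions.get(offsetsPartition));
    }

    // Instead of throwing IllegalArgumentException (seen by the client as an
    // unknown server error), report a coordinator error so the consumer
    // goes looking for the new coordinator.
    CommitError validateCommit(int offsetsPartition) {
        return messageFormatVersion(offsetsPartition).isPresent()
                ? CommitError.NONE
                : CommitError.NOT_COORDINATOR_FOR_GROUP;
    }

    public static void main(String[] args) {
        OffsetCommitCheck broker1 = new OffsetCommitCheck();
        broker1.addLocalReplica(12, (byte) 1);
        System.out.println(broker1.validateCommit(12)); // NONE
        broker1.removeLocalReplica(12); // replica moved away by reassignment
        System.out.println(broker1.validateCommit(12)); // NOT_COORDINATOR_FOR_GROUP
    }
}
```

Returning an error code rather than throwing keeps the "replica not local" case on the same path as any other "wrong coordinator" answer.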
2) CONSUMER REBALANCE:
While reassigning the __consumer_offsets topic, for example moving replicas
from (1,2,3 [Leader: 1]) to (4,2,3 [Leader: 4]), the controller sends a
StopReplicaRequest for the __consumer_offsets partition to broker 1. While
handling this request on the server side, we never remove that
__consumer_offsets partition from the coordinator's (broker 1's) cache. When a
JoinGroupRequest comes in during the rebalance, the coordinator (broker 1)
checks whether the group is local. But since the group was not removed from its
cache on the earlier StopReplicaRequest from the controller, it does not return
NotCoordinatorForGroupException and proceeds with success. So the consumer
thinks it is talking to the right coordinator, which it is not, since the
coordinator moved from broker 1 to broker 4. On the consumer side, in the
handleJoinGroupResponseHandler callback, we send a SyncGroupRequest to broker 1,
which in turn runs the message format version check on the server. At this
point it throws an IllegalArgumentException for the same reason explained in
point 1) above. This causes the SyncGroupRequest to fail with an unknown
exception in the SyncGroupResponseHandler callback.
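The missing cache eviction in point 2) can be modelled in a few lines. This is a minimal sketch, assuming a coordinator that caches groups keyed by the __consumer_offsets partition storing them; a StopReplica for that partition evicts those groups, so the "is the group local?" check fails and the client gets NOT_COORDINATOR_FOR_GROUP. GroupCacheSketch, onStopReplica, and the other names are hypothetical, not Kafka's classes, and this is not presented as the fix in the upcoming PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a coordinator's group cache; names are illustrative only.
public class GroupCacheSketch {

    enum GroupError { NONE, NOT_COORDINATOR_FOR_GROUP }

    // groupId -> __consumer_offsets partition that stores the group's metadata
    private final Map<String, Integer> cachedGroups = new HashMap<>();

    void loadGroup(String groupId, int offsetsPartition) {
        cachedGroups.put(groupId, offsetsPartition);
    }

    boolean isGroupLocal(String groupId) {
        return cachedGroups.containsKey(groupId);
    }

    // On StopReplica for an offsets partition, evict every group stored in it,
    // so this broker stops claiming to be their coordinator.
    void onStopReplica(int offsetsPartition) {
        cachedGroups.values().removeIf(p -> p == offsetsPartition);
    }

    GroupError handleJoinGroup(String groupId) {
        return isGroupLocal(groupId) ? GroupError.NONE : GroupError.NOT_COORDINATOR_FOR_GROUP;
    }

    public static void main(String[] args) {
        GroupCacheSketch broker1 = new GroupCacheSketch();
        broker1.loadGroup("testGroupA", 12); // 12 is a placeholder partition
        System.out.println(broker1.handleJoinGroup("testGroupA")); // NONE
        broker1.onStopReplica(12); // controller moved the partition to broker 4
        System.out.println(broker1.handleJoinGroup("testGroupA")); // NOT_COORDINATOR_FOR_GROUP
    }
}
```

Once the group is evicted, both the JoinGroup and the follow-up SyncGroup would be rejected with a coordinator error instead of reaching the message format version check.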
The exact steps to reproduce these scenarios are as follows:
For 1)
a) Start 4 Kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to testtopicA.
c) Start a console consumer with groupId = testGroupA consuming from
testtopicA.
d) Produce and consume some data.
e) Find the __consumer_offsets partition that stores the offsets for testGroupA.
f) Reassign the partition from e) so that its current leader is removed from
the replica list.
g) You should see frequent exceptions on the consumer side, something like this:
[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition testtopicA-0 at offset 14: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
h) If you keep producing to the topic, you should still see data in this
consumer's console, but it is not able to commit offsets.
i) Now if you start another console consumer with the same groupId = testGroupA
consuming from testtopicA and produce more data, you should see the data in
both consumer consoles.
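For the step that finds the __consumer_offsets partition holding testGroupA's offsets, a minimal sketch follows. It assumes the broker maps a group to abs(groupId.hashCode()) % offsets.topic.num.partitions, with Integer.MIN_VALUE treated as 0; this is believed to match GroupMetadataManager.partitionFor in brokers of this era, but verify it against the version under test. The default of 50 partitions is also an assumption; pass your cluster's offsets.topic.num.partitions if it differs.

```java
// Sketch: which __consumer_offsets partition stores a group's offsets.
// Assumes abs(groupId.hashCode()) % partitionCount, with MIN_VALUE mapped to 0.
public class OffsetsPartitionFor {

    // Math.abs(Integer.MIN_VALUE) is still negative, so map it to 0 explicitly.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return abs(groupId.hashCode()) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // Usage: java OffsetsPartitionFor [groupId] [offsets.topic.num.partitions]
        String groupId = args.length > 0 ? args[0] : "testGroupA";
        int partitions = args.length > 1 ? Integer.parseInt(args[1]) : 50;
        System.out.println("__consumer_offsets-" + partitionFor(groupId, partitions));
    }
}
```

The leader of the printed partition is the broker acting as testGroupA's coordinator, so that is the broker to remove from the replica list in the reassignment step.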
For 2)
a) Start 4 Kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to testtopicA.
c) Start a console consumer with groupId = testGroupA consuming from
testtopicA.
d) Start another console consumer with groupId = testGroupA consuming from
testtopicA.
e) Produce and consume some data. Exactly one of the two consumers should be
consuming the data.
f) Find the __consumer_offsets partition that stores the offsets for testGroupA.
g) Reassign the partition from f) so that its current leader is removed from
the replica list.
h) You should see frequent exceptions on the consumer that is actually fetching
data, something like this:
[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition testtopicA-0 at offset 14: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
i) If you kill this consumer, you should immediately see an exception on the
other consumer's console, something like this:
[2016-11-08 10:04:20,705] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:518)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:485)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:100)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:120)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
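The reassignment step in both procedures (for example moving replicas (1,2,3) to (4,2,3), as in point 2) above) can be driven by a reassignment file for kafka-reassign-partitions.sh. A minimal sketch that builds such a file follows, assuming the current leader is the first (preferred) replica in the list; partition 12 is a placeholder for whichever __consumer_offsets partition stores testGroupA's offsets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: build a reassignment file that moves an offsets partition's leader
// off its current broker, e.g. replicas [1, 2, 3] -> [4, 2, 3].
public class ReassignmentJson {

    // Assumes the current leader is the first (preferred) replica in the list.
    static List<Integer> replaceLeader(List<Integer> replicas, int newBroker) {
        List<Integer> target = new ArrayList<>(replicas);
        target.set(0, newBroker);
        return target;
    }

    // Emits the version-1 JSON layout accepted by --reassignment-json-file.
    static String toJson(String topic, int partition, List<Integer> replicas) {
        String replicaList = replicas.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
        return "{\"version\":1,\"partitions\":[{\"topic\":\"" + topic
                + "\",\"partition\":" + partition
                + ",\"replicas\":[" + replicaList + "]}]}";
    }

    public static void main(String[] args) {
        // Partition 12 is a placeholder; use the partition that stores testGroupA's offsets.
        List<Integer> target = replaceLeader(Arrays.asList(1, 2, 3), 4);
        System.out.println(toJson("__consumer_offsets", 12, target));
        // prints {"version":1,"partitions":[{"topic":"__consumer_offsets","partition":12,"replicas":[4,2,3]}]}
    }
}
```

The output can be saved to a file and passed to kafka-reassign-partitions.sh via --reassignment-json-file together with --execute (plus the --zookeeper connection string that 0.10.x tooling requires). Building the JSON by string concatenation is safe here only because Kafka topic names are limited to letters, digits, '.', '_' and '-', so no escaping is needed.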
I have a fix for both of these issues and will upload a PR for it.
[~ijuma] Yes, the ticket you mentioned seems to be a duplicate and can be closed.
> Consumer can fail after reassignment of the offsets topic partition
> -------------------------------------------------------------------
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.1.0
> Reporter: Joel Koshy
> Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 not found
> at kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633) ~[kafka_2.10.jar:?]
> at kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633) ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632) ~[kafka_2.10.jar:?]
> at ...
> {code}
> The issue is that the replica has been deleted, so
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this
> exception, which propagates to the client as an unknown error.
> Unfortunately, consumers don't react to this error and keep failing their
> offset commits.
> One workaround in this situation is to bounce the cluster, which forces the
> consumer to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions
> instead of the actual partition.)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)