[
https://issues.apache.org/jira/browse/KAFKA-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14336985#comment-14336985
]
Guozhang Wang edited comment on KAFKA-1910 at 3/3/15 12:45 AM:
---------------------------------------------------------------
The uploaded patch contains multiple fixes for the related JIRAs as well as a
refactoring of the new consumer itself. I will summarize them here instead of
in the RB:
1. Fix ConsumerTest.testXXXwithBrokerFailure: in RestartDeadBroker we need to
call startup() on the old brokers instead of creating new ones, as the latter
approach will cause the metadata to be messed up and cause the test to hang
(KAFKA-1948). Also make sure the "test" topic is created with the correct
replication factor to avoid hanging when the only replica broker is shut down.
Also do the bouncing of the brokers in the background thread so that each
broker will eventually be restarted.
2. Fix ConsumerTest's __consumer_offsets topic: when we call partitionFor() the
__consumer_offsets topic may be created with a replication factor of
min(offsetTopicReplicationFactor, aliveBrokers.size); see KAFKA-1864 for
details (KAFKA-1975).
3. Add the IllegalGeneration logic in the coordinator, as it is important for
consumers rebalancing after rediscovering the coordinator; the current stub
always returns OK and hence consumers migrating to the new coordinator will
not trigger a rebalance (KAFKA-1964).
4. Create the Coordinator and the FetchManager modules as KafkaConsumer
internals. Coordinator is responsible for assigning partitions (joining
groups), committing offsets and fetching offsets from the coordinator, and
FetchManager is responsible for handling fetch requests / responses.
4.1 After the refactoring it is easier to detect and fix a bug where response
callbacks are triggered multiple times, causing the coordinator NPE
(KAFKA-1969).
4.2 Avoid always trying to fetch offsets from the coordinator whenever the
consumer decides to update fetch positions; introduce a few new variables /
APIs in SubscriptionState accordingly.
4.3 Move serializer / de-serializer configs / constructors to AbstractConfig.
4.4 Add missing error handling in commit offset / heartbeat responses. In
general I think we should make notes about the possible error codes in each of
the response types to help with coding the error handling logic; I have filed
KAFKA-1985 for that.
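To illustrate the idea behind item 3, here is a minimal sketch of a
generation check, not the code in the patch. The class, method and enum names
are all hypothetical. It assumes the coordinator keeps one generation id per
group and that a coordinator with no record of the group (e.g. after a
failover) should reject the request so that the consumer rejoins:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only; names do not come from the Kafka code base.
class GenerationCheckSketch {

    enum ErrorCode { NONE, ILLEGAL_GENERATION }

    // Current generation id per consumer group, as tracked by this coordinator.
    private final Map<String, Integer> generations = new HashMap<>();

    // Called when a rebalance completes: start at 1, then increment by 1.
    int startNewGeneration(String groupId) {
        return generations.merge(groupId, 1, Integer::sum);
    }

    // Validate the generation id carried by a heartbeat or offset commit.
    // An unknown group or a stale generation is rejected instead of
    // unconditionally returning OK, so the consumer knows it must rejoin.
    ErrorCode validate(String groupId, int requestGeneration) {
        Integer current = generations.get(groupId);
        if (current == null || current != requestGeneration) {
            return ErrorCode.ILLEGAL_GENERATION;
        }
        return ErrorCode.NONE;
    }

    public static void main(String[] args) {
        GenerationCheckSketch coordinator = new GenerationCheckSketch();
        int generation = coordinator.startNewGeneration("group-a");
        System.out.println(coordinator.validate("group-a", generation));

        // A rebalance bumps the generation; the old id is now stale.
        coordinator.startNewGeneration("group-a");
        System.out.println(coordinator.validate("group-a", generation));

        // A freshly elected coordinator has no state for the group.
        GenerationCheckSketch newCoordinator = new GenerationCheckSketch();
        System.out.println(newCoordinator.validate("group-a", generation));
    }
}
```

With a stub that always returns NONE, the second and third calls would look
successful to the consumer and no rebalance would be triggered.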
> Refactor KafkaConsumer
> ----------------------
>
> Key: KAFKA-1910
> URL: https://issues.apache.org/jira/browse/KAFKA-1910
> Project: Kafka
> Issue Type: Sub-task
> Components: consumer
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
>
> KafkaConsumer now contains all the logic on the consumer side, making it a
> very large class file. It would be better to refactor it into multiple layers
> on top of KafkaClient.