[
https://issues.apache.org/jira/browse/KAFKA-8420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551082#comment-17551082
]
Matthew de Detrich edited comment on KAFKA-8420 at 6/7/22 1:20 PM:
-------------------------------------------------------------------
So, in order to work on this issue, I tried writing a test to replicate what you
are describing and I came across something interesting. The test that I wrote
looks like this:
{code:java}
@Test
public void gracefulHandlingSwitchSubscribeToManualAssign() {
    ConsumerMetadata metadata = createMetadata(subscription);
    MockClient client = new MockClient(time, metadata);
    initMetadata(client, Collections.singletonMap(topic, 1));
    Node node = metadata.fetch().nodes().get(0);

    KafkaConsumer<String, String> consumer = newConsumer(time, client,
            subscription, metadata, assignor, true, groupInstanceId);

    // Steps 1-2 of the description: subscribe, prepare the rebalance
    // responses, then poll once with a zero timeout (no records expected).
    consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
    Node coordinator = prepareRebalance(client, node, singleton(topic), assignor,
            singletonList(tp0), null);
    ConsumerRecords<String, String> initialConsumerRecords =
            consumer.poll(Duration.ofMillis(0));
    assertTrue(initialConsumerRecords.isEmpty());

    // Steps 3-4: switch from subscribe() to manual assign().
    consumer.unsubscribe();
    consumer.assign(singleton(tp0));

    // Step 5: a sync-group response for the old subscription arrives.
    client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE),
            coordinator);
    consumer.poll(Duration.ofSeconds(1));
}
{code}
The problem I am currently getting is that
{{consumer.poll(Duration.ofSeconds(1));}} causes an infinite loop/deadlock.
(Originally I had {{consumer.poll(Duration.ofSeconds(0));}}, however this
caused the {{consumer.poll}} method to short-circuit: with a zero timeout the
{{timer.notExpired()}} check never passes, so it immediately returns
{{ConsumerRecords.empty();}} without the consumer ever sending the request that
would trigger a sync-group response.)
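To illustrate what a zero timeout means for work guarded by a timer check, here is a minimal sketch. {{SketchTimer}} and {{ZeroTimeoutSketch}} are hypothetical names, and the timer only assumes the semantics described above (expired as soon as the current time reaches the deadline); it is not Kafka's {{Timer}} class. With a timeout of 0 the deadline equals the start time, so in this sketch the loop body, where a consumer would send its requests, never runs.

{code:java}
import java.util.function.LongSupplier;

// Hypothetical, simplified timer: expired once the clock reaches the deadline.
// This is NOT org.apache.kafka.common.utils.Timer.
final class SketchTimer {
    private final LongSupplier clockMs;
    private final long deadlineMs;

    SketchTimer(LongSupplier clockMs, long timeoutMs) {
        this.clockMs = clockMs;
        this.deadlineMs = clockMs.getAsLong() + timeoutMs;
    }

    boolean notExpired() {
        return clockMs.getAsLong() < deadlineMs;
    }
}

final class ZeroTimeoutSketch {
    // Counts how many times the timer-guarded body runs before expiry.
    static int iterationsBeforeExpiry(long timeoutMs) {
        long[] nowMs = {0L}; // fake clock, advanced manually below
        SketchTimer timer = new SketchTimer(() -> nowMs[0], timeoutMs);
        int iterations = 0;
        while (timer.notExpired()) {
            iterations++;  // stand-in for "send request / poll network"
            nowMs[0] += 10; // each iteration costs 10 ms of fake time
        }
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println(iterationsBeforeExpiry(0));    // 0
        System.out.println(iterationsBeforeExpiry(1000)); // 100
    }
}
{code}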
After spending some time debugging, I found that this is the piece of code that
does not terminate:
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L250-L251].
What I find highly confusing is the fact that {{lookupCoordinator()}} does
actually complete (in this case it immediately returns {{findCoordinatorFuture}} at
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L294]);
however, for some reason the loop at
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L215]
never terminates. It doesn't appear to detect that the future has finished,
even though I believe it has. I am not sure if this is related to what you
mentioned, i.e.
{quote}In the worst case (i.e. the leader keeps sending an incompatible assignment),
this would cause the consumer to fall into endless re-joins.
{quote}
but it looks like I have either found something else or I am barking up the
wrong tree. Do you have any insights into this, [~guozhang]?
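For reference, a loop of this kind can only exit when the specific future it was handed reports completion or when its deadline passes. The sketch below uses hypothetical names and is a simplification, not {{ConsumerNetworkClient}}'s actual code. It shows three cases; note that a method returning a future object promptly is not the same thing as that future being completed, and with a clock that never advances the deadline never passes either. The {{maxIterations}} guard exists only so the sketch itself cannot hang.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;

// Hypothetical sketch of a "poll until the future completes or the timer
// expires" loop. It is a simplification, not ConsumerNetworkClient's code.
final class PollUntilSketch {
    // Returns how many poll rounds ran before exiting, or -1 if maxIterations
    // was reached (a guard that exists only so this sketch cannot hang).
    static int pollUntil(CompletableFuture<?> future, LongSupplier clockMs,
                         long timeoutMs, Runnable pollOnce, int maxIterations) {
        long deadlineMs = clockMs.getAsLong() + timeoutMs;
        int iterations = 0;
        // The only exits: the future we were given completes, or time runs out.
        while (!future.isDone() && clockMs.getAsLong() < deadlineMs) {
            if (iterations == maxIterations) {
                return -1;
            }
            pollOnce.run(); // stand-in for one round of network I/O
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        long[] nowMs = {0L}; // fake clock, only moves when a poll advances it

        // Case 1: the third poll completes the future, so the loop exits.
        CompletableFuture<String> f1 = new CompletableFuture<>();
        int[] polls = {0};
        System.out.println(pollUntil(f1, () -> nowMs[0], 1000,
                () -> { if (++polls[0] == 3) f1.complete("done"); }, 50)); // 3

        // Case 2: nothing completes the future and the clock never moves.
        CompletableFuture<String> f2 = new CompletableFuture<>();
        System.out.println(pollUntil(f2, () -> nowMs[0], 1000, () -> { }, 50)); // -1

        // Case 3: nothing completes the future, but each poll costs 100 ms.
        CompletableFuture<String> f3 = new CompletableFuture<>();
        System.out.println(pollUntil(f3, () -> nowMs[0], 1000,
                () -> nowMs[0] += 100, 50)); // 10
    }
}
{code}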
> Graceful handling when consumer switches from subscribe to manual assign
> ------------------------------------------------------------------------
>
> Key: KAFKA-8420
> URL: https://issues.apache.org/jira/browse/KAFKA-8420
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Guozhang Wang
> Assignee: Matthew de Detrich
> Priority: Major
>
> Today if a consumer switches between subscribe (and hence relies on group
> rebalance to get assignment) and manual assign, it may cause unnecessary
> rebalances. For example:
> 1. consumer.subscribe();
> 2. consumer.poll(); // join-group request sent, returns empty because of the
> poll timeout
> 3. consumer.unsubscribe();
> 4. consumer.assign(..);
> 5. consumer.poll(); // sync-group response received, and the assigned
> partitions do not match the current subscription state. In this case it
> will try to re-join, which is not necessary.
> In the worst case (i.e. the leader keeps sending an incompatible assignment),
> this would cause the consumer to fall into endless re-joins.
> Although it is not a very common usage scenario, it is still worth handling
> better than the status quo.
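One possible shape for the graceful handling in steps 3-5, sketched under simplifying assumptions: when a sync-group assignment arrives after the consumer has left subscribe mode, it is discarded instead of triggering a re-join, while a mismatch during an active subscription still re-joins. {{Mode}}, {{Action}} and {{onSyncGroupAssignment}} are hypothetical names, topics stand in for partitions, and this is not Kafka's consumer coordinator code.

{code:java}
import java.util.Set;

// Hypothetical, simplified model of the scenario in the description; the
// enum and method names are illustrative, not Kafka's consumer internals.
final class SyncGroupDecisionSketch {
    enum Mode { NONE, SUBSCRIBED, MANUALLY_ASSIGNED }
    enum Action { ACCEPT, REJOIN, DISCARD }

    // Decide what to do with an assignment delivered by a sync-group response.
    static Action onSyncGroupAssignment(Mode mode, Set<String> subscribedTopics,
                                        Set<String> assignedTopics) {
        if (mode != Mode.SUBSCRIBED) {
            // Not (or no longer) subscribed, e.g. after unsubscribe() + assign():
            // the group assignment is stale, so drop it instead of re-joining.
            return Action.DISCARD;
        }
        // Still subscribed: a mismatch between assignment and subscription
        // legitimately calls for another rebalance.
        return subscribedTopics.containsAll(assignedTopics) ? Action.ACCEPT : Action.REJOIN;
    }

    public static void main(String[] args) {
        // Steps 3-5: unsubscribe(), assign(..), then the sync-group response arrives.
        System.out.println(onSyncGroupAssignment(Mode.MANUALLY_ASSIGNED,
                Set.of(), Set.of("topic")));        // DISCARD
        System.out.println(onSyncGroupAssignment(Mode.SUBSCRIBED,
                Set.of("topic"), Set.of("topic"))); // ACCEPT
        System.out.println(onSyncGroupAssignment(Mode.SUBSCRIBED,
                Set.of("other"), Set.of("topic"))); // REJOIN
    }
}
{code}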
--
This message was sent by Atlassian Jira
(v8.20.7#820007)