RivenSun created KAFKA-13463:
--------------------------------

             Summary: Improvement: KafkaConsumer 
pause(Collection<TopicPartition> partitions)
                 Key: KAFKA-13463
                 URL: https://issues.apache.org/jira/browse/KAFKA-13463
             Project: Kafka
          Issue Type: Improvement
            Reporter: RivenSun


h1. 1. Background

Users of the kafkaConsumer#pause(...) method may overlook that a pause can 
silently stop working, so that poll returns records the application does not 
expect and data may be lost.

For example, the following simple code:
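{code:java}
// Keep calling poll() to stay in the group, but pause every assigned
// partition first so that poll() is not expected to return any records.
while (true) {
    try {
        kafkaConsumer.pause(kafkaConsumer.assignment());
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
        if (!records.isEmpty()) {
            // Reached when the paused marks were cleared inside poll().
            log.error("kafka poll for rebalance discard some record!");
        }
    } catch (Exception e) {
        log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
    }
}
{code}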
Even if you call pause(assignment) before the poll method every time, the poll 
method may still return messages.

 
h1. 2. Root cause

In short, during a rebalance of the group, 
ConsumerCoordinator#invokePartitionsRevoked(...) clears the paused mark on 
the partitions previously held by the kafkaConsumer. However, the records of 
those paused partitions that are already buffered in memory 
(Fetcher.completedFetches) are not cleared at the same time, so 
Fetcher#fetchedRecords() still picks them up and returns them to the 
user.
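
The sequence above can be illustrated with a small toy model. This is only a 
sketch under stated assumptions: StaleFetchModel and its methods are 
hypothetical names and not Kafka classes, partitions are plain strings, and 
the rebalance is assumed to hand the same partition back to the consumer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: these are NOT Kafka classes, and partitions are plain strings.
class StaleFetchModel {
    final Set<String> assigned = new HashSet<>();
    final Set<String> paused = new HashSet<>();
    // Stands in for Fetcher.completedFetches: records already fetched, per partition.
    final Map<String, List<String>> completedFetches = new HashMap<>();

    void assign(String partition) { assigned.add(partition); }

    void pause(String partition) { paused.add(partition); }

    void buffer(String partition, String record) {
        completedFetches.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
    }

    // Models a rebalance that revokes and then re-assigns the same partitions:
    // the paused marks are cleared, but the buffered records are left in place.
    void rebalanceWithSameAssignment() {
        paused.clear();
    }

    // Models fetchedRecords(): drains the buffers of assigned, unpaused partitions.
    List<String> poll() {
        List<String> out = new ArrayList<>();
        for (String partition : assigned) {
            if (paused.contains(partition)) {
                continue; // records of a paused partition stay buffered
            }
            List<String> records = completedFetches.remove(partition);
            if (records != null) {
                out.addAll(records);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        StaleFetchModel consumer = new StaleFetchModel();
        consumer.assign("topic-0");
        consumer.buffer("topic-0", "record-1"); // fetched before the pause
        consumer.pause("topic-0");
        System.out.println(consumer.poll());    // prints []
        consumer.rebalanceWithSameAssignment();
        System.out.println(consumer.poll());    // prints [record-1]
    }
}
```

The second poll returns the record even though the application never called 
resume, which is the behavior the example loop in the Background section runs 
into.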

For a more detailed analysis, see [KAFKA-13425] ("KafkaConsumer#pause() will 
lose its effect after groupRebalance occurs, which maybe cause data loss on 
the consumer side"). I look forward to your reply.

 
h1. 3. Discussion: can KafkaConsumer support a pause method that is not 
affected by groupRebalance?

The design of the KafkaConsumer#pause method stated one point from the very 
beginning:
 * Rebalance does not preserve pause/resume state.

See [KAFKA-2350] ("Add KafkaConsumer pause capability").

Unfortunately, I could not find this stated in the Javadoc of the 
KafkaConsumer#pause(...) method. In addition, 
ConsumerCoordinator#invokePartitionsRevoked does not log anything when it 
clears the paused marks. I believe this leads many users to use the 
KafkaConsumer#pause(...) method incorrectly.

But I think it is necessary for KafkaConsumer to provide a pause method that is 
not affected by groupRebalance.

 
h1. 4. Suggestions

Below I propose several ways to improve the existing pause method or to add 
new {{pause}} methods. Each point is an independent solution.
h2. 1) ConsumerCoordinator#invokePartitionsRevoked should also make the 
Fetcher discard the buffered records of revokedAndPausedPartitions when 
clearing the paused mark

This prevents the Fetcher#fetchedRecords() method from mistakenly treating 
the buffered records of revokedAndPausedPartitions as valid and returning 
them. The fetchedRecords method already performs various checks on each 
partition.

The cost is that if the user does not call the pause(...) method again 
before the next poll, a new Fetch request may be sent for data that had 
already been fetched, which causes additional network traffic.
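
A minimal sketch of this idea, again as a toy model: RevokeClearsBufferModel 
and onPartitionsRevoked are hypothetical names, not Kafka code, and the sketch 
assumes the fetch position has not advanced past the discarded records, so 
that they can be fetched again later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: these are NOT Kafka classes.
class RevokeClearsBufferModel {
    final Set<String> paused = new HashSet<>();
    // Stands in for Fetcher.completedFetches.
    final Map<String, List<String>> completedFetches = new HashMap<>();

    // Hypothetical hook that runs where the paused marks are cleared on revocation.
    // Returns the number of buffered records that were discarded.
    int onPartitionsRevoked(Collection<String> revoked) {
        int discarded = 0;
        for (String partition : revoked) {
            if (paused.remove(partition)) {                                // clear the paused mark...
                List<String> records = completedFetches.remove(partition); // ...and its buffer
                if (records != null) {
                    discarded += records.size();
                }
            }
        }
        return discarded;
    }

    public static void main(String[] args) {
        RevokeClearsBufferModel model = new RevokeClearsBufferModel();
        model.paused.add("topic-0");
        model.completedFetches.put("topic-0",
                new ArrayList<>(Arrays.asList("record-1", "record-2")));
        System.out.println(model.onPartitionsRevoked(Arrays.asList("topic-0"))); // prints 2
        System.out.println(model.completedFetches.isEmpty());                    // prints true
    }
}
```

Only partitions that were both revoked and paused lose their buffer here; the 
buffers of unpaused partitions are left untouched.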

 
h2. 2) Try to preserve the old paused marks on the KafkaConsumer side

<1> In the ConsumerCoordinator#onJoinPrepare(...) method, record all 
pausedTopicPartitions from the current assignment of the KafkaConsumer.

<2> In the ConsumerCoordinator#onJoinComplete(...) method, compare 
pausedTopicPartitions with the latest assignment and restore the paused 
marks of the partitions that are still in the latest assignment.

{*}Note{*}: If the new assignment of the kafkaConsumer no longer contains 
topicPartitions that were paused before the rebalance, the paused marks of 
these topicPartitions are lost for good on the kafkaConsumer side, even if 
the kafkaConsumer is assigned these partitions again in a future rebalance.

At the end of KAFKA-13425, mentioned above, I posted draft code for this 
point.

<3> Consumers that use the RebalanceProtocol.COOPERATIVE protocol already 
behave this way.

For example, for consumers that use the currently supported PartitionAssignor 
CooperativeStickyAssignor, code analysis shows that the default behavior is 
to keep the old paused marks, whereas consumers that use the 
RebalanceProtocol.EAGER protocol clear all paused marks by default.

I suggest that KafkaConsumer behave consistently under both RebalanceProtocol 
values. Otherwise the existing KafkaConsumer#pause(...) method is ambiguous 
and very confusing to users.
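
Steps <1> and <2> amount to a snapshot followed by a set intersection. The 
following is a minimal sketch of that logic only, not the draft posted in 
KAFKA-13425: PausedMarkCarryOver is a hypothetical class, partitions are plain 
strings, and the two methods merely borrow the names of the coordinator hooks.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy model only: this is NOT ConsumerCoordinator.
class PausedMarkCarryOver {
    private Set<String> pausedBeforeRebalance = new HashSet<>();

    // Step <1>: remember what was paused when the rebalance starts.
    void onJoinPrepare(Set<String> currentlyPaused) {
        pausedBeforeRebalance = new HashSet<>(currentlyPaused);
    }

    // Step <2>: the marks to restore are the snapshot intersected with the new assignment.
    Set<String> onJoinComplete(Set<String> newAssignment) {
        Set<String> toPause = new HashSet<>(pausedBeforeRebalance);
        toPause.retainAll(newAssignment);
        pausedBeforeRebalance = new HashSet<>(); // the snapshot is used only once
        return toPause;
    }

    public static void main(String[] args) {
        PausedMarkCarryOver carry = new PausedMarkCarryOver();
        carry.onJoinPrepare(new HashSet<>(Arrays.asList("topic-0", "topic-1")));
        // topic-0 moves away, topic-1 stays: only topic-1 is paused again.
        System.out.println(carry.onJoinComplete(
                new HashSet<>(Arrays.asList("topic-1", "topic-2")))); // prints [topic-1]

        // A later rebalance brings topic-0 back, but its paused mark is gone.
        carry.onJoinPrepare(new HashSet<>(Arrays.asList("topic-1")));
        System.out.println(carry.onJoinComplete(
                new HashSet<>(Arrays.asList("topic-0", "topic-1")))); // prints [topic-1]
    }
}
```

The second rebalance in main shows the limitation described in the note: 
once a paused partition leaves the assignment, its mark cannot be recovered 
on this consumer.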

 
h2. 3) Pass the paused marks of topicPartitions through the groupRebalance process

In the JoinGroup request, in addition to reporting the topic that it wants to 
subscribe to, each consumerMember should also report its pausedTopicPartitions. 
The JoinGroup response received by the LeaderConsumer should contain all paused 
partitions under the entire group.

The latest assignment made by the LeaderConsumer should carry the paused 
marks and be packaged in the LeaderConsumer's SyncGroup request.

In this way, after the groupRebalance is completed, even if a paused 
topicPartition is assigned to a new consumer, the new consumer can keep it 
paused.

As a consequence, the KafkaConsumer#paused() method may return partitions 
for which this KafkaConsumer never called the 
pause(Collection<TopicPartition> partitions) method itself.
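
The leader-side bookkeeping for this option could look roughly as follows. 
This is a sketch under assumptions: GroupPausedPropagation and both methods 
are hypothetical, the maps stand in for data that would travel in extended 
JoinGroup and SyncGroup messages (no such fields exist today), and the 
partition assignment itself is taken as given.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model only: member ids and partitions are plain strings.
class GroupPausedPropagation {
    // Union of the paused partitions that every member reported when joining.
    static Set<String> groupPaused(Map<String, Set<String>> pausedByMember) {
        Set<String> all = new TreeSet<>();
        for (Set<String> pausedOfMember : pausedByMember.values()) {
            all.addAll(pausedOfMember);
        }
        return all;
    }

    // For each member, the part of its new assignment that must start out paused.
    static Map<String, Set<String>> pausedPerMember(Map<String, Set<String>> newAssignment,
                                                    Set<String> groupPaused) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : newAssignment.entrySet()) {
            Set<String> pausedForMember = new TreeSet<>(entry.getValue());
            pausedForMember.retainAll(groupPaused);
            result.put(entry.getKey(), pausedForMember);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> reported = new HashMap<>();
        reported.put("member-A", new HashSet<>(Arrays.asList("topic-0")));
        reported.put("member-B", new HashSet<>());

        // After the rebalance, the paused partition topic-0 moves from A to B.
        Map<String, Set<String>> assignment = new HashMap<>();
        assignment.put("member-A", new HashSet<>(Arrays.asList("topic-1")));
        assignment.put("member-B", new HashSet<>(Arrays.asList("topic-0")));

        System.out.println(pausedPerMember(assignment, groupPaused(reported)));
        // prints {member-A=[], member-B=[topic-0]}
    }
}
```

In the example, member-B ends up with topic-0 paused although it never 
called pause on it.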

 
h2. 4) KafkaConsumer provides a topic-level pause method that also supports 
regular expressions

{{KafkaConsumer#pause(Collection<String> topics)}}

{{KafkaConsumer#pause(Pattern pattern)}}

Similar to the paused mark in SubscriptionState.assignment, we need to provide 
a new instance variable ‘TopicState’ in SubscriptionState to store the 
topic-level paused mark. The ‘TopicState’ data structure can refer to the 
existing TopicPartitionState.

<1> ‘TopicState’ should not be affected by groupRebalance, and the paused mark 
in TopicState will not be changed during the groupRebalance process. TopicState 
should be the memory mark of a single KafkaConsumer, and it does not have to be 
passed to other consumers after the rebalance is completed.

 

<2> {{pause(Collection<String> topics)}} throws IllegalStateException if 
this consumer is not currently subscribed to any topic provided

 

<3> Fetcher's fetchedRecords() and sendFetches() can also take TopicState 
into account when deciding whether to return records to the user or to send 
a Fetch request

 

<4> Provide KafkaConsumer#resume(Collection<String> topics) and 
KafkaConsumer#resume(Pattern pattern) methods to clear topic-level paused 
marks.
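
Points <1> to <4> can be sketched as a small state holder. Everything here is 
hypothetical: TopicPauseState is not an existing Kafka class, isFetchable is 
an invented helper for the check in <3>, and two readings are assumed, namely 
that pause(Collection) throws as soon as one provided topic is not subscribed 
and that pause(Pattern) only matches against the current subscription.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Toy model only: a per-consumer, topic-level paused mark.
class TopicPauseState {
    private final Set<String> subscription;
    private final Set<String> pausedTopics = new HashSet<>();

    TopicPauseState(Collection<String> subscription) {
        this.subscription = new HashSet<>(subscription);
    }

    // <2>: validate every topic first, so a failed call changes nothing.
    void pause(Collection<String> topics) {
        for (String topic : topics) {
            if (!subscription.contains(topic)) {
                throw new IllegalStateException("Not subscribed to topic " + topic);
            }
        }
        pausedTopics.addAll(topics);
    }

    void pause(Pattern pattern) {
        for (String topic : subscription) {
            if (pattern.matcher(topic).matches()) {
                pausedTopics.add(topic);
            }
        }
    }

    // <4>: clear topic-level marks.
    void resume(Collection<String> topics) {
        pausedTopics.removeAll(topics);
    }

    void resume(Pattern pattern) {
        pausedTopics.removeIf(topic -> pattern.matcher(topic).matches());
    }

    // <3>: what fetchedRecords() and sendFetches() would consult.
    boolean isFetchable(String topic) {
        return !pausedTopics.contains(topic);
    }

    public static void main(String[] args) {
        TopicPauseState state = new TopicPauseState(Arrays.asList("orders", "orders-retry", "audit"));
        state.pause(Pattern.compile("orders.*"));
        System.out.println(state.isFetchable("orders-retry")); // prints false
        System.out.println(state.isFetchable("audit"));        // prints true
        state.resume(Arrays.asList("orders-retry"));
        System.out.println(state.isFetchable("orders-retry")); // prints true
    }
}
```

The class has no rebalance hook on purpose: as stated in <1>, the 
topic-level marks are local to one consumer and a rebalance never touches 
them.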

 
h2. 5) KafkaConsumer provides a consumer-level pause method

{{KafkaConsumer#pause()}}

The existing pause method works per topicPartition, which is sometimes too 
fine-grained. And because the paused mark is bound to the assignment, it is 
inevitably affected by groupRebalance.

<1> This method may also be what users need most urgently. After this 
pause() method is called, the kafkaConsumer marks itself as paused, and the 
poll method checks the value of isKafkaConsumerPaused to decide whether to 
return records to the user or to send a Fetch request. This 
isKafkaConsumerPaused mark should likewise be held only by the individual 
KafkaConsumer.

 

<2> Users do not need to worry about the poll method returning data after 
calling the KafkaConsumer#pause() method.

Users can therefore keep calling the poll method, which avoids the following 
two consequences of a kafkaConsumer not calling poll for a long time:

(1) The heartbeat thread's detection mechanism causes the kafkaConsumer to 
actively leaveGroup.

(2) If a groupRebalance is triggered in the meantime, the groupCoordinator 
waits for the consumer to send a JoinGroup request. The groupRebalance 
cannot complete for a long time (bounded by max.poll.interval.ms), so all 
consumers in the entire group suspend consumption.

 

<3> Provide KafkaConsumer#resume() at the kafkaConsumer level to clear the 
paused mark of the KafkaConsumer.
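
A minimal sketch of the consumer-level flag, as a toy model: 
PausableConsumerModel is hypothetical, its paused field plays the role of the 
proposed isKafkaConsumerPaused mark, and pollCount merely stands in for the 
group membership work that a real poll call would still perform.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: this is NOT KafkaConsumer.
class PausableConsumerModel {
    private final List<String> buffered = new ArrayList<>();
    private boolean paused; // the proposed isKafkaConsumerPaused mark
    private int pollCount;  // stands in for "poll() ran, so the member stays in the group"

    void pause() { paused = true; }

    void resume() { paused = false; }

    void buffer(String record) { buffered.add(record); }

    // A rebalance deliberately leaves the consumer-level mark alone.
    void rebalance() { }

    int pollCount() { return pollCount; }

    List<String> poll() {
        pollCount++; // membership work would still happen while paused
        if (paused) {
            return new ArrayList<>(); // never hand records to the user while paused
        }
        List<String> out = new ArrayList<>(buffered);
        buffered.clear();
        return out;
    }

    public static void main(String[] args) {
        PausableConsumerModel consumer = new PausableConsumerModel();
        consumer.buffer("record-1");
        consumer.pause();
        consumer.rebalance();
        System.out.println(consumer.poll());      // prints []
        consumer.resume();
        System.out.println(consumer.poll());      // prints [record-1]
        System.out.println(consumer.pollCount()); // prints 2
    }
}
```

Because the flag lives outside the assignment, the loop from the Background 
section could call pause() once and then keep polling without re-pausing 
before every poll.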



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
