RivenSun created KAFKA-13425:
--------------------------------

             Summary: KafkaConsumer#pause() will lose its effect after a group rebalance occurs, which may cause data loss on the consumer side
                 Key: KAFKA-13425
                 URL: https://issues.apache.org/jira/browse/KAFKA-13425
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 3.0.0
            Reporter: RivenSun
         Attachments: architecture_picture.png

h1. Foreword:
To decouple the two stages of polling messages and processing messages on the KafkaConsumer side, I use a "poll --> push" architecture on the Kafka consumer side.

h2. Architecture
See the attached picture "architecture_picture.png".
h3. 1) ThreadPoolExecutor
The key parameters of the ThreadPoolExecutor threadPool are (a sketch follows this list):
h4. (1) The workQueue is an ArrayBlockingQueue<Runnable>
h4. (2) The handler is an implementation of the RejectedExecutionHandler interface
h4. (3) threadPool.allowCoreThreadTimeOut(true)
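For concreteness, here is a minimal sketch of such a pool. The class name, pool sizes, and queue capacity are my own assumptions, not values from this report; the point is that a bounded queue plus an abort-style handler makes "downstream is full" visible to the polling thread as a RejectedExecutionException.
{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerWorkerPool {

    // Hypothetical pool for the "poll --> push" model; all sizes are assumptions.
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            4,                                      // corePoolSize (assumed)
            8,                                      // maximumPoolSize (assumed)
            60L, TimeUnit.SECONDS,                  // keep-alive for idle threads
            new ArrayBlockingQueue<>(1000),         // (1) bounded workQueue (capacity assumed)
            new ThreadPoolExecutor.AbortPolicy());  // (2) RejectedExecutionHandler: reject when full

    public ConsumerWorkerPool() {
        threadPool.allowCoreThreadTimeOut(true);    // (3) core threads may time out when idle
    }
}
{code}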

 
h3. 2) KafkaConsumer

The disadvantage of this architecture is that if the business side's onMessage() method is slow to execute, then:
h4. (1) The blockingQueue of the ThreadPoolExecutor accumulates a large number of tasks, and eventually pushing messages into the pool fails.
h4. (2) How should the KafkaConsumer poll() loop behave while the push keeps failing? There are two options:
h5. 1. Stop calling poll()

KafkaConsumer needs to set *configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);* to prevent the heartbeat thread of the KafkaConsumer from noticing that poll() has not been called for a long time and automatically executing *maybeLeaveGroup("consumer poll timeout has expired.")*.

But the most serious consequence of this is that once a rebalance of the group is triggered for any reason, *the rebalance of the entire group will never complete, because this kafkaConsumer does not call the poll() method. This causes all consumers in the entire group to stop consuming.*
h5. 2. When the push fails, keep calling poll()

The purpose is to keep up the poll() calls of the kafkaConsumer, but without actually receiving any messages, because the downstream BlockingQueue that stores messages is full. This is where KafkaConsumer#pause(...) and KafkaConsumer#resume(...) come in, and I named this special poll method maintainPoll4Rebalance().

 
h1. maintainPoll4Rebalance: preliminary design

A simple first design:
{code:java}
    public void pollLoop() {
        while (true) {
            try {
                ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));

                for (ConsumerRecord<String, Object> record : records) {
                    // Retry the push until the downstream queue has capacity again;
                    // in the meantime keep the consumer alive inside the group.
                    while (!publish(record)) {
                        try {
                            maintainPoll4Rebalance();
                        } catch (Exception e) {
                            log.error("maintain poll for rebalance with error {}", e.getMessage(), e);
                            CommonUtil.sleep(TimeUnit.SECONDS, 1);
                        }
                    }
                }

            } catch (Exception e) {
                log.error("KafkaConsumer poll message has error: {}", e.getMessage(), e);
                CommonUtil.sleep(TimeUnit.MILLISECONDS, ClientConfig.CLIENT_SLEEP_INTERVAL_MS);
            }
        }
    }

    private boolean publish(Object message) {
        try {

            ...

            // A full workQueue surfaces here as a RejectedExecutionException.
            threadPool.execute(() -> onMessage(message));

        } catch (Exception e) {
            log.error("consumer execute failed with error {}", e.getMessage(), e);
            return false;
        }
        return true;
    }

    private void maintainPoll4Rebalance() {
        try {
            // Pause every assigned partition so this poll() should return no records.
            kafkaConsumer.pause(kafkaConsumer.assignment());
            ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                // This branch should never be reached -- and yet it is (see below).
                log.error("kafka poll for rebalance discard some record!");
                for (ConsumerRecord<String, Object> consumerRecord : records) {
                    log.error("this record need to retry, partition {}, offset {}",
                            consumerRecord.partition(), consumerRecord.offset());
                }
            }
        } catch (Exception e) {
            log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
        } finally {
            kafkaConsumer.resume(kafkaConsumer.assignment());
        }
    }
{code}
 

 

The maintainPoll4Rebalance() above seems to solve my problem well: when downstream consumption is blocked, the KafkaConsumer keeps up its continuous poll() calls, while avoiding pulling further messages as long as the push keeps failing.

But in practice, the following logs appear at runtime:
{code:java}
[main] ERROR ConsumerTest3 - kafka poll for rebalance discard some record!
[main] ERROR ConsumerTest3 - this record need to retry, partition 0 ,offset 36901
{code}
I clearly called kafkaConsumer.pause(kafkaConsumer.assignment()) before kafkaConsumer#poll was called. Why does the kafkaConsumer still pull messages and lose them? The messages are lost because the consumer has auto-commit of offsets enabled.

 
h1. RootCause Analysis

KafkaConsumer#poll internally performs three key operations:
1) updateAssignmentMetadataIfNeeded
2) fetcher.fetchedRecords()
3) fetcher.sendFetches()

updateAssignmentMetadataIfNeeded is mainly responsible for the group-rebalance-related work. The root cause lies in the first and second steps, as the paraphrase below shows.

 
h2. 1. updateAssignmentMetadataIfNeeded

We trace directly into ConsumerCoordinator#onJoinPrepare(...):
{code:java}
        } else {
            switch (protocol) {
                case EAGER:
                    // revoke all partitions
                    revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                    exception = invokePartitionsRevoked(revokedPartitions);

                    subscriptions.assignFromSubscribed(Collections.emptySet());

                    break;

                case COOPERATIVE:
                    // only revoke those partitions that are not in the subscription any more.
                    Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                    revokedPartitions = ownedPartitions.stream()
                        .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
                        .collect(Collectors.toSet());

                    if (!revokedPartitions.isEmpty()) {
                        exception = invokePartitionsRevoked(revokedPartitions);

                        ownedPartitions.removeAll(revokedPartitions);
                        subscriptions.assignFromSubscribed(ownedPartitions);
                    }

                    break;
            }
        }
{code}
 

The value of the protocol instance variable here comes from its initialization code:
{code:java}
        // select the rebalance protocol such that:
        //   1. only consider protocols that are supported by all the 
assignors. If there is no common protocols supported
        //      across all the assignors, throw an exception.
        //   2. if there are multiple protocols that are commonly supported, 
select the one with the highest id (i.e. the
        //      id number indicates how advanced the protocol is).
        // we know there are at least one assignor in the list, no need to 
double check for NPE
        if (!assignors.isEmpty()) {
            List<RebalanceProtocol> supportedProtocols = new 
ArrayList<>(assignors.get(0).supportedProtocols());

            for (ConsumerPartitionAssignor assignor : assignors) {
                supportedProtocols.retainAll(assignor.supportedProtocols());
            }

            if (supportedProtocols.isEmpty()) {
                throw new IllegalArgumentException("Specified assignors " +
                    
assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet())
 +
                    " do not have commonly supported rebalance protocol");
            }

            Collections.sort(supportedProtocols);

            protocol = supportedProtocols.get(supportedProtocols.size() - 1);
        } else {
            protocol = null;
        }
{code}
 

After a brief analysis we can see that as long as the common supportedProtocols (the intersection across all configured assignors) contains RebalanceProtocol.COOPERATIVE, the protocol value will be COOPERATIVE; otherwise it will be EAGER.

But checking the ConsumerPartitionAssignor interface, I found that apart from CooperativeStickyAssignor, all other PartitionAssignor implementation classes keep the default value:
{code:java}
    /**
     * Indicate which rebalance protocol this assignor works with;
     * By default it should always work with {@link RebalanceProtocol#EAGER}.
     */
    default List<RebalanceProtocol> supportedProtocols() {
        return Collections.singletonList(RebalanceProtocol.EAGER);
    }
{code}
 

So with the default assignors the code takes the EAGER branch:
{code:java}
                case EAGER:
                    // revoke all partitions
                    revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                    exception = invokePartitionsRevoked(revokedPartitions);

                    subscriptions.assignFromSubscribed(Collections.emptySet());

                    break;
{code}
 

 

The problem is here: *subscriptions.assignFromSubscribed(Collections.emptySet()) clears the assignment in my subscriptions, and with it {color:#FF0000}clears the paused mark of every TopicPartition.{color}*
h2. 2. fetcher.fetchedRecords()

There is no need to go deep into the code here: fetchedRecords verifies, for each buffered message set (CompletedFetch) in memory, the state of the corresponding TopicPartition (see the paraphrase after this list):
h3. 1) if (subscriptions.isPaused(nextInLineFetch.partition))
h3. 2) if (!subscriptions.isAssigned(completedFetch.partition))
h3. 3) if (!subscriptions.isFetchable(completedFetch.partition))
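A condensed paraphrase of how those three checks gate the buffered data (again my own summary of the 3.0.0 Fetcher, not the literal source):
{code:java}
// Condensed paraphrase of Fetcher#fetchedRecords(). Buffered data reaches the
// caller only if the partition passes all three checks at the moment of *this* poll():
for (CompletedFetch completedFetch : bufferedFetches) {
    if (subscriptions.isPaused(completedFetch.partition)) {
        continue;   // 1) paused: keep the data buffered, return nothing from it
    }
    if (!subscriptions.isAssigned(completedFetch.partition)) {
        continue;   // 2) no longer assigned (e.g. revoked): the data is dropped
    }
    if (!subscriptions.isFetchable(completedFetch.partition)) {
        continue;   // 3) not fetchable (e.g. no valid position): also skipped
    }
    // All checks passed: the buffered records are returned by poll(), even if
    // pause(...) had been called before an *earlier* poll and was since cleared.
    fetched.put(completedFetch.partition, completedFetch.fetchRecords(maxPollRecords));
}
{code}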

 

The problem is: if a poll(...) call completes the updateAssignmentMetadataIfNeeded step within the pollTimer specified by the user (i.e. updateAssignmentMetadataIfNeeded returns true), then the paused flag of each TopicPartition has already been cleared inside updateAssignmentMetadataIfNeeded; and if the assignment of the KafkaConsumer after the completed rebalance still holds such a TopicPartition, all of the TopicPartition checks listed above will pass.

And because *{color:#FF0000}the nextInLineFetch variable in KafkaConsumer memory still buffers messages of that TopicPartition,{color} the KafkaConsumer#poll(...) method will return those messages even after pause(...) has been called. Even if you call pause(...) before every single poll(...), poll will still return the messages of that TopicPartition.*

If the business side cannot process these messages at this moment, and the KafkaConsumer has the auto-commit-offset switch turned on, the messages are lost on the consumer side. The maximum number of messages lost per poll is max.poll.records. For example:
{code:java}
        try {
            kafkaConsumer.pause(kafkaConsumer.assignment());
            ConsumerRecords<String, String> records = 
kafkaConsumer.poll(Duration.ofSeconds(5));
            
        } catch (Exception e) {
            log.error("maintain poll for rebalance with error:{}", 
e.getMessage(), e);
        } finally {
            kafkaConsumer.resume(kafkaConsumer.assignment());
        }
{code}
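For completeness, these are the consumer settings under which the loss shows up. The code shape is a sketch of mine; only enable.auto.commit=true is stated in the report, the rest are illustrative:
{code:java}
// enable.auto.commit=true is the precondition for the loss described above:
// offsets of returned-but-unprocessed records get committed automatically.
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

// Upper bound on the records a single poll() can return, and therefore on the
// number of records that can be lost per poll (500 is the client default).
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
{code}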
 

 

 

 
h1. maintainPoll4Rebalance() temporary solution

The paused mark of each TopicPartition is restored in ConsumerRebalanceListener#onPartitionsAssigned(...):
{code:java}
    private boolean maintainPoll4Rebalance;

    private void initKafkaConsumer() {

        kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                confirmMessageSync();
                log.info("consumer on partition revoked!");
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                try {
                    // Re-apply the paused mark that onJoinPrepare(...) cleared,
                    // but only while maintainPoll4Rebalance() is in progress.
                    if (maintainPoll4Rebalance) {
                        kafkaConsumer.pause(kafkaConsumer.assignment());
                    }
                } catch (Exception e) {
                    log.error("consumer onPartitionsAssigned failed with error:{}!", e.getMessage(), e);
                }
                log.info("consumer on partition assigned!");
            }
        });
    }

    private void maintainPoll4Rebalance() {
        try {
            maintainPoll4Rebalance = true;
            kafkaConsumer.pause(kafkaConsumer.assignment());
            ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                log.error("kafka poll for rebalance discard some record!");
                for (ConsumerRecord<String, Object> consumerRecord : records) {
                    log.error("this record need to retry, partition {}, offset {}",
                            consumerRecord.partition(), consumerRecord.offset());
                }
            }
        } catch (Exception e) {
            log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
        } finally {
            maintainPoll4Rebalance = false;
            kafkaConsumer.resume(kafkaConsumer.assignment());
        }
    }
{code}
 

After testing, this temporarily solves the problem: after kafkaConsumer#pause(...) is called, kafkaConsumer.poll(...) no longer returns any messages of the corresponding TopicPartitions.
h1. Suggestions
h2. 1. Precise semantics of kafkaConsumer#pause(...)

First, look at the javadoc of this method:
{code:java}
    /**
     * Suspend fetching from the requested partitions. Future calls to poll(Duration)
     * will not return any records from these partitions until they have been resumed
     * using resume(Collection). Note that this method does not affect partition
     * subscription. In particular, it does not cause a group rebalance when automatic
     * assignment is used.
     *
     * @param partitions The partitions which should be paused
     * @throws IllegalStateException if any of the provided partitions are not currently
     *         assigned to this consumer
     */
    @Override
    public void pause(Collection<TopicPartition> partitions) {
        acquireAndEnsureOpen();
        try {
            log.debug("Pausing partitions {}", partitions);
            for (TopicPartition partition: partitions) {
                subscriptions.pause(partition);
            }
        } finally {
            release();
        }
    }
{code}

*Nothing in this javadoc tells us that {color:#FF0000}the pause method will lose its effect after a group rebalance.{color}*
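One possible wording for a javadoc amendment (a suggestion of mine, not an approved change):
{code:java}
/**
 * (proposed addition to the pause(Collection) javadoc -- suggested wording only)
 *
 * Note that a group rebalance clears the paused state of the assigned
 * partitions: the first poll(Duration) that completes a rebalance may return
 * records of a previously paused partition, unless the caller pauses the
 * partitions again (for example in
 * ConsumerRebalanceListener#onPartitionsAssigned(Collection)).
 */
{code}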

 
h2. 2. When invokePartitionsRevoked(revokedPartitions) is executed, should we clean up the messages buffered in KafkaConsumer memory for the revokedPartitions?

If they are cleaned up, the cost is: after resume(...), the kafkaConsumer has to re-issue FetchRequests for the resumedPartitions, which brings additional network transmission. A sketch of this option follows.
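A hedged sketch of what that could look like in ConsumerCoordinator#onJoinPrepare. Note that fetcher.clearBufferedData(...) is a *hypothetical* helper, not an existing client API:
{code:java}
// Hypothetical sketch inside ConsumerCoordinator#onJoinPrepare (EAGER case).
// fetcher.clearBufferedData(...) stands for "drop any completedFetches /
// nextInLineFetch entries belonging to these partitions" and does not exist
// in the current client.
revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
exception = invokePartitionsRevoked(revokedPartitions);

// Drop records already fetched for the revoked partitions so that a later
// poll() cannot return them. The trade-off: after resume(...), the consumer
// must re-issue FetchRequests for these partitions.
fetcher.clearBufferedData(revokedPartitions);

subscriptions.assignFromSubscribed(Collections.emptySet());
{code}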

 
h2. 3. Better: support a pause(...) on the KafkaConsumer side that is not affected by group rebalances

h3. 1) When the rebalance starts to prepare, add new logic to ConsumerCoordinator#onJoinPrepare(...)

Before executing invokePartitionsRevoked(...) and subscriptions.assignFromSubscribed(...), save the currently paused partitions from the subscriptions.assignment of the current KafkaConsumer into customerPausedPartitions, which would be an instance variable of ConsumerCoordinator.
{code:java}
// New code, added in front of the following two existing calls:
customerPausedPartitions = subscriptions.pausedPartitions();

exception = invokePartitionsRevoked(...);
subscriptions.assignFromSubscribed(...);
{code}
 

 
h3. 2) After the rebalance completes, add new logic to ConsumerCoordinator#onJoinComplete(...)
{code:java}
    protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
        log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);

        ......

        subscriptions.assignFromSubscribed(assignedPartitions);

        // Add new code here: re-apply the paused marks saved in onJoinPrepare
        if (customerPausedPartitions != null && customerPausedPartitions.size() != 0) {
            customerPausedPartitions.forEach(topicPartition -> {
                if (subscriptions.isAssigned(topicPartition))
                    subscriptions.pause(topicPartition);
            });
            customerPausedPartitions = null;
        }

        // Add partitions that were not previously owned but are now assigned
        firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));

        ......
    }
{code}
 

 

*The above is just a first draft of the modified code. It only guarantees that, after a rebalance, the topicPartitions still held in the new assignment of the KafkaConsumer keep their paused mark.*

*{color:#FF0000}Note{color}: if the new assignment of the kafkaConsumer no longer contains topicPartitions that were paused before the rebalance, the paused mark of those topicPartitions is lost forever on the kafkaConsumer side, even if a future rebalance assigns those partitions to this kafkaConsumer again.*

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
