[ 
https://issues.apache.org/jira/browse/CAMEL-17798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17506847#comment-17506847
 ] 

Rafał Gała commented on CAMEL-17798:
------------------------------------

I think I got it. It looks like the problem is in PartitionAssignmentListener's 
*onPartitionsRevoked* method, which gets called when the other JVM is shut down:
{code:java}
            String offsetKey = serializeOffsetKey(partition);
            Long offset = lastProcessedOffset.get(offsetKey);
            if (offset == null) {
                offset = -1L;
            }
            try {
                // only commit offsets if the component has control
                if (configuration.getAutoCommitEnable()) {
                    if (stopping) {
                        commitManager.commitOffsetOnStop(partition, offset);
                    } else {
                        commitManager.commitOffset(partition, offset);
                    }

                }
{code}
When no messages were consumed from the topic (no new messages arrived 
since the consumer subscribed), the *offset* variable here is initially null 
because the *lastProcessedOffset* map does not contain information about the 
last processed offset for the partition. It is then set to -1 and passed to 
the *commitManager.commitOffset(partition, offset)* method:

{code:java}
private void commitSync(TopicPartition partition, long partitionLastOffset) {
    long timeout = configuration.getCommitTimeoutMs();
    consumer.commitSync(
            Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)),
            Duration.ofMillis(timeout));
}
{code}
which commits offset 0 for the partition, causing all events to be received 
again.
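
To make the arithmetic concrete, here is a minimal standalone sketch of the two steps quoted above. It does not use Camel or the Kafka client; the class name, the helper method names and the key "my-topic/3" are made up for illustration (the real format produced by serializeOffsetKey may differ), and only the -1 fallback and the +1 are taken from the snippets.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not Camel code.
final class RevokeCommitSketch {

    // Same fallback as in onPartitionsRevoked: a missing entry becomes -1.
    static long resolveOffset(Map<String, Long> lastProcessedOffset, String offsetKey) {
        Long offset = lastProcessedOffset.get(offsetKey);
        return offset == null ? -1L : offset;
    }

    // Same arithmetic as in commitSync: the committed position is offset + 1.
    static long committedPosition(long partitionLastOffset) {
        return partitionLastOffset + 1;
    }

    public static void main(String[] args) {
        // Nothing was consumed since subscription, so the map is empty.
        Map<String, Long> lastProcessedOffset = new HashMap<>();
        long offset = resolveOffset(lastProcessedOffset, "my-topic/3");
        System.out.println(committedPosition(offset)); // prints 0
    }
}
```

With an empty map the committed position is 0, regardless of what the consumer group had committed for that partition before.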

 

I think the committed offset for the consumer group and partition should 
either be retrieved from Kafka and stored in the *lastProcessedOffset* map 
upon subscription, or the commit should not happen at all when auto commit is 
enabled. In the latter case Kafka would handle it, and consumers would receive 
the events that arrived since the last auto commit.
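
As a rough sketch of what a guard could look like, the snippet below skips the commit when nothing has been recorded for the partition instead of falling back to -1. This is only one possible variation and not a proposed patch: the class, the method name positionToCommit and the key "my-topic/3" are hypothetical, and the snippet uses a plain map rather than the Camel or Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical illustration only; not Camel code.
final class GuardedRevokeSketch {

    // Returns the position to commit, or empty when no record was processed
    // for the partition, in which case the caller would skip the commit.
    static OptionalLong positionToCommit(Map<String, Long> lastProcessedOffset, String offsetKey) {
        Long offset = lastProcessedOffset.get(offsetKey);
        if (offset == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(offset + 1);
    }

    public static void main(String[] args) {
        Map<String, Long> lastProcessedOffset = new HashMap<>();
        // Nothing processed yet: no commit, the previously committed offset stays untouched.
        System.out.println(positionToCommit(lastProcessedOffset, "my-topic/3").isPresent()); // prints false

        lastProcessedOffset.put("my-topic/3", 2193521L);
        System.out.println(positionToCommit(lastProcessedOffset, "my-topic/3").getAsLong()); // prints 2193522
    }
}
```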

> camel-kafka - Offsets resetting when another Camel node is shutdown
> -------------------------------------------------------------------
>
>                 Key: CAMEL-17798
>                 URL: https://issues.apache.org/jira/browse/CAMEL-17798
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.15.0
>            Reporter: Rafał Gała
>            Priority: Major
>             Fix For: 3.16.0, 3.17.0
>
>
> After upgrading to 3.15.0 we began to experience lots of situations where 
> offsets on topics get reset when one of the JVMs running Camel gets shut down 
> (Camel is shut down gracefully, we have at least 2 JVMs running in parallel 
> that consume events from topics for the same consumer group). This is a 
> problem when a topic contains millions of events because we need to retrieve 
> all of them again. We have auto commit enabled and do not use any manual 
> commit management.
> This is what gets logged on a running JVM when we shut down the other one:
> {noformat}
> 2022-03-15 09:59:40.285 [Camel (camel-1) thread #17 - 
> KafkaConsumer[*masked*]] INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange:1413
>  - [Consumer clientId=consumer-*masked*-40, groupId=*masked*] Fetch position 
> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[*masked*:9093 (id: 2 rack: 
> O66)], epoch=643}} is out of range for partition *masked*-3, resetting offset 
> 2022-03-15 09:59:40.519 [kafka-coordinator-heartbeat-thread | *masked*] INFO  
> org.apache.kafka.clients.consumer.internals.SubscriptionState.maybeSeekUnvalidated:398
>  - [Consumer clientId=consumer-*masked*-40, groupId=*masked*] Resetting 
> offset for partition *masked*-3 to position FetchPosition{offset=2193522, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[*masked*:9093 (id: 2 rack: 
> O66)], epoch=643}}.{noformat}
> This does not occur on 3.14.2. I believe this may be related to the commit 
> manager introduced in 3.15.0, but I have not managed to confirm it yet.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
