[
https://issues.apache.org/jira/browse/CAMEL-17798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17506847#comment-17506847
]
Rafał Gała edited comment on CAMEL-17798 at 3/15/22, 11:30 AM:
---------------------------------------------------------------
I think I got it. It looks like the problem is in PartitionAssignmentListener's
*onPartitionsRevoked* method that gets called when the other JVM is shut down:
{code:java}
String offsetKey = serializeOffsetKey(partition);
Long offset = lastProcessedOffset.get(offsetKey);
if (offset == null) {
    offset = -1L;
}
try {
    // only commit offsets if the component has control
    if (configuration.getAutoCommitEnable()) {
        if (stopping) {
            commitManager.commitOffsetOnStop(partition, offset);
        } else {
            commitManager.commitOffset(partition, offset);
        }
    }
{code}
When no messages have been consumed from the topic (no new messages arrived
since the consumer subscribed), the *offset* variable here is null because the
*lastProcessedOffset* map does not contain information about the last processed
offset for the partition. It is then set to -1 and passed to the
*commitManager.commitOffset(partition, offset)* method:
{code:java}
private void commitSync(TopicPartition partition, long partitionLastOffset) {
    long timeout = configuration.getCommitTimeoutMs();
    consumer.commitSync(
            Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)),
            Duration.ofMillis(timeout));
}
{code}
which commits offset 0 (-1 + 1) for the partition, causing all events to be
received again.
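The arithmetic can be reproduced without a broker. The following is only a minimal sketch in plain Java, not the camel-kafka code: the class name, the helper method names, and the "orders/N" key format are made up for illustration, and the map stands in for *lastProcessedOffset*.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the listener/commit-manager interaction; not camel-kafka code.
class OffsetSentinelDemo {

    // Mirrors the fallback in the quoted listener code: a missing entry becomes -1.
    static long resolveOffset(Map<String, Long> lastProcessedOffset, String offsetKey) {
        Long offset = lastProcessedOffset.get(offsetKey);
        return offset == null ? -1L : offset;
    }

    // Mirrors the "partitionLastOffset + 1" arithmetic in the quoted commitSync.
    static long offsetToCommit(long partitionLastOffset) {
        return partitionLastOffset + 1;
    }

    public static void main(String[] args) {
        Map<String, Long> lastProcessedOffset = new HashMap<>();
        // Partition 0 processed a record at offset 41; partition 3 processed nothing.
        lastProcessedOffset.put("orders/0", 41L);

        // Processed partition: commits the next offset to read.
        System.out.println(offsetToCommit(resolveOffset(lastProcessedOffset, "orders/0"))); // 42
        // Idle partition: the -1 sentinel turns into a commit of offset 0.
        System.out.println(offsetToCommit(resolveOffset(lastProcessedOffset, "orders/3"))); // 0
    }
}
```

For the idle partition the committed value is 0 regardless of where the consumer group actually was, which matches the "Fetch position FetchPosition{offset=0" line in the log from the issue description.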
I think the last committed offset for a consumer group and partition should be
retrieved from Kafka somehow and assigned to the *lastProcessedOffset* map upon
subscription, or the commit should not happen at all when auto commit is
enabled. Just let Kafka handle this, and consumers should receive the events
that arrived since the last auto commit.
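A narrower variant of the second option would be to skip the commit only when nothing has been recorded for the partition. The sketch below is an assumption about how such a guard could look, not the fix that went into camel-kafka: the class name, method name, and key format are hypothetical, and an empty result stands for "do not call commit at all".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical guard; illustrates the idea only, not the actual camel-kafka change.
class GuardedCommitDemo {

    // Returns the offset to commit, or empty when no record was processed
    // for this partition, in which case the caller should skip the commit.
    static OptionalLong offsetToCommit(Map<String, Long> lastProcessedOffset, String offsetKey) {
        Long offset = lastProcessedOffset.get(offsetKey);
        if (offset == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(offset + 1);
    }

    public static void main(String[] args) {
        Map<String, Long> lastProcessedOffset = new HashMap<>();
        lastProcessedOffset.put("orders/0", 41L);

        System.out.println(offsetToCommit(lastProcessedOffset, "orders/0")); // OptionalLong[42]
        System.out.println(offsetToCommit(lastProcessedOffset, "orders/3")); // OptionalLong.empty
    }
}
```

With this shape the previously committed offset in Kafka stays untouched for idle partitions, so a rebalance would resume from it rather than from 0.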
> camel-kafka - Offsets resetting when another Camel node is shutdown
> -------------------------------------------------------------------
>
> Key: CAMEL-17798
> URL: https://issues.apache.org/jira/browse/CAMEL-17798
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.15.0
> Reporter: Rafał Gała
> Priority: Major
> Fix For: 3.16.0, 3.17.0
>
>
> After upgrading to 3.15.0 we began to experience lots of situations where
> offsets on topics get reset when one of the JVMs running Camel gets shut down
> (Camel is shut down gracefully, we have at least 2 JVMs running in parallel
> that consume events from topics for the same consumer group). This is a
> problem when a topic contains millions of events because we need to retrieve
> all of them again. We have auto commit enabled and do not use any manual
> commit management.
> This is what gets logged on a running JVM when we shut down the other one:
> {noformat}
> 2022-03-15 09:59:40.285 [Camel (camel-1) thread #17 -
> KafkaConsumer[*masked*]] INFO
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange:1413
> - [Consumer clientId=consumer-*masked*-40, groupId=*masked*] Fetch position
> FetchPosition{offset=0, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[*masked*:9093 (id: 2 rack:
> O66)], epoch=643}} is out of range for partition *masked*-3, resetting offset
> 2022-03-15 09:59:40.519 [kafka-coordinator-heartbeat-thread | *masked*] INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState.maybeSeekUnvalidated:398
> - [Consumer clientId=consumer-*masked*-40, groupId=*masked*] Resetting
> offset for partition *masked*-3 to position FetchPosition{offset=2193522,
> offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[*masked*:9093 (id: 2 rack:
> O66)], epoch=643}}.{noformat}
> This does not occur on 3.14.2. I believe this may be related to the commit
> manager introduced in 3.15.0, but I have not managed to confirm it yet.