[ 
https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714861#comment-17714861
 ] 

Zhaoli commented on KAFKA-14807:
--------------------------------

[~durban] Thanks, but we use dedicated mode. 

> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the 
> pause of replication of consumer groups
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14807
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14807
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.4.0, 3.3.1, 3.3.2
>         Environment: centos7
>            Reporter: Zhaoli
>            Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from the 
> Kafka cluster `source` to cluster `target`.
> To reduce the load on the source cluster, we added this configuration to mm2 
> to avoid replicating the entire message history:
> {code:java}
> source.consumer.auto.offset.reset=latest {code}
> After that, we found that offset replication had stopped for some of the 
> consumer groups.
> The common characteristic of these consumer groups is their EMPTY status, 
> which means they had no active members at that moment. Offset replication 
> for all the active consumer groups works as normal.
> After researching the source code, we found this is because the configuration 
> above also affects the consumption of the topic `mm2-offset-syncs`, so the 
> map `offsetSyncs` does not hold entries for all topic partitions:
> {code:java}
> private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>(); 
> {code}
> The missing topic partitions cause offset replication to pause for the EMPTY 
> consumer groups, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
>     Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
>     if (offsetSync.isPresent()) {
>         if (offsetSync.get().upstreamOffset() > upstreamOffset) {
>             // Offset is too far in the past to translate accurately
>             return OptionalLong.of(-1L);
>         }
>         long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
>         return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
>     } else {
>         return OptionalLong.empty();
>     }
> }{code}
>  
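
To illustrate the behaviour described in the quoted report, here is a minimal, self-contained sketch of the translation logic. It is not the actual MirrorMaker2 code: the class `OffsetSyncSketch`, the method `handleSync`, the string keys used in place of `TopicPartition`, and the topic names are all hypothetical stand-ins. It only shows that a partition with no entry in the map yields an empty result, which according to the report is the case that leaves EMPTY groups without replicated offsets.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Simplified stand-in for the offset translation quoted above (assumption, not Kafka's classes).
class OffsetSyncSketch {
    // Hypothetical minimal offset sync: one upstream offset mapped to one downstream offset.
    static final class OffsetSync {
        final long upstreamOffset;
        final long downstreamOffset;

        OffsetSync(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Keyed by a "topic-partition" string instead of TopicPartition, for brevity.
    private final Map<String, OffsetSync> offsetSyncs = new HashMap<>();

    // Records the latest sync seen for a partition (hypothetical helper).
    void handleSync(String topicPartition, long upstreamOffset, long downstreamOffset) {
        offsetSyncs.put(topicPartition, new OffsetSync(upstreamOffset, downstreamOffset));
    }

    // Same branching as the quoted translateDownstream method.
    OptionalLong translateDownstream(String topicPartition, long upstreamOffset) {
        OffsetSync sync = offsetSyncs.get(topicPartition);
        if (sync == null) {
            // No sync was ever read for this partition: nothing to translate.
            return OptionalLong.empty();
        }
        if (sync.upstreamOffset > upstreamOffset) {
            // Offset is too far in the past to translate accurately.
            return OptionalLong.of(-1L);
        }
        return OptionalLong.of(sync.downstreamOffset + (upstreamOffset - sync.upstreamOffset));
    }

    public static void main(String[] args) {
        OffsetSyncSketch store = new OffsetSyncSketch();
        // Assume only the sync for "orders-0" was consumed; "orders-1" was never seen.
        store.handleSync("orders-0", 100L, 40L);
        System.out.println(store.translateDownstream("orders-0", 120L)); // OptionalLong[60]
        System.out.println(store.translateDownstream("orders-1", 120L)); // OptionalLong.empty
        System.out.println(store.translateDownstream("orders-0", 50L));  // OptionalLong[-1]
    }
}
```

In this sketch, the second call returns an empty result only because the map was never populated for that partition, not because the committed offset is invalid.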



--
This message was sent by Atlassian Jira
(v8.20.10#820010)