[ https://issues.apache.org/jira/browse/KAFKA-14023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624052#comment-17624052 ]

Greg Harris commented on KAFKA-14023:
-------------------------------------

[~Justinwins] I believe the motivation for this if-condition is explained in 
the [original 
KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0],
 reproduced here:

 
{noformat}
Only selected consumer offsets are written and the initial criteria are 
* (1) only write offsets for the consumers who are inactive in target cluster. 
This will avoid the situation when the two consumer instances (with same 
consumer group ID) are running both at primary and backup clusters, the offsets 
at target cluster will be overwritten by the sync task.
* (2) if the "watermark" of the consumer offsets at target cluster is higher 
than the offsets at primary cluster, do not write the lower "watermark" to 
target cluster. This will avoid the situation when the consumption progress at 
primary cluster is slower than the progress at backup cluster, writing lower 
"watermark" will rewind the consumer to previous offsets, leading to consuming 
duplicate messages.{noformat}
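It does seem that not propagating offset resets from the primary to the target 
cluster is a side effect of this special case, which exists to avoid 
re-delivery when the primary's consumers fall behind the target's consumers. An 
upstream reset to an earlier position also yields a translated offset lower 
than the current downstream offset, so the check skips it exactly as it would 
skip a lagging primary.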
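To make the trade-off concrete, here is a minimal Java sketch of the two KIP-545 criteria as a decision function. The class and method names (OffsetSyncRule, shouldSync, shouldSyncProposed) are hypothetical and this is not the actual MirrorCheckpointTask code; it only mirrors the ">=" comparison quoted in the issue description and assumes a simple "is the group active in the target" flag for criterion (1). The offsets in main are made-up numbers.

{code:java}
/**
 * Hypothetical sketch of the KIP-545 offset-sync criteria;
 * not the real MirrorCheckpointTask implementation.
 */
final class OffsetSyncRule {

    private OffsetSyncRule() {}

    /**
     * Current behaviour: write the translated offset only if the group is
     * inactive in the target cluster (criterion 1) and the translated offset
     * is strictly ahead of the downstream offset (criterion 2).
     */
    static boolean shouldSync(boolean groupActiveInTarget,
                              long latestDownstreamOffset,
                              long convertedOffset) {
        if (groupActiveInTarget) {
            return false; // criterion (1): never overwrite a live consumer's progress
        }
        // criterion (2): same test as `if (latestDownstreamOffset >= convertedOffset.offset()) continue;`
        return latestDownstreamOffset < convertedOffset;
    }

    /** Behaviour proposed in KAFKA-14023: drop criterion (2) entirely. */
    static boolean shouldSyncProposed(boolean groupActiveInTarget,
                                      long latestDownstreamOffset,
                                      long convertedOffset) {
        return !groupActiveInTarget;
    }

    public static void main(String[] args) {
        long downstream = 500L;
        // Scenario A: the primary's consumers lag behind the target's consumers.
        long translatedWhileLagging = 300L;
        // Scenario B: the upstream group was reset to an earlier offset.
        long translatedAfterReset = 100L;

        System.out.println("lagging primary, current rule:  " + shouldSync(false, downstream, translatedWhileLagging));
        System.out.println("upstream reset, current rule:   " + shouldSync(false, downstream, translatedAfterReset));
        System.out.println("lagging primary, proposed rule: " + shouldSyncProposed(false, downstream, translatedWhileLagging));
        System.out.println("upstream reset, proposed rule:  " + shouldSyncProposed(false, downstream, translatedAfterReset));
    }
}
{code}

Both scenarios look identical to the check (translated offset below downstream offset), so the current rule skips both and the proposed rule writes both: the proposal would fix reset propagation but also rewind downstream consumers whenever the primary lags.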

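As a workaround, you can manually reset the offsets in the target cluster to an 
earlier position as well (the group must also be inactive there, per criterion 
(1)); once the downstream offset is no longer ahead of the translated upstream 
offset, I believe that MM2 will propagate the translated offsets as you intend.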
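The workaround can be sketched the same way. The hypothetical SyncSimulation class below tracks one partition's downstream offset and applies the quoted ">=" check on each sync pass. It assumes the group stays inactive in the target cluster and treats the translated upstream offset as a given input (no real offset translation); manualReset merely stands in for resetting the group's offsets by hand, and the numbers are made up.

{code:java}
/**
 * Hypothetical simulation of repeated checkpoint sync passes for one partition.
 * Assumes the group is inactive in the target cluster and takes the translated
 * upstream offset as input (no real offset translation is modelled).
 */
final class SyncSimulation {

    private long downstreamOffset;

    SyncSimulation(long initialDownstreamOffset) {
        this.downstreamOffset = initialDownstreamOffset;
    }

    /** One sync pass: applies the quoted ">=" check and reports whether it wrote. */
    boolean syncPass(long translatedOffset) {
        if (downstreamOffset >= translatedOffset) {
            return false; // skipped; downstream offset left unchanged
        }
        downstreamOffset = translatedOffset;
        return true;
    }

    /** Stand-in for manually resetting the group's offset in the target cluster. */
    void manualReset(long newOffset) {
        downstreamOffset = newOffset;
    }

    long downstreamOffset() {
        return downstreamOffset;
    }

    public static void main(String[] args) {
        SyncSimulation partition = new SyncSimulation(500L);

        // The upstream group is reset; its translated offset is now 100.
        boolean wrote = partition.syncPass(100L);
        System.out.println("after upstream reset: wrote=" + wrote + ", downstream=" + partition.downstreamOffset());

        // Workaround: reset the target group to an earlier offset by hand.
        partition.manualReset(0L);
        wrote = partition.syncPass(100L);
        System.out.println("after manual reset:   wrote=" + wrote + ", downstream=" + partition.downstreamOffset());
    }
}
{code}

Resetting the target group to exactly the translated offset would also work: the check then skips the write, but the offsets already agree.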

 

> MirrorCheckpointTask.syncGroupOffset does not have to check if translated 
> offset from upstream is smaller than the current consumer offset
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14023
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14023
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect, mirrormaker
>    Affects Versions: 3.2.0
>            Reporter: Justinwins
>            Assignee: Justinwins
>            Priority: Minor
>
> In MirrorCheckpointTask.syncGroupOffset(), there is a dedicated check, as 
> follows (line 285):
>  
> {code:java}
> // if translated offset from upstream is smaller than the current consumer offset
> // in the target, skip updating the offset for that partition
> long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
> if (latestDownstreamOffset >= convertedOffset.offset()) {
>     log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
>             + "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
>     continue;
> }
> offsetToSync.put(topicPartition, convertedOffset);
> {code}
>  
> I think there is no need to check whether the translated offset from upstream 
> is smaller than the current consumer offset, as downstream offsets should keep 
> up with upstream offsets. For example, if we reset the offsets upstream, 
> downstream offsets are expected to be synced accordingly, too.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
