[ https://issues.apache.org/jira/browse/KAFKA-14023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624052#comment-17624052 ]
Greg Harris commented on KAFKA-14023:
-------------------------------------
[~Justinwins] I believe the motivation for this if-condition ({{latestDownstreamOffset >= convertedOffset.offset()}}) is explained in
the [original
KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0],
reproduced here:
{noformat}
Only selected consumer offsets are written and the initial criteria are
* (1) only write offsets for the consumers who are inactive in target cluster.
This will avoid the situation when the two consumer instances (with same
consumer group ID) are running both at primary and backup clusters, the offsets
at target cluster will be overwritten by the sync task.
* (2) if the "watermark" of the consumer offsets at target cluster is higher
than the offsets at primary cluster, do not write the lower "watermark" to
target cluster. This will avoid the situation when the consumption progress at
primary cluster is slower than the progress at backup cluster, writing lower
"watermark" will rewind the consumer to previous offsets, leading to consuming
duplicate messages.{noformat}
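The two criteria can be sketched as a per-partition filter. This is a minimal illustration, not MirrorMaker 2's code: the class OffsetSyncFilter, the method shouldWrite, and its parameters are hypothetical names, and the sketch assumes the group already has a committed offset for the partition in the target cluster and that the translated offset is known.

```java
// Hypothetical sketch of the two KIP-545 offset-sync criteria; not MirrorMaker 2 source code.
final class OffsetSyncFilter {

    private OffsetSyncFilter() {
    }

    /**
     * Decides whether a translated upstream offset should be written to the target cluster.
     *
     * @param groupActiveInTarget whether the consumer group has active members in the target cluster
     * @param downstreamOffset    the group's committed offset for the partition in the target cluster
     * @param translatedOffset    the upstream committed offset, translated to the target cluster
     * @return true if the sync task should write translatedOffset
     */
    static boolean shouldWrite(boolean groupActiveInTarget, long downstreamOffset, long translatedOffset) {
        // Criterion (1): leave offsets alone while the group is consuming in the target cluster.
        if (groupActiveInTarget) {
            return false;
        }
        // Criterion (2): only move the target "watermark" forward; never rewind it.
        return translatedOffset > downstreamOffset;
    }

    public static void main(String[] args) {
        System.out.println(shouldWrite(true, 10L, 50L));  // false: group is active in the target
        System.out.println(shouldWrite(false, 10L, 50L)); // true: target is behind, advance it
        System.out.println(shouldWrite(false, 50L, 10L)); // false: writing would rewind the target
    }
}
```

Note that an equal offset is also skipped, which matches the ">=" comparison used in the check discussed in this issue.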
It does seem that not propagating offset resets from the primary to the target
cluster is a side effect of this special case, which exists to avoid
re-delivery when the primary's consumers fall behind the target's consumers.
As a workaround, you can manually reset the group's offsets in the target
cluster as well; once the target offsets are lower than the translated upstream
offsets, I believe MM2 will propagate the translated offsets as you intend.
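The side effect and the workaround can be traced with a small in-memory simulation. This is a sketch, not MM2 code: SyncSimulation, syncOnce, the partition name "orders-0", and the offsets 500, 100, and 0 are made up for illustration. The map stands in for the group's committed offsets in the target cluster, the skip rule mirrors the latestDownstreamOffset >= convertedOffset.offset() check in MirrorCheckpointTask.syncGroupOffset, and the group is assumed to be inactive in the target cluster so that only criterion (2) applies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of offset sync for one consumer group; not MM2 source code.
final class SyncSimulation {

    // Stand-in for the committed offsets of an inactive consumer group in the target cluster.
    final Map<String, Long> targetOffsets = new HashMap<>();

    /** Applies the "never rewind" rule; returns true if the translated offset was written. */
    boolean syncOnce(String partition, long convertedOffset) {
        // Assumption for this sketch: a partition with no committed offset is treated as -1,
        // so any translated offset is written.
        long latestDownstreamOffset = targetOffsets.getOrDefault(partition, -1L);
        if (latestDownstreamOffset >= convertedOffset) {
            return false; // skipped: writing would move the target offset backwards
        }
        targetOffsets.put(partition, convertedOffset);
        return true;
    }

    public static void main(String[] args) {
        SyncSimulation sim = new SyncSimulation();
        sim.targetOffsets.put("orders-0", 500L); // target group is already at offset 500

        // The upstream group was reset; its translated offset is now 100, so the sync is skipped.
        boolean written = sim.syncOnce("orders-0", 100L);
        System.out.println(written + " " + sim.targetOffsets.get("orders-0")); // false 500

        // Workaround: reset the target group manually below the translated offset.
        sim.targetOffsets.put("orders-0", 0L);
        written = sim.syncOnce("orders-0", 100L);
        System.out.println(written + " " + sim.targetOffsets.get("orders-0")); // true 100
    }
}
```

In a real deployment the manual reset would be done with Kafka's own tooling rather than a map; the simulation only shows why the sync resumes once the target offset is lower than the translated one.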
> MirrorCheckpointTask.syncGroupOffset does not have to check if translated
> offset from upstream is smaller than the current consumer offset
> -------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-14023
> URL: https://issues.apache.org/jira/browse/KAFKA-14023
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect, mirrormaker
> Affects Versions: 3.2.0
> Reporter: Justinwins
> Assignee: Justinwins
> Priority: Minor
>
> In MirrorCheckpointTask.syncGroupOffset(), there is a dedicated check, shown
> below (line 285):
>
> {code:java}
> // if translated offset from upstream is smaller than the current consumer offset
> // in the target, skip updating the offset for that partition
> long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
> if (latestDownstreamOffset >= convertedOffset.offset()) {
>     log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
>         + "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
>     continue;
> }
> offsetToSync.put(topicPartition, convertedOffset);
> {code}
>
> I think there is no need to check whether the translated offset from upstream
> is smaller than the current consumer offset, because downstream offsets should
> keep up with upstream offsets. For example, if we reset the offsets upstream,
> we expect the downstream offsets to be synced accordingly, too.
>
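For comparison, the change proposed in this issue amounts to writing the translated offset unconditionally. The hypothetical sketch below contrasts the two rules (SyncRuleComparison, currentRule, proposedRule, and the offsets are illustrative names and values, and the group is assumed inactive in the target cluster). It shows the trade-off discussed in the comment: from the two offsets alone, an intentional upstream reset and primary consumers lagging behind the target's consumers look identical, so a rule that propagates the reset will also rewind the target in the lagging case.

```java
// Hypothetical comparison of the current skip rule and the rule proposed in KAFKA-14023;
// not MM2 source code.
final class SyncRuleComparison {

    private SyncRuleComparison() {
    }

    /** Current rule: keep the target offset when it is already at or past the translated offset. */
    static long currentRule(long downstreamOffset, long convertedOffset) {
        return Math.max(downstreamOffset, convertedOffset);
    }

    /** Proposed rule: the target offset always follows the translated upstream offset. */
    static long proposedRule(long downstreamOffset, long convertedOffset) {
        // downstreamOffset is intentionally ignored under this rule.
        return convertedOffset;
    }

    public static void main(String[] args) {
        long downstream = 500L; // committed offset of the group in the target cluster
        long converted = 100L;  // translated upstream offset
        // These inputs arise both from an upstream reset and from primary consumers that
        // lag behind the target's consumers; neither rule can tell the two cases apart.
        System.out.println("current:  " + currentRule(downstream, converted));  // current:  500
        System.out.println("proposed: " + proposedRule(downstream, converted)); // proposed: 100
    }
}
```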
--
This message was sent by Atlassian Jira
(v8.20.10#820010)