Claudio Benfatto created KAFKA-16291:
----------------------------------------
Summary: Mirrormaker2 wrong checkpoints
Key: KAFKA-16291
URL: https://issues.apache.org/jira/browse/KAFKA-16291
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.6.1
Environment: Mirrormaker2 version 3.6.1 running on docker containers
Reporter: Claudio Benfatto
I am running Mirrormaker2 with the following configuration:
{noformat}
clusters = fallingwaterfall, weatheredbase
sync.group.offsets.interval.seconds=30
emit.checkpoints.interval.seconds=30
offset.lag.max=0
fallingwaterfall->weatheredbase.enabled = true
weatheredbase->fallingwaterfall.enabled = false
sync.group.offsets.enabled=true
emit.heartbeats.enabled=true
emit.checkpoints.enabled=true
emit.checkpoints.interval.seconds=30
refresh.groups.enabled=true
refresh.groups.interval.seconds=30
refresh.topics.enabled=true
sync.topic.configs.enabled=true
refresh.topics.interval.seconds=30
sync.topic.acls.enabled = false
fallingwaterfall->weatheredbase.topics = storage-demo-.*
fallingwaterfall->weatheredbase.groups = storage-demo-.*
group.id=mirror-maker-fallingwaterfall-weatheredbase
consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
fallingwaterfall.consumer.isolation.level = read_committed
weatheredbase.producer.enable.idempotence = true
weatheredbase.producer.acks=all
weatheredbase.exactly.once.source.support = enabled
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
{noformat}
I am experiencing issues with consumer group offset synchronisation.
My setup has a 12-partition topic named *storage-demo-test*, a single
transactional producer writing to it, and a consumer group named
*storage-demo-test-cg* consuming from it.
The consumer configuration is:
{code:java}
'auto.offset.reset': 'earliest',
'isolation.level': 'read_committed',
'enable.auto.commit': False, {code}
and I'm committing the offsets explicitly and synchronously after each poll.
What I observed is that the offsets synchronised from the upstream to the
downstream cluster for *storage-demo-test-cg* are often wrong.
Take, for example, this checkpoint:
{code:java}
(1, 1708505669764) - 6252 -
CheckpointKey(consumer_group='storage-demo-test-cg', topic='storage-demo-test',
partition=5) - CheckpointValue(upstream_offset=197532,
downstream_offset=196300) {code}
The upstream and downstream offsets in this checkpoint point at different messages:
{code:java}
[fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
Test message 1027-0 {code}
{code:java}
[weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
Test message 1015-9 {code}
In the Mirrormaker2 logs I see many messages like these:
{code:java}
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
[2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0]
translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532):
Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5,
upstreamOffset=196913, downstreamOffset=195683})
(org.apache.kafka.connect.mirror.OffsetSyncStore:160)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
[2024-02-21 09:02:01,557] TRACE [MirrorCheckpointConnector|task-0] Skipping
Checkpoint{consumerGroupId=storage-demo-test-cg,
topicPartition=storage-demo-test-5, upstreamOffset=197532,
downstreamOffset=195684, metadata=} (preventing downstream rewind)
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
[2024-02-21 08:59:00,859] TRACE [MirrorCheckpointConnector|task-0] Skipping
Checkpoint{consumerGroupId=storage-demo-test-cg,
topicPartition=storage-demo-test-5, upstreamOffset=197532,
downstreamOffset=195684, metadata=} (preventing downstream rewind)
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
[2024-02-21 08:59:00,859] DEBUG [MirrorCheckpointConnector|task-0]
translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532):
Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5,
upstreamOffset=196913, downstreamOffset=195683})
(org.apache.kafka.connect.mirror.OffsetSyncStore:160)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
[2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync
OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765,
downstreamOffset=197535} applied, new state is
[198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868]
(org.apache.kafka.connect.mirror.OffsetSyncStore:193)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
[2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync
OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532,
downstreamOffset=196302} applied, new state is
[197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868]
(org.apache.kafka.connect.mirror.OffsetSyncStore:193)
mirrormaker2-fallingwaterfall-weatheredbase-0 - mirrormaker2-server -
[2024-02-21 08:54:05,030] TRACE [MirrorSourceConnector|task-0] Sync'd offsets
for storage-demo-test-5: 197532==196302
(org.apache.kafka.connect.mirror.MirrorSourceTask:251){code}
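To illustrate what I think is going on, here is my own simplified reconstruction of the translation behaviour visible in the DEBUG lines above (the function name and data layout are mine, not the real OffsetSyncStore API): the store picks the nearest known sync at or below the requested upstream offset and, unless the match is exact, can only return that sync's downstream offset plus one.

```python
# Simplified sketch (my reconstruction from the logs, NOT the actual
# OffsetSyncStore code) of how translateDownstream appears to behave.

def translate_downstream(syncs, upstream_offset):
    """syncs: (upstream, downstream) pairs, newest first, as printed in 'new state'."""
    for sync_upstream, sync_downstream in syncs:
        if sync_upstream <= upstream_offset:
            if sync_upstream == upstream_offset:
                return sync_downstream       # exact sync known
            return sync_downstream + 1       # only a lower bound is known
    return None                              # offset below every known sync

# The in-memory state from the 08:58:40 log line no longer contains the exact
# 197532:196302 sync, so the committed offset 197532 falls back to 196913:195683:
state_0858 = [(198765, 197535), (198764, 197534), (198762, 197532), (198761, 197531),
              (198753, 197523), (198739, 197509), (198717, 197487), (198673, 197443),
              (198585, 197355), (198497, 197267), (198321, 197091), (197617, 196387),
              (196913, 195683), (194098, 192868)]
print(translate_downstream(state_0858, 197532))  # 195684, as in the DEBUG line

# The earlier 08:54:05 state still held the exact sync, giving the correct answer:
state_0854 = [(197532, 196302), (197530, 196300), (197529, 196299), (197521, 196291),
              (197507, 196277), (197485, 196255), (197441, 196211), (197353, 196123),
              (197265, 196035), (196913, 195683), (196209, 194979), (195505, 194275),
              (194098, 192868)]
print(translate_downstream(state_0854, 197532))  # 196302
```

In other words, once the exact 197532:196302 sync has been compacted out of the in-memory ladder, the checkpoint task can only produce the stale 195684 translation.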
Looking at the offset-syncs topic, however, I see the correct value for the
offset sync:
{code:java}
(1, 1708505645010) - 3945070 - OffsetSyncKey(topic='storage-demo-test',
partition=5) - OffsetSyncValue(upstream_offset=197532, downstream_offset=196302)
{code}
{code:java}
[weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196302 -c 1
Test message 1027-0 {code}
So it seems that the offset translations and checkpoints produced by
*MirrorCheckpointTask* do not match the offset syncs committed to the
offset-syncs topic by *MirrorSourceTask*.
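The "preventing downstream rewind" skips above also look consistent with a simple monotonicity guard, sketched here (again my own simplification, not the actual MirrorCheckpointTask code): once the translation falls behind the last emitted downstream offset, no further checkpoint is emitted for that partition, so the stale translation is never corrected.

```python
# My simplification of the "preventing downstream rewind" behaviour seen in the
# TRACE lines: a checkpoint is only emitted if it does not move the group's
# downstream offset backwards relative to the last emitted checkpoint.

def should_emit(last_downstream, candidate_downstream):
    return candidate_downstream >= last_downstream

# The last checkpoint put the group at downstream offset 196300 (see the
# CheckpointValue above), so the stale translation 195684 is skipped forever,
# even though the correct translation would be 196302:
print(should_emit(196300, 195684))  # False -> "Skipping ... (preventing downstream rewind)"
print(should_emit(196300, 196302))  # True  -> the checkpoint I would have expected
```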
Please let me know if you need additional information about my setup or if I
should collect more logs.
Thanks!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)