Ravindranath Kakarla created KAFKA-19794:
--------------------------------------------
Summary: MirrorCheckpointTask causes consumers on target cluster
to rewind offsets or skip messages due to unsafe offset commits
Key: KAFKA-19794
URL: https://issues.apache.org/jira/browse/KAFKA-19794
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.5.2
Reporter: Ravindranath Kakarla
The `MirrorCheckpointTask` in MirrorMaker 2 commits offsets for active consumer
groups on the target cluster, causing consumers to rewind their offsets or skip
messages. Typically brokers prevent committing offsets for consumer groups that
are not in `EMPTY` state by throwing `UnknownMemberIdException`. In addition,
`MirrorCheckpointTask` has logic in place to prevent committing offsets older
than target for `EMPTY` consumer groups. However, due to a bug in
`MirrorCheckpointTask` code, this prevention check is not enforced and it
attempts to commit offsets for `STABLE` consumer groups. These calls can go
through if consumers are momentarily disconnected, which moves the group state
to `EMPTY`.
This results in consumers' offsets getting reset to older values. If the offset
is not available on the target broker (due to retention), the consumers can get
reset to `earliest` or `latest`, thus reading duplicates or skipping messages.
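
The "do not commit offsets older than the target" guard mentioned above can be pictured with a minimal sketch. This is not the actual `MirrorCheckpointTask` code: the class name, method name, and plain string partition keys are hypothetical, chosen only to keep the example dependency-free (the real task works with Kafka's `TopicPartition` and `OffsetAndMetadata` types).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, dependency-free sketch of an "only move offsets forward" guard.
final class OffsetSyncGuardSketch {
    private OffsetSyncGuardSketch() { }

    /**
     * Returns the subset of translated upstream offsets that would move the
     * target group forward. Keys are "topic-partition" strings (an assumption
     * made for this sketch only).
     */
    static Map<String, Long> offsetsSafeToSync(Map<String, Long> translatedUpstream,
                                               Map<String, Long> committedOnTarget) {
        Map<String, Long> toSync = new HashMap<>();
        for (Map.Entry<String, Long> entry : translatedUpstream.entrySet()) {
            Long targetOffset = committedOnTarget.get(entry.getKey());
            // Sync when the target has no commit for this partition, or when
            // the target is strictly behind the translated upstream offset.
            if (targetOffset == null || targetOffset < entry.getValue()) {
                toSync.put(entry.getKey(), entry.getValue());
            }
        }
        return toSync;
    }
}
```

With a guard of this shape, a target group that has already advanced past the translated upstream offset is left alone. The bug described in this issue is that the guard is bypassed entirely for groups that are not recorded as idle.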
### Bug location
1. In
[MirrorCheckpointTask|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L305],
we only update the latest target cluster offsets (`idleConsumerGroupsOffset`)
if the target consumer group state is `EMPTY`.
2. When `syncGroupOffset` is called, we check if the target consumer group is
present in `idleConsumerGroupsOffset`. The consumer group won't be present as
it's an active group. We assume that this is a new group and start syncing
consumer group offsets to the target. These calls fail with `Unable to sync
offsets for consumer group XYZ. This is likely caused by consumers currently
using this group in the target cluster.
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:`. When consumers
have failed over, the logs typically contain a lot of these messages. These
calls can succeed if a consumer is momentarily disconnected due to restarts.
The code should not treat a group's absence from the `idleConsumerGroupsOffset`
map as evidence that it is a new consumer group.
3. This erroneous behavior can also be triggered when calls to
`describeConsumerGroups` or `listConsumerGroupOffsets` fail in the
`refreshIdleConsumerGroupOffset` method due to transient timeouts.
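
The flawed lookup in steps 1 and 2 can be illustrated with a small model. This is a simplified sketch, not the real task code: the class, enum, and method names are hypothetical, and the idle-offsets map is reduced to a set of group ids.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the lookup described in steps 1 and 2.
final class IdleGroupLookupSketch {
    enum State { EMPTY, STABLE }

    private IdleGroupLookupSketch() { }

    /** Step 1: only groups observed as EMPTY on the target are recorded. */
    static Set<String> refreshIdleGroups(Map<String, State> targetGroupStates) {
        Set<String> idle = new HashSet<>();
        for (Map.Entry<String, State> e : targetGroupStates.entrySet()) {
            if (e.getValue() == State.EMPTY) {
                idle.add(e.getKey());
            }
        }
        return idle;
    }

    /** Step 2 (flawed): absence from the idle set is read as "new group". */
    static boolean treatedAsNewGroup(String groupId, Set<String> idleGroups) {
        return !idleGroups.contains(groupId);
    }

    public static void main(String[] args) {
        Map<String, State> targetStates = new HashMap<>();
        targetStates.put("billing", State.EMPTY);
        targetStates.put("payments", State.STABLE);
        Set<String> idle = refreshIdleGroups(targetStates);
        // "payments" is active on the target, yet the lookup cannot tell it
        // apart from a group that has never existed there.
        System.out.println(treatedAsNewGroup("payments", idle));
        System.out.println(treatedAsNewGroup("never-seen", idle));
    }
}
```

Both calls in `main` return `true`: the active group and the genuinely new group take the same code path, which is why a sync is attempted for a group that is currently in use.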
### Fix
A potential fix would be to add an explicit check to only sync offsets for
`EMPTY` consumer groups. We should also skip offset syncing for consumer groups
for which we couldn't refresh the offsets.
```java
// Fixed code adds state checking:
ConsumerGroupState groupStateOnTarget =
        targetConsumerGroupStates.get(consumerGroupId);
if (!isGroupPresentOnTarget || groupStateOnTarget == ConsumerGroupState.DEAD) {
    // Safe to sync - new or dead group
    syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
} else if (groupStateOnTarget == ConsumerGroupState.EMPTY) {
    // Safe to sync - idle group
    // ... existing offset comparison logic
} else {
    // Skip active groups (STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE)
    log.info("Consumer group: {} with state: {} is being actively consumed on "
            + "the target, skipping sync.",
            consumerGroupId, groupStateOnTarget);
}
```
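
For reference, the decision above, together with the proposed "skip when the refresh failed" rule, can be written as a small pure function that is easy to unit test. This is only a sketch under stated assumptions: `SyncDecisionSketch`, its local `GroupState` and `Action` enums, and the `refreshSucceeded` flag are hypothetical names and are not part of Kafka or of the proposed patch.

```java
// Hypothetical, self-contained sketch of the proposed sync decision.
final class SyncDecisionSketch {
    // Local stand-in for the broker-reported group state (assumption).
    enum GroupState { EMPTY, STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, DEAD }

    // What the checkpoint task should do for one consumer group.
    enum Action { SYNC_ALL, SYNC_IF_AHEAD, SKIP }

    private SyncDecisionSketch() { }

    static Action decide(boolean refreshSucceeded,
                         boolean presentOnTarget,
                         GroupState stateOnTarget) {
        if (!refreshSucceeded) {
            // Target state is unknown (e.g. a transient timeout): do not guess.
            return Action.SKIP;
        }
        if (!presentOnTarget || stateOnTarget == GroupState.DEAD) {
            // New or dead group: no live consumer offsets to overwrite.
            return Action.SYNC_ALL;
        }
        if (stateOnTarget == GroupState.EMPTY) {
            // Idle group: apply the existing offset comparison before syncing.
            return Action.SYNC_IF_AHEAD;
        }
        // STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, or unknown state.
        return Action.SKIP;
    }
}
```

For example, `SyncDecisionSketch.decide(true, true, SyncDecisionSketch.GroupState.STABLE)` yields `SKIP`, and any call with `refreshSucceeded == false` yields `SKIP` regardless of the other arguments. Keeping the decision separate from the admin-client calls is a design choice made for this sketch so that each branch can be tested without a broker.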
--
This message was sent by Atlassian Jira
(v8.20.10#820010)