Ravindranath Kakarla created KAFKA-19794:
--------------------------------------------

             Summary: MirrorCheckpointTask causes consumers on target cluster 
to rewind offsets or skip messages due to unsafe offset commits
                 Key: KAFKA-19794
                 URL: https://issues.apache.org/jira/browse/KAFKA-19794
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 3.5.2
            Reporter: Ravindranath Kakarla


The `MirrorCheckpointTask` in MirrorMaker 2 commits offsets for active consumer 
groups on the target cluster, causing consumers to rewind their offsets or skip 
messages. Typically, brokers prevent committing offsets for consumer groups that 
are not in the `EMPTY` state by throwing `UnknownMemberIdException`. In addition, 
`MirrorCheckpointTask` has logic to prevent committing offsets older than the 
target's for `EMPTY` consumer groups. However, due to a bug in the 
`MirrorCheckpointTask` code, this prevention check is not enforced and the task 
attempts to commit offsets for `STABLE` consumer groups. These calls can go 
through if the consumers are momentarily disconnected, which moves the group 
state to `EMPTY`. This results in the consumers' offsets being reset to older 
values. If the offset is no longer available on the target broker (due to 
retention), the consumers can be reset to `earliest` or `latest`, thus reading 
duplicates or skipping messages. 
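
To make the impact concrete, the following is a minimal sketch of what happens to a consumer's position after a stale offset is committed. It is a simplified model, not Kafka's actual fetch logic: the class `OffsetResetSketch`, the method `resumePosition`, and all offset values are hypothetical, and only the `earliest` and `latest` reset policies are modelled.

```java
import java.util.Locale;

class OffsetResetSketch {

    /**
     * Returns the offset a consumer would resume from in this simplified model.
     * committed: offset stored for the group; logStart/logEnd: offsets currently
     * retained on the target partition; autoOffsetReset: "earliest" or "latest".
     */
    static long resumePosition(long committed, long logStart, long logEnd, String autoOffsetReset) {
        if (committed >= logStart && committed <= logEnd) {
            return committed; // committed offset still exists, resume from it
        }
        // Committed offset is out of range, so the reset policy decides.
        switch (autoOffsetReset.toLowerCase(Locale.ROOT)) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalArgumentException("unsupported policy in this sketch: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Hypothetical partition: offsets 4000..5200 retained, consumer had processed up to 5000.
        long logStart = 4000L, logEnd = 5200L, lastProcessed = 5000L;

        long rewound = resumePosition(4500L, logStart, logEnd, "earliest");
        System.out.println("stale but in range: re-reads " + (lastProcessed - rewound) + " messages");

        long earliest = resumePosition(1200L, logStart, logEnd, "earliest");
        System.out.println("out of range, earliest: re-reads " + (lastProcessed - earliest) + " messages");

        long latest = resumePosition(1200L, logStart, logEnd, "latest");
        System.out.println("out of range, latest: skips " + (latest - lastProcessed) + " messages");
    }
}
```

With these made-up numbers, a stale in-range commit of 4500 causes 500 messages to be re-read, an out-of-range commit with `earliest` causes 1000 to be re-read, and the same commit with `latest` causes 200 to be skipped.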

### Bug location

1. In 
[MirrorCheckpointTask|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L305],
 we only update the latest target cluster offsets (`idleConsumerGroupsOffset`) 
if the target consumer group state is `EMPTY`.

2. When `syncGroupOffset` is called, we check if the target consumer group is 
present in `idleConsumerGroupsOffset`. The consumer group won't be present as 
it's an active group. We assume that this is a new group and start syncing 
consumer group offsets to the target. These calls fail with `Unable to sync 
offsets for consumer group XYZ. This is likely caused by consumers currently 
using this group in the target cluster. 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:`. When consumers have 
failed over, the logs typically contain a lot of these messages. These calls 
can succeed if a consumer is momentarily disconnected due to restarts. The code 
should not treat the absence of a consumer group from the 
`idleConsumerGroupsOffset` map as an indication of a new consumer group.

3. This erroneous behavior can also be triggered when calls to 
`describeConsumerGroups` or `listConsumerGroupOffsets` fail in the 
`refreshIdleConsumerGroupOffset` method due to transient timeouts.
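
The three points above can be illustrated with a small, dependency-free sketch. It is not the actual `MirrorCheckpointTask` code: the class, the local `GroupState` enum, and both methods are hypothetical stand-ins, and offsets are reduced to a single `long` per group rather than one per topic partition.

```java
import java.util.HashMap;
import java.util.Map;

class IdleGroupLookupSketch {

    // Simplified stand-in for the broker-side group states.
    enum GroupState { STABLE, EMPTY }

    /** Mimics the refresh step: only EMPTY groups get an entry in the idle map. */
    static Map<String, Long> refreshIdleOffsets(Map<String, GroupState> targetStates,
                                                Map<String, Long> targetCommitted) {
        Map<String, Long> idle = new HashMap<>();
        for (Map.Entry<String, GroupState> e : targetStates.entrySet()) {
            if (e.getValue() == GroupState.EMPTY) {
                idle.put(e.getKey(), targetCommitted.getOrDefault(e.getKey(), 0L));
            }
        }
        return idle;
    }

    /** Mimics the reported check: a group missing from the idle map is treated as new. */
    static boolean wouldAttemptSync(String group, Map<String, Long> idle, long upstreamOffset) {
        if (!idle.containsKey(group)) {
            // A new group, an active group, and a failed refresh all end up here.
            return true;
        }
        // Idle groups are only ever moved forward.
        return upstreamOffset > idle.get(group);
    }

    public static void main(String[] args) {
        Map<String, GroupState> states = Map.of("orders-app", GroupState.STABLE,
                                                "batch-job", GroupState.EMPTY);
        Map<String, Long> committed = Map.of("orders-app", 5000L, "batch-job", 300L);
        Map<String, Long> idle = refreshIdleOffsets(states, committed);

        // Active group at 5000 on the target, stale upstream checkpoint at 1200.
        System.out.println("active group sync attempted: " + wouldAttemptSync("orders-app", idle, 1200L));
        // Idle group is protected by the offset comparison.
        System.out.println("idle group sync attempted:   " + wouldAttemptSync("batch-job", idle, 200L));
    }
}
```

In this model the active group `orders-app` is absent from the idle map, so a sync is attempted even though the upstream offset (1200) is behind the target's (5000). The idle group `batch-job` is correctly left alone.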

### Fix

A potential fix would be to add an explicit check to only sync offsets for 
`EMPTY` consumer groups. We should also skip offset syncing for consumer groups 
for which we couldn't refresh the offsets. 

 

```java
// Fixed code adds state checking:
ConsumerGroupState groupStateOnTarget = targetConsumerGroupStates.get(consumerGroupId);
if (!isGroupPresentOnTarget || groupStateOnTarget == ConsumerGroupState.DEAD) {
    // Safe to sync - new or dead group
    syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
} else if (groupStateOnTarget == ConsumerGroupState.EMPTY) {
    // Safe to sync - idle group
    // ... existing offset comparison logic
} else {
    // Skip active groups (STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE)
    log.info("Consumer group: {} with state: {} is being actively consumed on the target, skipping sync.",
             consumerGroupId, groupStateOnTarget);
}
```
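
The second part of the proposed fix, skipping groups whose offsets could not be refreshed, might look like the following self-contained sketch. Everything here is an assumption for illustration: the `GroupState` and `Action` enums, the `decide` method, and the `refreshFailed` set are hypothetical and do not exist in `MirrorCheckpointTask`. The sketch also assumes that a successful refresh reports a state for every group that exists on the target, so a missing state then really does mean a new group.

```java
import java.util.Map;
import java.util.Set;

class SyncDecisionSketch {

    // Local stand-in for the group states named in the fix above.
    enum GroupState { DEAD, EMPTY, STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE }

    enum Action { SYNC, COMPARE_THEN_SYNC, SKIP }

    static Action decide(String group, Map<String, GroupState> targetStates, Set<String> refreshFailed) {
        if (refreshFailed.contains(group)) {
            return Action.SKIP; // state is unknown this round, so do not guess
        }
        GroupState state = targetStates.get(group);
        if (state == null || state == GroupState.DEAD) {
            return Action.SYNC; // new or dead group
        }
        if (state == GroupState.EMPTY) {
            return Action.COMPARE_THEN_SYNC; // idle group: keep the existing offset comparison
        }
        return Action.SKIP; // STABLE or rebalancing: actively consumed on the target
    }

    public static void main(String[] args) {
        Map<String, GroupState> states = Map.of("idle", GroupState.EMPTY, "active", GroupState.STABLE);
        Set<String> failed = Set.of("timed-out");
        for (String g : new String[] {"idle", "active", "brand-new", "timed-out"}) {
            System.out.println(g + " -> " + decide(g, states, failed));
        }
    }
}
```

Checking `refreshFailed` first is a deliberate choice in this sketch: a group whose state lookup timed out would otherwise have no entry in `targetStates` and be mistaken for a new group, which is the ambiguity described under "Bug location".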



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
