hudeqi commented on code in PR #13446:
URL: https://github.com/apache/kafka/pull/13446#discussion_r1149959006
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -178,6 +178,7 @@ private List<SourceRecord> sourceRecordsForGroup(String group) throws Interrupte
}
}
+ // There may be a group that consumes the topic in "topics.exclude", so it also needs to be filtered
Review Comment:
>
1. In MirrorCheckpointConnector, the group list is now obtained by filtering
through "shouldReplicateByTopicFilter". Compared with trunk, each
MirrorCheckpointTask is therefore assigned fewer groups (this is what resolves
KAFKA-14842).
2. "shouldReplicateByTopicFilter" in MirrorCheckpointConnector accepts a group
as long as at least one of the topics it consumes is accepted by topicFilter.
Such a group is assigned to a MirrorCheckpointTask for replication, but it may
also consume other topics that topicFilter does not accept, and the offsets for
those topics should not be replicated. Because the syncGroupOffset thread takes
its data from "checkpointsPerConsumerGroup", the checkpoints need to be
filtered by the topic filter again inside the task.
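To make the two filtering levels concrete, here is a minimal standalone sketch. It is not the PR code: the class name, both method names, the topic names, and the simplification of keying offsets by topic name (instead of by topic partition) are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration; none of these names exist in MirrorMaker 2.
final class GroupTopicFilterSketch {

    // Connector-side rule: a group qualifies if at least one topic it
    // consumes is accepted by the topic filter.
    static boolean shouldReplicateGroup(Set<String> consumedTopics,
                                        Predicate<String> topicFilter) {
        return consumedTopics.stream().anyMatch(topicFilter);
    }

    // Task-side rule: for a qualifying group, keep only the offsets whose
    // topic is accepted by the topic filter.
    // Assumption: offsets are keyed by topic name for brevity.
    static Map<String, Long> filterOffsets(Map<String, Long> offsetsByTopic,
                                           Predicate<String> topicFilter) {
        Map<String, Long> accepted = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : offsetsByTopic.entrySet()) {
            if (topicFilter.test(entry.getKey())) {
                accepted.put(entry.getKey(), entry.getValue());
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Stand-in for a filter built from "topics" / "topics.exclude".
        Predicate<String> topicFilter = topic -> !topic.equals("internal-audit");

        Map<String, Long> groupOffsets = new LinkedHashMap<>();
        groupOffsets.put("orders", 42L);
        groupOffsets.put("internal-audit", 7L);

        // The group passes the connector-side check because of "orders" ...
        System.out.println(shouldReplicateGroup(groupOffsets.keySet(), topicFilter));
        // ... but only the "orders" offset survives the task-side check.
        System.out.println(filterOffsets(groupOffsets, topicFilter));
    }
}
```

The point of the sketch is that passing the group-level check says nothing about the group's other topics, which is why the second, per-topic check is still needed in the task.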
@C0urante