hudeqi commented on code in PR #13446:
URL: https://github.com/apache/kafka/pull/13446#discussion_r1149959006


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -178,6 +178,7 @@ private List<SourceRecord> sourceRecordsForGroup(String group) throws Interrupte
         }
     }
 
+    // There may be a group that consumes the topic in "topics.exclude", so it also needs to be filtered

Review Comment:
   1. In MirrorCheckpointConnector, the group list is obtained by filtering 
through "shouldReplicateByTopicFilter". Therefore, compared with trunk, each 
MirrorCheckpointTask is assigned fewer groups (that is, it solves 
KAFKA-14842).
   2. "shouldReplicateByTopicFilter" in MirrorCheckpointConnector accepts a 
group as long as at least one of its topics is accepted by topicFilter, and 
that group is then assigned to a MirrorCheckpointTask for replication. However, 
the same group may also consume other topics that topicFilter does not accept, 
and the offsets for those topics should not be replicated. Because the 
syncGroupOffset thread reads its data from "checkpointsPerConsumerGroup", the 
offsets need to be filtered by the topic filter again here.
   @C0urante 
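
To illustrate the two points above, here is a minimal, dependency-free sketch of the two filtering stages. It is not the PR's code: `CheckpointFilterSketch`, the nested `TopicPartition` record, `shouldReplicateGroup`, `filterOffsets`, and the topic names are all hypothetical stand-ins, and a plain `Predicate<String>` is assumed in place of the real topic filter.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

public class CheckpointFilterSketch {

    // Hypothetical stand-in for a topic partition key, so the sketch needs no Kafka jars.
    record TopicPartition(String topic, int partition) {}

    // Stage 1 (connector side, assumed): a group is kept if at least one
    // of the topics it consumes is accepted by the topic filter.
    static boolean shouldReplicateGroup(Map<TopicPartition, Long> groupOffsets,
                                        Predicate<String> topicFilter) {
        return groupOffsets.keySet().stream()
                .anyMatch(tp -> topicFilter.test(tp.topic()));
    }

    // Stage 2 (task side, assumed): a kept group may still consume rejected
    // topics, so its offsets are filtered per topic before being used.
    static Map<TopicPartition, Long> filterOffsets(Map<TopicPartition, Long> groupOffsets,
                                                   Predicate<String> topicFilter) {
        Map<TopicPartition, Long> accepted = new LinkedHashMap<>();
        groupOffsets.forEach((tp, offset) -> {
            if (topicFilter.test(tp.topic())) {
                accepted.put(tp, offset);
            }
        });
        return accepted;
    }

    public static void main(String[] args) {
        // Hypothetical group that consumes one accepted and one excluded topic.
        Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
        offsets.put(new TopicPartition("orders", 0), 42L);
        offsets.put(new TopicPartition("internal-audit", 0), 7L);

        // Assumed filter: everything except "internal-audit" is accepted.
        Predicate<String> topicFilter = topic -> !topic.equals("internal-audit");

        System.out.println("group replicated: " + shouldReplicateGroup(offsets, topicFilter));
        System.out.println("offsets kept: " + filterOffsets(offsets, topicFilter));
    }
}
```

In this sketch the group passes stage 1 because of "orders", and only stage 2 prevents the "internal-audit" offset from being carried along, which is the case the added comment in the diff describes.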



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
