This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new de2ccb5789c KAFKA-18021: Disabled MirrorCheckpointConnector throws 
RetriableException on task config generation (#18098)
de2ccb5789c is described below

commit de2ccb5789c6a9465bed5bc713612377e00d0b79
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Dec 12 05:56:38 2024 +0800

    KAFKA-18021: Disabled MirrorCheckpointConnector throws RetriableException 
on task config generation (#18098)
    
    Reviewers: Greg Harris <[email protected]>
---
 .../kafka/connect/mirror/MirrorCheckpointConnector.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 0af3b14e3b8..218c64e85a4 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -132,6 +132,12 @@ public class MirrorCheckpointConnector extends 
SourceConnector {
     // divide consumer groups among tasks
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // If the replication is disabled or checkpoint emission is disabled 
by setting 'emit.checkpoints.enabled' to false,
+        // the interval of checkpoint emission will be negative and no 
'MirrorCheckpointTask' will be created.
+        if (!config.enabled() || 
config.emitCheckpointsInterval().isNegative()) {
+            return Collections.emptyList();
+        }
+
         if (knownConsumerGroups == null) {
             // If knownConsumerGroup is null, it means the initial loading has 
not finished.
             // An exception should be thrown to trigger the retry behavior in 
the framework.
@@ -139,13 +145,11 @@ public class MirrorCheckpointConnector extends 
SourceConnector {
             throw new RetriableException("Timeout while loading consumer 
groups.");
         }
 
-        // if the replication is disabled, known consumer group is empty, or 
checkpoint emission is
-        // disabled by setting 'emit.checkpoints.enabled' to false, the 
interval of checkpoint emission
-        // will be negative and no 'MirrorCheckpointTask' will be created
-        if (!config.enabled() || knownConsumerGroups.isEmpty()
-                || config.emitCheckpointsInterval().isNegative()) {
+        // If the consumer group is empty, no 'MirrorCheckpointTask' will be 
created.
+        if (knownConsumerGroups.isEmpty()) {
             return Collections.emptyList();
         }
+
         int numTasks = Math.min(maxTasks, knownConsumerGroups.size());
         List<List<String>> groupsPartitioned = 
ConnectorUtils.groupPartitions(new ArrayList<>(knownConsumerGroups), numTasks);
         return IntStream.range(0, numTasks)

Reply via email to