This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0dc74c5556e KAFKA-17232: Do not generate task configs in MirrorCheckpointConnector if initial consumer group load times out (#16767)
0dc74c5556e is described below

commit 0dc74c5556ef5b35caadcc9a0e2ee968b6958fce
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Aug 8 21:58:11 2024 +0800

    KAFKA-17232: Do not generate task configs in MirrorCheckpointConnector if initial consumer group load times out (#16767)
    
    Reviewers: Hongten <[email protected]>, Chris Egerton 
<[email protected]>
---
 .../connect/mirror/MirrorCheckpointConnector.java   | 21 ++++++++++++++++-----
 .../mirror/MirrorCheckpointConnectorTest.java       | 13 +++++++++++++
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 24c98b76d84..a387b695d8a 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.util.ConnectorUtils;
 
@@ -63,7 +64,7 @@ public class MirrorCheckpointConnector extends 
SourceConnector {
     private Admin sourceAdminClient;
     private Admin targetAdminClient;
     private SourceAndTarget sourceAndTarget;
-    private Set<String> knownConsumerGroups = Collections.emptySet();
+    private Set<String> knownConsumerGroups = null;
 
     public MirrorCheckpointConnector() {
         // nop
@@ -81,7 +82,6 @@ public class MirrorCheckpointConnector extends 
SourceConnector {
         if (!config.enabled()) {
             return;
         }
-        String connectorName = config.connectorName();
         sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), 
config.targetClusterAlias());
         topicFilter = config.topicFilter();
         groupFilter = config.groupFilter();
@@ -92,8 +92,6 @@ public class MirrorCheckpointConnector extends 
SourceConnector {
         scheduler.execute(this::loadInitialConsumerGroups, "loading initial 
consumer groups");
         scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, 
config.refreshGroupsInterval(),
                 "refreshing consumer groups");
-        log.info("Started {} with {} consumer groups.", connectorName, 
knownConsumerGroups.size());
-        log.debug("Started {} with consumer groups: {}", connectorName, 
knownConsumerGroups);
     }
 
     @Override
@@ -133,6 +131,13 @@ public class MirrorCheckpointConnector extends 
SourceConnector {
     // divide consumer groups among tasks
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
+        if (knownConsumerGroups == null) {
+            // If knownConsumerGroups is null, it means the initial loading has not finished.
+            // An exception should be thrown to trigger the retry behavior in the framework.
+            log.debug("Initial consumer loading has not yet completed");
+            throw new RetriableException("Timeout while loading consumer 
groups.");
+        }
+
         // if the replication is disabled, known consumer group is empty, or 
checkpoint emission is
         // disabled by setting 'emit.checkpoints.enabled' to false, the 
interval of checkpoint emission
         // will be negative and no 'MirrorCheckpointTask' will be created
@@ -186,6 +191,9 @@ public class MirrorCheckpointConnector extends 
SourceConnector {
 
     private void refreshConsumerGroups()
             throws InterruptedException, ExecutionException {
+        // If loadInitialConsumerGroups fails for any reason (e.g., timeout), knownConsumerGroups may be null.
+        // We still want this method to recover gracefully in such cases.
+        Set<String> knownConsumerGroups = this.knownConsumerGroups == null ? 
Collections.emptySet() : this.knownConsumerGroups;
         Set<String> consumerGroups = findConsumerGroups();
         Set<String> newConsumerGroups = new HashSet<>(consumerGroups);
         newConsumerGroups.removeAll(knownConsumerGroups);
@@ -196,14 +204,17 @@ public class MirrorCheckpointConnector extends 
SourceConnector {
                     consumerGroups.size(), sourceAndTarget, 
newConsumerGroups.size(), deadConsumerGroups.size(),
                     knownConsumerGroups.size());
             log.debug("Found new consumer groups: {}", newConsumerGroups);
-            knownConsumerGroups = consumerGroups;
+            this.knownConsumerGroups = consumerGroups;
             context.requestTaskReconfiguration();
         }
     }
 
     private void loadInitialConsumerGroups()
             throws InterruptedException, ExecutionException {
+        String connectorName = config.connectorName();
         knownConsumerGroups = findConsumerGroups();
+        log.info("Started {} with {} consumer groups.", connectorName, 
knownConsumerGroups.size());
+        log.debug("Started {} with consumer groups: {}", connectorName, 
knownConsumerGroups);
     }
 
     Set<String> findConsumerGroups()
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
index 770ef683659..8a25bdaa5e4 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
 
 import org.junit.jupiter.api.Test;
 
@@ -94,6 +95,18 @@ public class MirrorCheckpointConnectorTest {
         assertEquals(0, output.size(), "ConsumerGroup shouldn't exist");
     }
 
+    @Test
+    public void testConsumerGroupInitializeTimeout() {
+        MirrorCheckpointConfig config = new 
MirrorCheckpointConfig(makeProps());
+        MirrorCheckpointConnector connector = new 
MirrorCheckpointConnector(null, config);
+
+        assertThrows(
+                RetriableException.class,
+                () -> connector.taskConfigs(1),
+                "taskConfigs should throw exception when initial loading 
ConsumerGroup timeout"
+        );
+    }
+
     @Test
     public void testReplicationDisabled() {
         // disable the replication

Reply via email to