This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0dc74c5556e KAFKA-17232: Do not generate task configs in
MirrorCheckpointConnector if initial consumer group load times out (#16767)
0dc74c5556e is described below
commit 0dc74c5556ef5b35caadcc9a0e2ee968b6958fce
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Aug 8 21:58:11 2024 +0800
KAFKA-17232: Do not generate task configs in MirrorCheckpointConnector if
initial consumer group load times out (#16767)
Reviewers: Hongten <[email protected]>, Chris Egerton
<[email protected]>
---
.../connect/mirror/MirrorCheckpointConnector.java | 21 ++++++++++++++++-----
.../mirror/MirrorCheckpointConnectorTest.java | 13 +++++++++++++
2 files changed, 29 insertions(+), 5 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 24c98b76d84..a387b695d8a 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
@@ -63,7 +64,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
private Admin sourceAdminClient;
private Admin targetAdminClient;
private SourceAndTarget sourceAndTarget;
- private Set<String> knownConsumerGroups = Collections.emptySet();
+ private Set<String> knownConsumerGroups = null;
public MirrorCheckpointConnector() {
// nop
@@ -81,7 +82,6 @@ public class MirrorCheckpointConnector extends SourceConnector {
if (!config.enabled()) {
return;
}
- String connectorName = config.connectorName();
sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(),
config.targetClusterAlias());
topicFilter = config.topicFilter();
groupFilter = config.groupFilter();
@@ -92,8 +92,6 @@ public class MirrorCheckpointConnector extends SourceConnector {
scheduler.execute(this::loadInitialConsumerGroups, "loading initial
consumer groups");
scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups,
config.refreshGroupsInterval(),
"refreshing consumer groups");
- log.info("Started {} with {} consumer groups.", connectorName,
knownConsumerGroups.size());
- log.debug("Started {} with consumer groups: {}", connectorName,
knownConsumerGroups);
}
@Override
@@ -133,6 +131,13 @@ public class MirrorCheckpointConnector extends SourceConnector {
// divide consumer groups among tasks
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
+ if (knownConsumerGroups == null) {
+ // If knownConsumerGroup is null, it means the initial loading has
not finished.
+ // An exception should be thrown to trigger the retry behavior in
the framework.
+ log.debug("Initial consumer loading has not yet completed");
+ throw new RetriableException("Timeout while loading consumer
groups.");
+ }
+
// if the replication is disabled, known consumer group is empty, or
checkpoint emission is
// disabled by setting 'emit.checkpoints.enabled' to false, the
interval of checkpoint emission
// will be negative and no 'MirrorCheckpointTask' will be created
@@ -186,6 +191,9 @@ public class MirrorCheckpointConnector extends SourceConnector {
private void refreshConsumerGroups()
throws InterruptedException, ExecutionException {
+ // If loadInitialConsumerGroups fails for any reason(e.g., timeout),
knownConsumerGroups may be null.
+ // We still want this method to recover gracefully in such cases.
+ Set<String> knownConsumerGroups = this.knownConsumerGroups == null ?
Collections.emptySet() : this.knownConsumerGroups;
Set<String> consumerGroups = findConsumerGroups();
Set<String> newConsumerGroups = new HashSet<>(consumerGroups);
newConsumerGroups.removeAll(knownConsumerGroups);
@@ -196,14 +204,17 @@ public class MirrorCheckpointConnector extends SourceConnector {
consumerGroups.size(), sourceAndTarget,
newConsumerGroups.size(), deadConsumerGroups.size(),
knownConsumerGroups.size());
log.debug("Found new consumer groups: {}", newConsumerGroups);
- knownConsumerGroups = consumerGroups;
+ this.knownConsumerGroups = consumerGroups;
context.requestTaskReconfiguration();
}
}
private void loadInitialConsumerGroups()
throws InterruptedException, ExecutionException {
+ String connectorName = config.connectorName();
knownConsumerGroups = findConsumerGroups();
+ log.info("Started {} with {} consumer groups.", connectorName,
knownConsumerGroups.size());
+ log.debug("Started {} with consumer groups: {}", connectorName,
knownConsumerGroups);
}
Set<String> findConsumerGroups()
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
index 770ef683659..8a25bdaa5e4 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
import org.junit.jupiter.api.Test;
@@ -94,6 +95,18 @@ public class MirrorCheckpointConnectorTest {
assertEquals(0, output.size(), "ConsumerGroup shouldn't exist");
}
+ @Test
+ public void testConsumerGroupInitializeTimeout() {
+ MirrorCheckpointConfig config = new
MirrorCheckpointConfig(makeProps());
+ MirrorCheckpointConnector connector = new
MirrorCheckpointConnector(null, config);
+
+ assertThrows(
+ RetriableException.class,
+ () -> connector.taskConfigs(1),
+ "taskConfigs should throw exception when initial loading
ConsumerGroup timeout"
+ );
+ }
+
@Test
public void testReplicationDisabled() {
// disable the replication