This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new de2ccb5789c KAFKA-18021: Disabled MirrorCheckpointConnector throws
RetriableException on task config generation (#18098)
de2ccb5789c is described below
commit de2ccb5789c6a9465bed5bc713612377e00d0b79
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Dec 12 05:56:38 2024 +0800
KAFKA-18021: Disabled MirrorCheckpointConnector throws RetriableException
on task config generation (#18098)
Reviewers: Greg Harris <[email protected]>
---
.../kafka/connect/mirror/MirrorCheckpointConnector.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 0af3b14e3b8..218c64e85a4 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -132,6 +132,12 @@ public class MirrorCheckpointConnector extends
SourceConnector {
// divide consumer groups among tasks
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
+ // If the replication is disabled or checkpoint emission is disabled
by setting 'emit.checkpoints.enabled' to false,
+ // the interval of checkpoint emission will be negative and no
'MirrorCheckpointTask' will be created.
+ if (!config.enabled() ||
config.emitCheckpointsInterval().isNegative()) {
+ return Collections.emptyList();
+ }
+
if (knownConsumerGroups == null) {
// If knownConsumerGroup is null, it means the initial loading has
not finished.
// An exception should be thrown to trigger the retry behavior in
the framework.
@@ -139,13 +145,11 @@ public class MirrorCheckpointConnector extends
SourceConnector {
throw new RetriableException("Timeout while loading consumer
groups.");
}
- // if the replication is disabled, known consumer group is empty, or
checkpoint emission is
- // disabled by setting 'emit.checkpoints.enabled' to false, the
interval of checkpoint emission
- // will be negative and no 'MirrorCheckpointTask' will be created
- if (!config.enabled() || knownConsumerGroups.isEmpty()
- || config.emitCheckpointsInterval().isNegative()) {
+ // If the consumer group is empty, no 'MirrorCheckpointTask' will be
created.
+ if (knownConsumerGroups.isEmpty()) {
return Collections.emptyList();
}
+
int numTasks = Math.min(maxTasks, knownConsumerGroups.size());
List<List<String>> groupsPartitioned =
ConnectorUtils.groupPartitions(new ArrayList<>(knownConsumerGroups), numTasks);
return IntStream.range(0, numTasks)